Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434274 --- Diff: contrib/exthdfs/exthdfs.c --- @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +#include "postgres.h" + +#include "common.h" +#include "access/extprotocol.h" +#include "cdb/cdbdatalocality.h" +#include "storage/fd.h" +#include "storage/filesystem.h" +#include "utils/uri.h" + + + + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation); +PG_FUNCTION_INFO_V1(hdfsprotocol_validate); + +Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS); +Datum hdfsprotocol_validate(PG_FUNCTION_ARGS); + +Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS) +{ + + // Build the result instance + int nsize = 0; + int numOfBlock = 0; + ExtProtocolBlockLocationData *bldata = + palloc0(sizeof(ExtProtocolBlockLocationData)); + if (bldata == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "cannot allocate due to no memory"); + } + bldata->type = T_ExtProtocolBlockLocationData; + fcinfo->resultinfo = bldata; + + ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *) + (fcinfo->context); + + + // Parse URI of the first location, we expect all locations uses the same + // name node server. This is checked in validation function. + + char *first_uri_str = + (char *)strVal(lfirst(list_head(pvalidator_data->url_list))); + Uri *uri = ParseExternalTableUri(first_uri_str); + + elog(DEBUG3, "hdfsprotocol_blocklocation : " + "extracted HDFS name node address %s:%d", + uri->hostname, uri->port); + + // Create file system instance + hdfsFS fs = hdfsConnect(uri->hostname, uri->port); + if (fs == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "failed to create HDFS instance to connect to %s:%d", + uri->hostname, uri->port); + } + + // Clean up uri instance as we don't need it any longer + pfree(uri); + + // Check all locations to get files to fetch location. + ListCell *lc = NULL; + foreach(lc, pvalidator_data->url_list) + { + // Parse current location URI. + char *url = (char *)strVal(lfirst(lc)); + Uri *uri = ParseExternalTableUri(url); + if (uri == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "invalid URI encountered %s", url); + } + + // + // NOTICE: We temporarily support only directories as locations. We plan + // to extend the logic to specifying single file as one location + // very soon. + + + // get files contained in the path. + hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize); + if (fiarray == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "failed to get files of path %s", + uri->path); + } + + int i = 0 ; + // Call block location api to get data location for each file + for (i = 0 ; i < nsize ; i++) + { + hdfsFileInfo *fi = &fiarray[i]; + + // break condition of this for loop + if (fi == NULL) {break;} + + // Build file name full path. + const char *fname = fi->mName; + char *fullpath = palloc0( // slash + strlen(fname) + // name + 1); // \0 + sprintf(fullpath, "%s", fname); + + elog(DEBUG3, "hdfsprotocol_blocklocation : " + "built full path file %s", fullpath); + + // Get file full length. + int64_t len = fi->mSize; + + elog(DEBUG3, "hdfsprotocol_blocklocation : " + "got file %s length " INT64_FORMAT, + fullpath, len); + + if (len == 0) { + pfree(fullpath); + continue; + } + + // Get block location data for this file + BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock); + if (bla == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "failed to get block location of path %s. " + "It is reported generally due to HDFS service errors or " + "another session's ongoing writing.", + fullpath); + } + + // Add file full path and its block number as result. + blocklocation_file *blf = palloc0(sizeof(blocklocation_file)); + blf->file_uri = pstrdup(fullpath); + blf->block_num = numOfBlock; + blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num); + + elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks", + fullpath, blf->block_num); + + // We don't need it any longer + pfree(fullpath); + int bidx = 0; + // Add block information as a list. + for (bidx = 0 ; bidx < blf->block_num ; bidx++) + { + BlockLocation *blo = &bla[bidx]; + BlockLocation *bl = &(blf->locations[bidx]); + bl->numOfNodes = blo->numOfNodes; + bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes); + bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes); + bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes); + bl->offset = blo->offset; + bl->length = blo->length; + bl->corrupt = blo->corrupt; + + int nidx = 0 ; + for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++) + { + bl->hosts[nidx] = pstrdup(*blo[nidx].hosts); + bl->names[nidx] = pstrdup(*blo[nidx].names); + bl->topologyPaths[nidx] =pstrdup(*blo[nidx].topologyPaths); + } + } + + bldata->files = lappend(bldata->files, (void *)(blf)); + + // Clean up block location instances created by the lib. + hdfsFreeFileBlockLocations(bla,numOfBlock); + } + + // Clean up URI instance in loop as we don't need it any longer + pfree(uri); + + // Clean up file info array created by the lib for this location. + hdfsFreeFileInfo(fiarray,nsize); + } + + // destroy fs instance + hdfsDisconnect(fs); + + PG_RETURN_VOID(); + +} + +Datum hdfsprotocol_validate(PG_FUNCTION_ARGS) +{ + elog(DEBUG3, "hdfsprotocol_validate() begin"); + + /* Check which action should perform. */ + ExtProtocolValidatorData *pvalidator_data = + (ExtProtocolValidatorData *)(fcinfo->context); + + if (pvalidator_data->forceCreateDir) + Assert(pvalidator_data->url_list && pvalidator_data->url_list->length == 1); + + if (pvalidator_data->direction == EXT_VALIDATE_WRITE) + { + /* accept only one directory location */ + if (list_length(pvalidator_data->url_list) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "only one location url is supported for writable external hdfs"))); + } + } + + /* Go through first round to get formatter type */ + bool isCsv = false; + bool isText = false; + bool isOrc = false; + ListCell *optcell = NULL; + foreach(optcell, pvalidator_data->format_opts) + { + DefElem *de = (DefElem *)lfirst(optcell); + if (strcasecmp(de->defname, "formatter") == 0) + { + char *val = strVal(de->arg); + if (strcasecmp(val, "csv") == 0) + { + isCsv = true; + } + else if (strcasecmp(val, "text") == 0) + { + isText = true; + } + else if (strcasecmp(val, "orc") == 0) + { + isOrc = true; + } + } + } + if(1) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "no formatter is supported for external hdfs"))); + } + if (!isCsv && !isText && !isOrc) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "only 'csv', 'text' and 'orc' formatter is supported for external hdfs"))); + } + Assert(isCsv || isText || isOrc); + + /* Validate formatter options */ + foreach(optcell, pvalidator_data->format_opts) + { + DefElem *de = (DefElem *)lfirst(optcell); + if (strcasecmp(de->defname, "delimiter") == 0) + { + char *val = strVal(de->arg); + /* Validation 1. User can not specify 'OFF' in delimiter */ + if (strcasecmp(val, "off") == 0) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'off' value of 'delimiter' option is not supported"))); + } + /* Validation 2. Can specify multibytes characters */ + if (strlen(val) < 1) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'delimiter' option accepts multibytes characters"))); + } + } + + if (strcasecmp(de->defname, "escape") == 0) + { + char *val = strVal(de->arg); + /* Validation 3. User can not specify 'OFF' in delimiter */ + if (strcasecmp(val, "off") == 0) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'off' value of 'escape' option is not supported"))); + } + /* Validation 4. Can only specify one character */ + if (strlen(val) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'escape' option accepts single character"))); + } + } + + if (strcasecmp(de->defname, "newline") == 0) + { + char *val = strVal(de->arg); + /* Validation 5. only accept 'lf', 'cr', 'crlf' */ + if (strcasecmp(val, "lf") != 0 && + strcasecmp(val, "cr") != 0 && + strcasecmp(val, "crlf") != 0) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "the value of 'newline' option can only be " + "'lf', 'cr' or 'crlf'"))); + } + } + + if (strcasecmp(de->defname, "quote") == 0) + { + /* This is allowed only for csv mode formatter */ + if (!isCsv) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'quote' option is only available in 'csv' formatter"))); + } + + char *val = strVal(de->arg); + /* Validation 5. Can only specify one character */ + if (strlen(val) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'quote' option accepts single character"))); + } + } + + if (strcasecmp(de->defname, "force_notnull") == 0) + { + /* This is allowed only for csv mode formatter */ + if (!isCsv) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'force_notnull' option is only available in 'csv' formatter"))); + } + } + + if (strcasecmp(de->defname, "force_quote") == 0) + { + /* This is allowed only for csv mode formatter */ + if (!isCsv) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("hdfsprotocol_validate : " + "'force_quote' option is only available in 'csv' formatter"))); + } + } + } + + /* All urls should + * 1) have the same protocol name 'hdfs', + * 2) the same hdfs namenode server address + */ + /* Check all locations to get files to fetch location. */ + char *nnaddr = NULL; + int nnport = -1; + ListCell *lc = NULL; + foreach(lc, pvalidator_data->url_list) + { + /* Parse current location URI. */ + char *url = (char *)strVal(lfirst(lc)); + Uri *uri = ParseExternalTableUri(url); + if (uri == NULL) + { + elog(ERROR, "hdfsprotocol_validate : " + "invalid URI encountered %s", url); + } + + if (uri->protocol != URI_HDFS) + { + elog(ERROR, "hdfsprotocol_validate : " + "invalid URI protocol encountered in %s, " + "hdfs:// protocol is required", + url); + } + + if (nnaddr == NULL) + { + nnaddr = pstrdup(uri->hostname); + nnport = uri->port; + } + else + { + if (strcmp(nnaddr, uri->hostname) != 0) + { + elog(ERROR, "hdfsprotocol_validate : " + "different name server addresses are detected, " + "both %s and %s are found", + nnaddr, uri->hostname); + } + if (nnport != uri->port) + { + elog(ERROR, "hdfsprotocol_validate : " + "different name server ports are detected, " + "both %d and %d are found", + nnport, uri->port); + } + } + + /* SHOULD ADD LOGIC HERE TO CREATE UNEXISTING PATH */ + if (pvalidator_data->forceCreateDir) { + + elog(LOG, "hdfs_validator() forced creating dir"); + + /* Create file system instance */ + hdfsFS fs = hdfsConnect(uri->hostname, uri->port); + if (fs == NULL) + { + elog(ERROR, "hdfsprotocol_validate : " + "failed to create HDFS instance to connect to %s:%d", + uri->hostname, uri->port); + } + + if (hdfsExists(fs, uri->path) == -1) + elog(ERROR, "hdfsprotocol_validate : " + "Location \"%s\" is not exist", + uri->path); + + /* destroy fs instance */ + hdfsDisconnect(fs); + } + + /* Clean up temporarily created instances */ + pfree(uri); --- End diff -- this is a memory leak, use FreeExternalTableUri(uri)
---