GitHub user huor commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434815
--- Diff: src/backend/cdb/cdbdatalocality.c ---
@@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation,
return;
}
+static void InvokeHDFSProtocolBlockLocation(Oid procOid,
+ List *locs,
+ List **blockLocations)
+{
+ ExtProtocolValidatorData *validator_data;
+ FmgrInfo *validator_udf;
+ FunctionCallInfoData fcinfo;
+
+ validator_data = (ExtProtocolValidatorData *)
+ palloc0
(sizeof(ExtProtocolValidatorData));
+ validator_udf = palloc(sizeof(FmgrInfo));
+ fmgr_info(procOid, validator_udf);
+
+ validator_data->type = T_ExtProtocolValidatorData;
+ validator_data->url_list = locs;
+ validator_data->format_opts = NULL;
+ validator_data->errmsg = NULL;
+ validator_data->direction = EXT_VALIDATE_READ;
+ validator_data->action = EXT_VALID_ACT_GETBLKLOC;
+
+ InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
+ /* FmgrInfo */
validator_udf,
+ /* nArgs */ 0,
+ /* Call Context */
(Node *) validator_data,
+ /* ResultSetInfo */
NULL);
+
+ /* invoke validator. if this function returns - validation passed */
+ FunctionCallInvoke(&fcinfo);
+
+ ExtProtocolBlockLocationData *bls =
+ (ExtProtocolBlockLocationData *)(fcinfo.resultinfo);
+ /* debug output block location. */
+ if (bls != NULL)
+ {
+ ListCell *c;
+ foreach(c, bls->files)
+ {
+ blocklocation_file *blf = (blocklocation_file
*)(lfirst(c));
+ elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
+ blf->file_uri, blf->block_num);
+ for ( int i = 0 ; i < blf->block_num ; ++i )
+ {
+ BlockLocation *pbl = &(blf->locations[i]);
+ elog(DEBUG3, "DEBUG LOCATION for block %d : %d,
"
+ INT64_FORMAT ", "
INT64_FORMAT ", %d",
+ i,
+ pbl->corrupt, pbl->length,
pbl->offset,
+ pbl->numOfNodes);
+ for ( int j = 0 ; j < pbl->numOfNodes ; ++j )
+ {
+ elog(DEBUG3, "DEBUG LOCATION for block
%d : %s, %s, %s",
+ i,
+ pbl->hosts[j],
pbl->names[j],
+
pbl->topologyPaths[j]);
+ }
+ }
+ }
+ }
+ elog(DEBUG3, "after invoking get block location API");
+
+ /* get location data from fcinfo.resultinfo. */
+ if (bls != NULL)
+ {
+ Assert(bls->type == T_ExtProtocolBlockLocationData);
+ while(list_length(bls->files) > 0)
+ {
+ void *v = lfirst(list_head(bls->files));
+ bls->files = list_delete_first(bls->files);
+ *blockLocations = lappend(*blockLocations, v);
+ }
+ }
+ pfree(validator_data);
+ pfree(validator_udf);
+}
+
+Oid
+LookupCustomProtocolBlockLocationFunc(char *protoname)
+{
+ List* funcname = NIL;
+ Oid procOid = InvalidOid;
+ Oid argList[1];
+ Oid returnOid;
+
+ char* new_func_name = (char *)palloc0(strlen(protoname) + 16);
+ sprintf(new_func_name, "%s_blocklocation", protoname);
+ funcname = lappend(funcname, makeString(new_func_name));
+ returnOid = VOIDOID;
+ procOid = LookupFuncName(funcname, 0, argList, true);
+
+ if (!OidIsValid(procOid))
+ ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("protocol function %s
was not found.",
+ new_func_name),
+ errhint("Create it with CREATE
FUNCTION."),
+ errOmitLocation(true)));
+
+ /* check return type matches */
+ if (get_func_rettype(procOid) != returnOid)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("protocol function %s
has an incorrect return type",
+ new_func_name),
+ errOmitLocation(true)));
+
+ /* check allowed volatility */
+ if (func_volatile(procOid) != PROVOLATILE_STABLE)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("protocol function %s is
not declared STABLE.",
+ new_func_name),
+ errOmitLocation(true)));
+ pfree(new_func_name);
+
+ return procOid;
+}
+
+static void ExternalGetHdfsFileDataLocation(
+ Relation relation,
+ split_to_segment_mapping_context *context,
+ int64 splitsize,
+ Relation_Data *rel_data,
+ int* allblocks) {
+ ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
+ Assert(ext_entry->locations != NIL);
+ int64 total_size = 0;
+ int segno = 1;
+
+ /*
+ * Step 1. get external HDFS location from URI.
+ */
+ char* first_uri_str = (char *)
strVal(lfirst(list_head(ext_entry->locations)));
+ /* We must have at least one location. */
+ Assert(first_uri_str != NULL);
+ Uri* uri = ParseExternalTableUri(first_uri_str);
+ bool isHdfs = false;
+ if (uri != NULL && is_hdfs_protocol(uri)) {
+ isHdfs = true;
+ }
+ Assert(isHdfs); /* Currently, we accept HDFS only. */
+
+ /*
+ * Step 2. Get function to call for getting location information. This
work
+ * is done by validator function registered for this external protocol.
+ */
+ Oid procOid = InvalidOid;
+ if (isHdfs) {
+ procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
+ }
+ else
+ {
+ Assert(false);
+ }
+
+ /*
+ * Step 3. Call validator to get location data.
+ */
+
+ /* Prepare function call parameter by passing into location string.
This is
+ * only called at dispatcher side. */
+ List *bls = NULL; /* Block locations */
+ if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+ {
+ InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations,
&bls);
+ }
+
+ /*
+ * Step 4. Build data location info for optimization after this call.
+ */
+
+ /* Go through each files */
+ ListCell *cbl = NULL;
+ foreach(cbl, bls)
+ {
+ blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
+ BlockLocation *locations = f->locations;
+ int block_num = f->block_num;
+ int64 logic_len = 0;
+ *allblocks += block_num;
+ if ((locations != NULL) && (block_num > 0)) {
+ // calculate length for one specific file
+ for (int j = 0; j < block_num; ++j) {
+ logic_len += locations[j].length;
+ // locations[j].lowerBoundInc = NULL;
--- End diff --
Please remove the commented-out `lowerBoundInc` / `upperBoundExc` assignments; commented-out code should not be merged.
---