Repository: incubator-hawq Updated Branches: refs/heads/master 2761edb60 -> b360e70bc
HAWQ-1409. Send AGG-TYPE header to PXF This change is meant to be a proof of concept that pushing down aggregate function information from HAWQ to the underlying external storage layer does indeed improve performance. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/b360e70b Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/b360e70b Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/b360e70b Branch: refs/heads/master Commit: b360e70bce636691dee17aefadc1383ccf0aa638 Parents: 2761edb Author: Kavinder Dhaliwal <kavind...@gmail.com> Authored: Fri Mar 3 15:27:05 2017 -0800 Committer: Kavinder Dhaliwal <kavind...@gmail.com> Committed: Fri Mar 31 16:10:18 2017 -0700 ---------------------------------------------------------------------- src/backend/access/external/fileam.c | 10 +++++++++- src/backend/access/external/pxfheaders.c | 12 ++++++++++++ src/backend/executor/nodeAgg.c | 13 +++++++++++++ src/backend/executor/nodeExternalscan.c | 7 ++++++- src/bin/gpfusion/gpbridgeapi.c | 7 ++++++- src/include/access/extprotocol.h | 2 +- src/include/access/fileam.h | 4 +++- src/include/access/pxfheaders.h | 2 ++ src/include/executor/executor.h | 2 ++ src/include/nodes/execnodes.h | 1 + 10 files changed, 55 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/access/external/fileam.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c index d16d516..f77b29e 100644 --- a/src/backend/access/external/fileam.c +++ b/src/backend/access/external/fileam.c @@ -459,12 +459,20 @@ external_stopscan(FileScanDesc scan) * ---------------- */ ExternalSelectDesc -external_getnext_init(PlanState *state) { +external_getnext_init(PlanState *state, 
ExternalScanState *es_state) { ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData)); + Plan *rootPlan; if (state != NULL) { desc->projInfo = state->ps_ProjInfo; + /* + * If we have an agg type then our parent is an Agg node + */ + rootPlan = state->state->es_plannedstmt->planTree; + if (IsA(rootPlan, Agg) && es_state->parent_agg_type) { + desc->agg_type = es_state->parent_agg_type; + } } return desc; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/access/external/pxfheaders.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c index 8e91644..d8904b4 100644 --- a/src/backend/access/external/pxfheaders.c +++ b/src/backend/access/external/pxfheaders.c @@ -110,6 +110,18 @@ void build_http_header(PxfInputData *input) else churl_headers_append(headers, "X-GP-HAS-FILTER", "0"); + /* Aggregate information */ + if (input->agg_type) { + switch(input->agg_type) { + case EXEC_FLAG_EXTERNAL_AGG_COUNT: + churl_headers_append(headers, "X-GP-AGG-TYPE", "count"); + break; + default: + churl_headers_append(headers, "X-GP-AGG-TYPE", "unknown"); + break; + } + } + add_delegation_token_headers(headers, input); add_remote_credentials(headers); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/executor/nodeAgg.c ---------------------------------------------------------------------- diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index e4eb791..574dcc6 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -1950,6 +1950,19 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * initialize child nodes */ outerPlan = outerPlan(node); + if (IsA(outerPlan, ExternalScan)) { + /* + * Hack to indicate to PXF when there is an external scan + */ + if (list_length(aggstate->aggs) == 1) { + AggrefExprState *aggrefstate = 
(AggrefExprState *) linitial(aggstate->aggs); + Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; + //Only dealing with one agg + if (aggref->aggfnoid == COUNT_ANY_OID || aggref->aggfnoid == COUNT_STAR_OID) { + eflags |= EXEC_FLAG_EXTERNAL_AGG_COUNT; + } + } + } outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags); /* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/executor/nodeExternalscan.c ---------------------------------------------------------------------- diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c index 2831faa..8f2ba88 100644 --- a/src/backend/executor/nodeExternalscan.c +++ b/src/backend/executor/nodeExternalscan.c @@ -80,7 +80,7 @@ ExternalNext(ExternalScanState *node) /* * get the next tuple from the file access methods */ - externalSelectDesc = external_getnext_init(&(node->ss.ps)); + externalSelectDesc = external_getnext_init(&(node->ss.ps), node); tuple = external_getnext(scandesc, direction, externalSelectDesc); /* @@ -237,6 +237,11 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags) externalstate->ss.ps.delayEagerFree = ((eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) != 0); + /* + * If eflag contains EXEC_FLAG_EXTERNAL_AGG_COUNT then notify the underlying storage level + */ + externalstate->parent_agg_type = (eflags & EXEC_FLAG_EXTERNAL_AGG_COUNT); + initGpmonPktForExternalScan((Plan *)node, &externalstate->ss.ps.gpmon_pkt, estate); return externalstate; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/bin/gpfusion/gpbridgeapi.c ---------------------------------------------------------------------- diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c index cf4dd84..f176586 100644 --- a/src/bin/gpfusion/gpbridgeapi.c +++ b/src/bin/gpfusion/gpbridgeapi.c @@ -182,8 +182,13 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS) 
inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo); inputData.quals = EXTPROTOCOL_GET_SCANQUALS(fcinfo); inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo)); - if (EXTPROTOCOL_GET_SELECTDESC(fcinfo)) + if (EXTPROTOCOL_GET_SELECTDESC(fcinfo)) { inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo); + int agg_type = EXTPROTOCOL_GET_AGG_TYPE(fcinfo); + if (agg_type) { + inputData.agg_type = agg_type; + } + } add_delegation_token(&inputData); build_http_header(&inputData); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/access/extprotocol.h ---------------------------------------------------------------------- diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h index 4b69bb7..c1aa724 100644 --- a/src/include/access/extprotocol.h +++ b/src/include/access/extprotocol.h @@ -66,8 +66,8 @@ typedef ExtProtocolData *ExtProtocol; #define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx) #define EXTPROTOCOL_GET_SELECTDESC(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc) #define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo) +#define EXTPROTOCOL_GET_AGG_TYPE(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->agg_type) #define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call) - #define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true) #define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \ (((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/access/fileam.h ---------------------------------------------------------------------- diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h index 1e926d5..713c185 100644 --- a/src/include/access/fileam.h +++ b/src/include/access/fileam.h @@ -70,6 +70,8 @@ typedef ExternalInsertDescData 
*ExternalInsertDesc; typedef struct ExternalSelectDescData { ProjectionInfo *projInfo; + // Information needed for aggregate pushdown + int agg_type; } ExternalSelectDescData; typedef enum DataLineStatus @@ -89,7 +91,7 @@ extern FileScanDesc external_beginscan(Relation relation, Index scanrelid, extern void external_rescan(FileScanDesc scan); extern void external_endscan(FileScanDesc scan); extern void external_stopscan(FileScanDesc scan); -extern ExternalSelectDesc external_getnext_init(PlanState *state); +extern ExternalSelectDesc external_getnext_init(PlanState *state, ExternalScanState *es_state); extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc); extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno); extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/access/pxfheaders.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h index f4adc6c..1c92da8 100644 --- a/src/include/access/pxfheaders.h +++ b/src/include/access/pxfheaders.h @@ -46,6 +46,8 @@ typedef struct sPxfInputData PxfHdfsToken token; ProjectionInfo *proj_info; List *quals; + int agg_type; + int agg_groups; } PxfInputData; void build_http_header(PxfInputData *input); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/executor/executor.h ---------------------------------------------------------------------- diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index a6ff148..ea1dad2 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -72,6 +72,8 @@ struct ChunkTransportState; /* #include "cdb/cdbinterconnect.h" */ #define EXEC_FLAG_BACKWARD 0x0004 /* need backward scan */ #define EXEC_FLAG_MARK 0x0008 /* need mark/restore */ +#define 
EXEC_FLAG_EXTERNAL_AGG_COUNT 0x0010 /* can support external agg */ + /* * ExecEvalExpr was formerly a function containing a switch statement; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/nodes/execnodes.h ---------------------------------------------------------------------- diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 7a2e733..c6719a7 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1846,6 +1846,7 @@ typedef struct ExternalScanState struct FileScanDescData *ess_ScanDesc; bool cdb_want_ctid; ItemPointerData cdb_fake_ctid; + int parent_agg_type; } ExternalScanState; /* ----------------