Repository: incubator-hawq
Updated Branches:
  refs/heads/master 2761edb60 -> b360e70bc


HAWQ-1409. Send AGG-TYPE header to PXF

This change is meant to be a proof of concept that pushing down
aggregate function information from HAWQ to the underlying external
storage layer does indeed improve performance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/b360e70b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/b360e70b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/b360e70b

Branch: refs/heads/master
Commit: b360e70bce636691dee17aefadc1383ccf0aa638
Parents: 2761edb
Author: Kavinder Dhaliwal <kavind...@gmail.com>
Authored: Fri Mar 3 15:27:05 2017 -0800
Committer: Kavinder Dhaliwal <kavind...@gmail.com>
Committed: Fri Mar 31 16:10:18 2017 -0700

----------------------------------------------------------------------
 src/backend/access/external/fileam.c     | 10 +++++++++-
 src/backend/access/external/pxfheaders.c | 12 ++++++++++++
 src/backend/executor/nodeAgg.c           | 13 +++++++++++++
 src/backend/executor/nodeExternalscan.c  |  7 ++++++-
 src/bin/gpfusion/gpbridgeapi.c           |  7 ++++++-
 src/include/access/extprotocol.h         |  2 +-
 src/include/access/fileam.h              |  4 +++-
 src/include/access/pxfheaders.h          |  2 ++
 src/include/executor/executor.h          |  2 ++
 src/include/nodes/execnodes.h            |  1 +
 10 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c 
b/src/backend/access/external/fileam.c
index d16d516..f77b29e 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -459,12 +459,20 @@ external_stopscan(FileScanDesc scan)
  *     ----------------
  */
 ExternalSelectDesc
-external_getnext_init(PlanState *state) {
+external_getnext_init(PlanState *state, ExternalScanState *es_state) {
        ExternalSelectDesc desc = (ExternalSelectDesc) 
palloc0(sizeof(ExternalSelectDescData));
+       Plan *rootPlan;
 
        if (state != NULL)
        {
                desc->projInfo = state->ps_ProjInfo;
+               /*
+                * If we have an agg type then our parent is an Agg node
+                */
+               rootPlan = state->state->es_plannedstmt->planTree;
+               if (IsA(rootPlan, Agg) && es_state->parent_agg_type) {
+                       desc->agg_type = es_state->parent_agg_type;
+               }
        }
        return desc;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/access/external/pxfheaders.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfheaders.c 
b/src/backend/access/external/pxfheaders.c
index 8e91644..d8904b4 100644
--- a/src/backend/access/external/pxfheaders.c
+++ b/src/backend/access/external/pxfheaders.c
@@ -110,6 +110,18 @@ void build_http_header(PxfInputData *input)
        else
                churl_headers_append(headers, "X-GP-HAS-FILTER", "0");
 
+       /* Aggregate information */
+       if (input->agg_type) {
+               switch(input->agg_type) {
+               case EXEC_FLAG_EXTERNAL_AGG_COUNT:
+                       churl_headers_append(headers, "X-GP-AGG-TYPE", "count");
+                       break;
+               default:
+                       churl_headers_append(headers, "X-GP-AGG-TYPE", 
"unknown");
+                       break;
+               }
+       }
+
        add_delegation_token_headers(headers, input);
        add_remote_credentials(headers);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/executor/nodeAgg.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index e4eb791..574dcc6 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1950,6 +1950,19 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
         * initialize child nodes
         */
        outerPlan = outerPlan(node);
+       if (IsA(outerPlan, ExternalScan)) {
+               /*
+                * Hack to indicate to PXF when there is an external scan
+                */
+               if (list_length(aggstate->aggs) == 1) {
+                               AggrefExprState *aggrefstate = (AggrefExprState 
*) linitial(aggstate->aggs);
+                               Aggref     *aggref = (Aggref *) 
aggrefstate->xprstate.expr;
+                               //Only dealing with one agg
+                               if (aggref->aggfnoid == COUNT_ANY_OID || 
aggref->aggfnoid == COUNT_STAR_OID) {
+                                       eflags |= EXEC_FLAG_EXTERNAL_AGG_COUNT;
+                               }
+               }
+       }
        outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
 
        /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/backend/executor/nodeExternalscan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeExternalscan.c 
b/src/backend/executor/nodeExternalscan.c
index 2831faa..8f2ba88 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -80,7 +80,7 @@ ExternalNext(ExternalScanState *node)
        /*
         * get the next tuple from the file access methods
         */
-       externalSelectDesc = external_getnext_init(&(node->ss.ps));
+       externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
        tuple = external_getnext(scandesc, direction, externalSelectDesc);
 
        /*
@@ -237,6 +237,11 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, 
int eflags)
        externalstate->ss.ps.delayEagerFree =
                ((eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | 
EXEC_FLAG_MARK)) != 0);
 
+       /*
+        * If eflag contains EXEC_FLAG_EXTERNAL_AGG_COUNT then notify the 
underlying storage level
+        */
+       externalstate->parent_agg_type = (eflags & 
EXEC_FLAG_EXTERNAL_AGG_COUNT);
+
        initGpmonPktForExternalScan((Plan *)node, 
&externalstate->ss.ps.gpmon_pkt, estate);
 
        return externalstate;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index cf4dd84..f176586 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -182,8 +182,13 @@ void add_querydata_to_http_header(gphadoop_context* 
context, PG_FUNCTION_ARGS)
        inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
        inputData.quals = EXTPROTOCOL_GET_SCANQUALS(fcinfo);
        inputData.filterstr = 
serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
-       if (EXTPROTOCOL_GET_SELECTDESC(fcinfo))
+       if (EXTPROTOCOL_GET_SELECTDESC(fcinfo)) {
                inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo);
+               int agg_type = EXTPROTOCOL_GET_AGG_TYPE(fcinfo);
+               if (agg_type) {
+                       inputData.agg_type = agg_type;
+               }
+       }
        add_delegation_token(&inputData);
        
        build_http_header(&inputData);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index 4b69bb7..c1aa724 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -66,8 +66,8 @@ typedef ExtProtocolData *ExtProtocol;
 #define EXTPROTOCOL_GET_USER_CTX(fcinfo)   (((ExtProtocolData*) 
fcinfo->context)->prot_user_ctx)
 #define EXTPROTOCOL_GET_SELECTDESC(fcinfo)   (((ExtProtocolData*) 
fcinfo->context)->desc)
 #define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) 
fcinfo->context)->desc->projInfo)
+#define EXTPROTOCOL_GET_AGG_TYPE(fcinfo) (((ExtProtocolData*) 
fcinfo->context)->desc->agg_type)
 #define EXTPROTOCOL_IS_LAST_CALL(fcinfo)   (((ExtProtocolData*) 
fcinfo->context)->prot_last_call)
-
 #define EXTPROTOCOL_SET_LAST_CALL(fcinfo)  (((ExtProtocolData*) 
fcinfo->context)->prot_last_call = true)
 #define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \
        (((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 1e926d5..713c185 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -70,6 +70,8 @@ typedef ExternalInsertDescData *ExternalInsertDesc;
 typedef struct ExternalSelectDescData
 {
        ProjectionInfo *projInfo;
+       // Information needed for aggregate pushdown
+       int  agg_type;
 } ExternalSelectDescData;
 
 typedef enum DataLineStatus
@@ -89,7 +91,7 @@ extern FileScanDesc external_beginscan(Relation relation, 
Index scanrelid,
 extern void external_rescan(FileScanDesc scan);
 extern void external_endscan(FileScanDesc scan);
 extern void external_stopscan(FileScanDesc scan);
-extern ExternalSelectDesc external_getnext_init(PlanState *state);
+extern ExternalSelectDesc external_getnext_init(PlanState *state, 
ExternalScanState *es_state);
 extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, 
ExternalSelectDesc desc);
 extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
 extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/access/pxfheaders.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h
index f4adc6c..1c92da8 100644
--- a/src/include/access/pxfheaders.h
+++ b/src/include/access/pxfheaders.h
@@ -46,6 +46,8 @@ typedef struct sPxfInputData
        PxfHdfsToken    token;
        ProjectionInfo  *proj_info;
        List                    *quals;
+       int                             agg_type;
+       int                             agg_groups;
 } PxfInputData;
 
 void build_http_header(PxfInputData *input);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/executor/executor.h
----------------------------------------------------------------------
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index a6ff148..ea1dad2 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -72,6 +72,8 @@ struct ChunkTransportState;             /* #include 
"cdb/cdbinterconnect.h" */
 #define EXEC_FLAG_BACKWARD             0x0004  /* need backward scan */
 #define EXEC_FLAG_MARK                 0x0008  /* need mark/restore */
 
+#define EXEC_FLAG_EXTERNAL_AGG_COUNT  0x0010   /* can support external agg */
+
 
 /*
  * ExecEvalExpr was formerly a function containing a switch statement;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b360e70b/src/include/nodes/execnodes.h
----------------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 7a2e733..c6719a7 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1846,6 +1846,7 @@ typedef struct ExternalScanState
        struct FileScanDescData *ess_ScanDesc;
        bool cdb_want_ctid;
        ItemPointerData cdb_fake_ctid;
+       int parent_agg_type;
 } ExternalScanState;
 
 /* ----------------

Reply via email to