This is an automated email from the ASF dual-hosted git repository. maxyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 60179b110bb8a46c7ae01e1b204c4159c47e8173 Author: Chris Hajas <[email protected]> AuthorDate: Mon Jun 27 14:57:33 2022 -0700 Support intermediate aggs in Orca plans (#13707) Orca in GPDB6 has support for intermediate aggs, which isn't used in postgres. This is useful when we have a DQA and a regular "ride-along" agg. However, we need to differentiate when we should run the combine/final/trans functions when this ride-along agg is present. This commit re-adds support for intermediate aggs. The logic here is the same as 6X, however, instead of explicitly using the aggstage, we use the aggsplit, which is determined from the aggstage. The logic is defined in `AGGSPLIT_INTERNMEDIATE`. The changes in nodeAgg.c are to allow the aggref and aggstate to differ for an aggregate. This is necessary and expected in the case of an intermediate agg, as the loop will iterate over each aggstate->aggs, but the aggsplit can now be different between the aggref and the aggstate. Thus the aggsplit references are also changed to use aggref instead of aggstate. --- src/backend/executor/nodeAgg.c | 14 ++-- .../gpopt/translate/CTranslatorDXLToScalar.cpp | 5 +- .../gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp | 2 +- .../gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp | 2 +- .../data/dxl/minidump/MDQA-SameDQAColumn.mdp | 2 +- .../gporca/libgpopt/src/xforms/CXformSplitDQA.cpp | 86 ++++++---------------- src/include/nodes/nodes.h | 2 + src/test/regress/expected/aggregates_optimizer.out | 2 - 8 files changed, 33 insertions(+), 82 deletions(-) diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index a19c442d9d..75f160856c 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -3884,8 +3884,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); - /* ... and the split mode should match */ - Assert(aggref->aggsplit == aggstate->aggsplit); peragg = &peraggs[aggref->aggno]; @@ -3917,7 +3915,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) Assert(OidIsValid(aggtranstype)); /* Final function only required if we're finalizing the aggregates */ - if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) + if (DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit)) peragg->finalfn_oid = finalfn_oid = InvalidOid; else peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; @@ -3936,10 +3934,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * every aggregate with an INTERNAL state has a serialization * function. Verify that. */ - if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit)) + if (DO_AGGSPLIT_SERIALIZE(aggref->aggsplit)) { /* serialization only valid when not running finalfn */ - Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); + Assert(DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit)); if (!OidIsValid(aggform->aggserialfn)) elog(ERROR, "serialfunc not provided for serialization aggregation"); @@ -3947,10 +3945,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* Likewise for deserialization functions */ - if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit)) + if (DO_AGGSPLIT_DESERIALIZE(aggref->aggsplit)) { /* deserialization only valid when combining states */ - Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit)); + Assert(DO_AGGSPLIT_COMBINE(aggref->aggsplit)); if (!OidIsValid(aggform->aggdeserialfn)) elog(ERROR, "deserialfunc not provided for deserialization aggregation"); @@ -4058,7 +4056,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * of using the transition function, we'll use the combine * function */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + if (DO_AGGSPLIT_COMBINE(aggref->aggsplit)) { transfn_oid = aggform->aggcombinefn; diff --git a/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp b/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp index c640913243..de79f591cf 100644 --- a/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp +++ b/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp @@ -581,10 +581,7 @@ CTranslatorDXLToScalar::TranslateDXLScalarAggrefToScalar( aggref->aggsplit = AGGSPLIT_INITIAL_SERIAL; break; case EdxlaggstageIntermediate: - GPOS_RAISE( - gpdxl::ExmaDXL, gpdxl::ExmiPlStmt2DXLConversion, - GPOS_WSZ_LIT( - "GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented")); + aggref->aggsplit = AGGSPLIT_INTERNMEDIATE; break; case EdxlaggstageFinal: aggref->aggsplit = AGGSPLIT_FINAL_DESERIAL; diff --git a/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp b/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp index c6f1155860..c1eaf49b79 100644 --- a/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp +++ b/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp @@ -275,7 +275,7 @@ </dxl:LogicalGet> </dxl:LogicalGroupBy> </dxl:Query> - <dxl:Plan Id="0" SpaceSize="30"> + <dxl:Plan Id="0" SpaceSize="42"> <dxl:GatherMotion InputSegments="0,1" OutputSegments="-1"> <dxl:Properties> <dxl:Cost StartupCost="0" TotalCost="577.928905" Rows="4.000000" Width="16"/> diff --git a/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp b/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp index 12327bbf39..b39c08475b 100644 --- a/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp +++ b/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp @@ -320,7 +320,7 @@ </dxl:LogicalGet> </dxl:LogicalGroupBy> </dxl:Query> - <dxl:Plan Id="0" SpaceSize="30"> + <dxl:Plan Id="0" SpaceSize="42"> <dxl:GatherMotion InputSegments="0,1" OutputSegments="-1"> <dxl:Properties> <dxl:Cost StartupCost="0" TotalCost="579.950342" Rows="4.000000" Width="24"/> diff --git a/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp b/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp index 39b76e7708..aa75d10265 100644 --- a/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp +++ b/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp @@ -352,7 +352,7 @@ </dxl:LogicalGroupBy> </dxl:LogicalLimit> </dxl:Query> - <dxl:Plan Id="0" SpaceSize="101"> + <dxl:Plan Id="0" SpaceSize="131"> <dxl:GatherMotion InputSegments="0,1" OutputSegments="-1"> <dxl:Properties> <dxl:Cost StartupCost="0" TotalCost="585.939124" Rows="5.000000" Width="28"/> diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp index 3cf2f8ccde..dbc325a33c 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp @@ -79,40 +79,6 @@ CXformSplitDQA::Exfp(CExpressionHandle &exprhdl) const return CXform::ExfpHigh; } -// Checks whether or not the project list contains at least one DQA and one -// non-DQA. -static bool -FContainsRideAlongAggregate(CExpression *pexprProjectList) -{ - bool hasDQA = false; - bool hasNonDQA = false; - - const ULONG size = pexprProjectList->PdrgPexpr()->Size(); - for (ULONG ul = 0; ul < size; ul++) - { - CExpression *pexpr = (*pexprProjectList->PdrgPexpr())[ul]; - - const ULONG sizeInner = pexpr->PdrgPexpr()->Size(); - CScalarAggFunc *paggfunc; - if (sizeInner != 1 || (paggfunc = CScalarAggFunc::PopConvert( - (*pexpr->PdrgPexpr())[0]->Pop())) == nullptr) - { - continue; - } - - if (paggfunc->IsDistinct()) - { - hasDQA = true; - } - else - { - hasNonDQA = true; - } - } - - return hasDQA && hasNonDQA; -} - //--------------------------------------------------------------------------- // @function: // CXformSplitDQA::Transform @@ -184,37 +150,27 @@ CXformSplitDQA::Transform(CXformContext *pxfctxt, CXformResult *pxfres, pxfres->Add(pexprThreeStageDQA); - // GPDB_96_MERGE_FIXME: Postgres 9.6 merge commit 38d881555207 replaced - // Greenplum multi-stage aggregate executor code with upstream. In the - // process, we lost the intermediate aggregate stage which is useful when - // we have a 'ride-along' aggregate. For example, - // - // SELECT SUM(a), COUNT(DISTINCT b) FROM foo; - // - // After we re-implement intermediate aggregate stage in executor we should - // be able to re-enable the following transform optimization. - if (!FContainsRideAlongAggregate(pexprProjectList)) - { - // generate two-stage agg - // this transform is useful for cases where distinct column is same as distributed column. - // for a query like "select count(distinct a) from bar;" - // we generate a two stage agg where the aggregate operator gives us the distinct values. - // CScalarProjectList for the Local agg below is empty on purpose. - - // +--CLogicalGbAgg( Global ) Grp Cols: [][Global] - // |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local], - // | |--CLogicalGet "bar" ("bar"), - // | +--CScalarProjectList - // +--CScalarProjectList - // +--CScalarProjectElement "count" (9) - // +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global) - // +--CScalarIdent "a" (0) - - CExpression *pexprTwoStageScalarDQA = PexprSplitHelper( - mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr, - pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA); - pxfres->Add(pexprTwoStageScalarDQA); - } + + // generate two-stage agg + // this transform is useful for cases where distinct column is same as distributed column. + // for a query like "select count(distinct a) from bar;" + // we generate a two stage agg where the aggregate operator gives us the distinct values. + // CScalarProjectList for the Local agg below is empty on purpose. + + // +--CLogicalGbAgg( Global ) Grp Cols: [][Global] + // |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local], + // | |--CLogicalGet "bar" ("bar"), + // | +--CScalarProjectList + // +--CScalarProjectList + // +--CScalarProjectElement "count" (9) + // +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global) + // +--CScalarIdent "a" (0) + + CExpression *pexprTwoStageScalarDQA = PexprSplitHelper( + mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr, + pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA); + pxfres->Add(pexprTwoStageScalarDQA); + // generate local DQA, global agg for both scalar and non-scalar agg cases. // for a query like "select count(distinct a) from bar;" diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 22b16cd2e5..a591ae5b1d 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -987,6 +987,8 @@ typedef enum AggSplit * stripped away from Aggs in setrefs.c. */ AGGSPLIT_DEDUPLICATED = AGGSPLITOP_DEDUPLICATED, + + AGGSPLIT_INTERNMEDIATE = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE | AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE, } AggSplit; /* Test whether an AggSplit value selects each primitive option: */ diff --git a/src/test/regress/expected/aggregates_optimizer.out b/src/test/regress/expected/aggregates_optimizer.out index 9328790ecb..c4d863adbb 100644 --- a/src/test/regress/expected/aggregates_optimizer.out +++ b/src/test/regress/expected/aggregates_optimizer.out @@ -579,8 +579,6 @@ group by ten order by ten; select ten, count(four), sum(DISTINCT four) from onek group by ten order by ten; -INFO: GPORCA failed to produce a plan, falling back to planner -DETAIL: GPDB Expression type: GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented not supported in DXL ten | count | sum -----+-------+----- 0 | 100 | 2 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
