Repository: incubator-drill Updated Branches: refs/heads/master cdd2ce905 -> 4ea36c3f1
Create 2 phase plan for qualified plain aggregates (no group-by). Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/62a8bf2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/62a8bf2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/62a8bf2f Branch: refs/heads/master Commit: 62a8bf2f582acb28d35b704a12e1b75ecbc8aa9f Parents: cdd2ce9 Author: Aman Sinha <[email protected]> Authored: Thu May 15 18:19:27 2014 -0700 Committer: Aman Sinha <[email protected]> Committed: Thu May 15 18:20:03 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/physical/StreamAggPrule.java | 36 ++++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62a8bf2f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java index bccdea5..ff648a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java @@ -65,10 +65,40 @@ public class StreamAggPrule extends AggPruleBase { try { if (aggregate.getGroupSet().isEmpty()) { DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON; - traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(singleDist); - createTransformRequest(call, aggregate, input, traits); - } else { + RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist); + + if (create2PhasePlan(call, aggregate)) { + traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ; + + RelNode convertedInput = convert(input, traits); + + if (convertedInput instanceof RelSubset) { + RelSubset subset = (RelSubset) convertedInput; + for (RelNode rel : subset.getRelList()) { + if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist); + RelNode newInput = convert(input, traits); + + StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, + aggregate.getGroupSet(), + aggregate.getAggCallList()); + + UnionExchangePrel exch = + new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg); + + StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch, + aggregate.getGroupSet(), + aggregate.getAggCallList()); + call.transformTo(phase2Agg); + } + } + } + } else { + createTransformRequest(call, aggregate, input, singleDistTrait); + } + } else { // hash distribute on all grouping keys DrillDistributionTrait distOnAllKeys = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
