Repository: incubator-drill
Updated Branches:
  refs/heads/master cdd2ce905 -> 4ea36c3f1


Create a two-phase plan for qualifying plain aggregates (aggregates with no GROUP BY).


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/62a8bf2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/62a8bf2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/62a8bf2f

Branch: refs/heads/master
Commit: 62a8bf2f582acb28d35b704a12e1b75ecbc8aa9f
Parents: cdd2ce9
Author: Aman Sinha <[email protected]>
Authored: Thu May 15 18:19:27 2014 -0700
Committer: Aman Sinha <[email protected]>
Committed: Thu May 15 18:20:03 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/StreamAggPrule.java   | 36 ++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62a8bf2f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index bccdea5..ff648a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -65,10 +65,40 @@ public class StreamAggPrule extends AggPruleBase {
     try {
       if (aggregate.getGroupSet().isEmpty()) {
         DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
-        traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(singleDist);
-        createTransformRequest(call, aggregate, input, traits);
-      } else {
+        RelTraitSet singleDistTrait = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+        
+        if (create2PhasePlan(call, aggregate)) {
+          traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) 
;
+
+          RelNode convertedInput = convert(input, traits);  
+
+          if (convertedInput instanceof RelSubset) {
+            RelSubset subset = (RelSubset) convertedInput;
+            for (RelNode rel : subset.getRelList()) {
+              if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
+                DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);              
+                traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+                RelNode newInput = convert(input, traits);
+
+                StreamAggPrel phase1Agg = new 
StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                UnionExchangePrel exch = 
+                    new UnionExchangePrel(phase1Agg.getCluster(), 
singleDistTrait, phase1Agg);
+        
+                StreamAggPrel phase2Agg =  new 
StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
 
+                call.transformTo(phase2Agg);  
+              }
+            }
+          }
+        } else {        
+          createTransformRequest(call, aggregate, input, singleDistTrait);
+        }
+      } else {
         // hash distribute on all grouping keys
         DrillDistributionTrait distOnAllKeys = 
             new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 

Reply via email to