DRILL-646: Broadcast-inner plans for Hash Join and Merge Join.

 - Consolidate planning methods in base class.
 - Add option to enable/disable broadcast join; it is disabled by default for now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1eaadea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1eaadea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1eaadea

Branch: refs/heads/master
Commit: f1eaadea87558bbb96bdc01fc69b78879fd65037
Parents: d414d5d
Author: Aman Sinha <[email protected]>
Authored: Sat May 10 18:05:14 2014 -0700
Committer: Aditya Kishore <[email protected]>
Committed: Mon May 12 21:33:36 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/HashJoinPrule.java    |  48 +++-----
 .../exec/planner/physical/JoinPruleBase.java    | 115 +++++++++++++++++++
 .../exec/planner/physical/MergeJoinPrule.java   |  37 +-----
 .../exec/planner/physical/PlannerSettings.java  |  12 +-
 .../server/options/SystemOptionManager.java     |   1 +
 5 files changed, 146 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 851877f..cbf1762 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -22,10 +22,13 @@ import java.util.logging.Logger;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -36,8 +39,7 @@ public class HashJoinPrule extends JoinPruleBase {
 
   private HashJoinPrule() {
     super(
-        RelOptHelper.some(DrillJoinRel.class, RelOptHelper.any(RelNode.class), 
RelOptHelper.any(RelNode.class)),
-        "Prel.HashJoinPrule");
+        RelOptHelper.any(DrillJoinRel.class), "Prel.HashJoinPrule");
   }
 
   @Override
@@ -48,50 +50,26 @@ public class HashJoinPrule extends JoinPruleBase {
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillJoinRel join = (DrillJoinRel) call.rel(0);
-    final RelNode left = call.rel(1);
-    final RelNode right = call.rel(2);
+    final RelNode left = join.getLeft();
+    final RelNode right = join.getRight();
     
     if (!checkPreconditions(join, left, right)) {
       return;
     }
     
     try {
-      // Create transform request for HashJoin plan with both children HASH 
distributed
-      DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
-      DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
 
-      RelTraitSet traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashLeftPartition);   
-      RelTraitSet traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashRightPartition);
-
-      createTransformRequest(call, join, left, right, traitsLeft, traitsRight);
-
-      // Create transform request for HashJoin plan with left child ANY 
distributed and right child BROADCAST distributed
-      /// TODO: ANY distribution seems to create some problems..need to revisit
-      // DrillDistributionTrait distAnyLeft = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.ANY);
-      DrillDistributionTrait distBroadcastRight = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
-      // traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distAnyLeft);
-      traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
-
-      //temporarily not generate this plan
-      //createTransformRequest(call, join, left, right, traitsLeft, 
traitsRight);
+      createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, 
null /* left collation */, null /* right collation */);
+      
+      if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+        createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, 
right, null /* left collation */, null /* right collation */);
 
+        // createBroadcastPlan1(call, join, PhysicalJoinType.HASH_JOIN, left, 
right, null, null);
+      }
+      
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());
     }
   }
 
-  private void createTransformRequest(RelOptRuleCall call, DrillJoinRel join, 
-                                      RelNode left, RelNode right, 
-                                      RelTraitSet traitsLeft, RelTraitSet 
traitsRight)
-    throws InvalidRelException { 
-
-    final RelNode convertedLeft = convert(left, traitsLeft);
-    final RelNode convertedRight = convert(right, traitsRight);
-      
-    HashJoinPrel newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, 
-                                              convertedLeft, convertedRight, 
join.getCondition(),
-                                              join.getJoinType());
-    call.transformTo(newJoin) ;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 883aac5..1b30d68 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -20,19 +20,30 @@ package org.apache.drill.exec.planner.physical;
 
 import java.util.List;
 
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelOptRuleOperand;
 import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.rex.RexNode;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 // abstract base class for the join physical rules  
 public abstract class JoinPruleBase extends RelOptRule {
 
+  protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN};
+  
   protected JoinPruleBase(RelOptRuleOperand operand, String description) {
     super(operand, description);   
   }
@@ -63,4 +74,108 @@ public abstract class JoinPruleBase extends RelOptRule {
     return distFields;
   }  
   
+  protected boolean checkBroadcastConditions(RelOptPlanner planner, 
DrillJoinRel join, RelNode left, RelNode right) {
+    if (! PrelUtil.getPlannerSettings(planner).isBroadcastJoinEnabled()) {
+      return false;
+    }
+
+    double estimatedRightRowCount = RelMetadataQuery.getRowCount(right);
+    if (estimatedRightRowCount < 
PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold() 
+        && ! 
left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON)
+        ) {
+      return true;
+    }
+    return false;
+  }
+  
+  // Create join plan with both left and right children hash distributed. If 
the physical join type 
+  // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain 
+  // sort converter if necessary to provide the collation. 
+  protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+      PhysicalJoinType physicalJoinType, 
+      RelNode left, RelNode right, 
+      RelCollation collationLeft, RelCollation collationRight) throws 
InvalidRelException {
+ 
+    DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
+    DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
+    RelTraitSet traitsLeft = null;
+    RelTraitSet traitsRight = null;
+    
+    if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { 
+      assert collationLeft != null && collationRight != null;
+      traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(hashLeftPartition);
+      traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(hashRightPartition);
+    } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+      traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashLeftPartition);
+      traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashRightPartition);
+    }
+
+    final RelNode convertedLeft = convert(left, traitsLeft);
+    final RelNode convertedRight = convert(right, traitsRight);
+    
+    DrillJoinRelBase newJoin = null;
+    
+    if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { 
+      newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, 
+                                 convertedLeft, convertedRight, 
join.getCondition(),
+                                 join.getJoinType());
+      
+    } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { 
+      newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, 
+                                  convertedLeft, convertedRight, 
join.getCondition(),
+                                  join.getJoinType());
+    }
+    call.transformTo(newJoin);    
+  }
+  
+  // Create join plan with left child ANY distributed and right child 
BROADCAST distributed. If the physical join type 
+  // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain sort converter
+  // if necessary to provide the collation. 
+  protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
+      PhysicalJoinType physicalJoinType, 
+      RelNode left, RelNode right, 
+      RelCollation collationLeft, RelCollation collationRight) throws 
InvalidRelException {
+
+    DrillDistributionTrait distBroadcastRight = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
+    RelTraitSet traitsRight = null;
+    if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+      assert collationLeft != null && collationRight != null;
+      traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight);
+    } else {
+      traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
+    }
+    
+    RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    RelNode convertedLeft = convert(left, traitsLeft);  
+    RelNode convertedRight = convert(right, traitsRight);
+
+    traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+
+    DrillJoinRelBase newJoin = null;
+    
+    if (convertedLeft instanceof RelSubset) {
+      RelSubset subset = (RelSubset) convertedLeft;
+      for (RelNode rel : subset.getRelList()) {
+        if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
+          DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+          if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+            traitsLeft = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist);
+          } else {
+            traitsLeft = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+          }
+          
+          RelNode newLeft = convert(left, traitsLeft);
+          if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+            newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, 
convertedRight, join.getCondition(),
+                                       join.getJoinType());
+          } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+            newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, 
newLeft, convertedRight, join.getCondition(),
+                                        join.getJoinType());
+          }
+          call.transformTo(newJoin) ;
+        }
+      }
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index 30f651c..a5be5f8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -27,9 +27,11 @@ import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelCollationImpl;
 import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -64,43 +66,16 @@ public class MergeJoinPrule extends JoinPruleBase {
       RelCollation collationLeft = getCollation(join.getLeftKeys());
       RelCollation collationRight = getCollation(join.getRightKeys());
 
-      // Create transform request for MergeJoin plan with both children HASH 
distributed
-      DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
-      DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
+      createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, 
collationLeft, collationRight);
 
-      RelTraitSet traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(hashLeftPartition);
   
-      RelTraitSet traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(hashRightPartition);
-
-      createTransformRequest(call, join, left, right, traitsLeft, traitsRight);
-
-      // Create transform request for MergeJoin plan with left child ANY 
distributed and right child BROADCAST distributed
-      /// TODO: ANY distribution seems to create some problems..need to revisit
-      // DrillDistributionTrait distAnyLeft = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.ANY);
-      DrillDistributionTrait distBroadcastRight = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
-      // traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(distAnyLeft);
-      traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight);
-
-      //temporarily not generate this plan
-      //createTransformRequest(call, join, left, right, traitsLeft, 
traitsRight);
+      if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+        createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, 
right, collationLeft, collationRight);
+      }
 
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());
     }
   }
-
-  private void createTransformRequest(RelOptRuleCall call, DrillJoinRel join, 
-                                      RelNode left, RelNode right, 
-                                      RelTraitSet traitsLeft, RelTraitSet 
traitsRight)
-    throws InvalidRelException { 
-
-    final RelNode convertedLeft = convert(left, traitsLeft);
-    final RelNode convertedRight = convert(right, traitsRight);
-      
-    MergeJoinPrel newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, 
-                                              convertedLeft, convertedRight, 
join.getCondition(),
-                                              join.getJoinType());
-    call.transformTo(newJoin) ;
-  }
   
   private RelCollation getCollation(List<Integer> keys){    
     List<RelFieldCollation> fields = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 5bead30..e65ef17 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -28,6 +28,7 @@ public class PlannerSettings implements FrameworkContext{
 
   private int numEndPoints = 0;
   private boolean useDefaultCosting = false; // True: use default Optiq 
costing, False: use Drill costing
+  private int broadcastThreshold = 10000; // Consider broadcast inner plans if 
estimated rows is less than this threshold
 
   public static final OptionValidator EXCHANGE = new 
BooleanValidator("planner.disable_exchanges", false);
   public static final OptionValidator HASHAGG = new 
BooleanValidator("planner.enable_hashagg", true);
@@ -35,7 +36,8 @@ public class PlannerSettings implements FrameworkContext{
   public static final OptionValidator HASHJOIN = new 
BooleanValidator("planner.enable_hashjoin", true);  
   public static final OptionValidator MERGEJOIN = new 
BooleanValidator("planner.enable_mergejoin", true);  
   public static final OptionValidator MULTIPHASE = new 
BooleanValidator("planner.enable_multiphase_agg", false);  
-  
+  public static final OptionValidator BROADCAST = new 
BooleanValidator("planner.enable_broadcast_join", false);
+
   public OptionManager options = null;
 
   public PlannerSettings(OptionManager options){
@@ -82,6 +84,14 @@ public class PlannerSettings implements FrameworkContext{
     return options.getOption(MULTIPHASE.getOptionName()).bool_val;
   }
   
+  public boolean isBroadcastJoinEnabled() {
+    return options.getOption(BROADCAST.getOptionName()).bool_val;  
+  }
+
+  public int getBroadcastThreshold() {
+    return broadcastThreshold;  
+  }
+  
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 21031e5..8d9a68f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -40,6 +40,7 @@ public class SystemOptionManager implements OptionManager{
       PlannerSettings.HASHJOIN,
       PlannerSettings.MERGEJOIN, 
       PlannerSettings.MULTIPHASE,
+      PlannerSettings.BROADCAST,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR
   };

Reply via email to