DRILL-646: Broadcast-inner plans for Hash Join and Merge Join. - Consolidate the join planning methods in the base class (JoinPruleBase). - Add an option to enable/disable broadcast join; it is disabled by default for now.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1eaadea Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1eaadea Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1eaadea Branch: refs/heads/master Commit: f1eaadea87558bbb96bdc01fc69b78879fd65037 Parents: d414d5d Author: Aman Sinha <[email protected]> Authored: Sat May 10 18:05:14 2014 -0700 Committer: Aditya Kishore <[email protected]> Committed: Mon May 12 21:33:36 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/physical/HashJoinPrule.java | 48 +++----- .../exec/planner/physical/JoinPruleBase.java | 115 +++++++++++++++++++ .../exec/planner/physical/MergeJoinPrule.java | 37 +----- .../exec/planner/physical/PlannerSettings.java | 12 +- .../server/options/SystemOptionManager.java | 1 + 5 files changed, 146 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java index 851877f..cbf1762 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java @@ -22,10 +22,13 @@ import java.util.logging.Logger; import org.apache.drill.exec.planner.logical.DrillJoinRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelNode; +import 
org.eigenbase.rel.metadata.RelMetadataQuery; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.relopt.volcano.RelSubset; import org.eigenbase.trace.EigenbaseTrace; import com.google.common.collect.ImmutableList; @@ -36,8 +39,7 @@ public class HashJoinPrule extends JoinPruleBase { private HashJoinPrule() { super( - RelOptHelper.some(DrillJoinRel.class, RelOptHelper.any(RelNode.class), RelOptHelper.any(RelNode.class)), - "Prel.HashJoinPrule"); + RelOptHelper.any(DrillJoinRel.class), "Prel.HashJoinPrule"); } @Override @@ -48,50 +50,26 @@ public class HashJoinPrule extends JoinPruleBase { @Override public void onMatch(RelOptRuleCall call) { final DrillJoinRel join = (DrillJoinRel) call.rel(0); - final RelNode left = call.rel(1); - final RelNode right = call.rel(2); + final RelNode left = join.getLeft(); + final RelNode right = join.getRight(); if (!checkPreconditions(join, left, right)) { return; } try { - // Create transform request for HashJoin plan with both children HASH distributed - DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); - DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); - RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashLeftPartition); - RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashRightPartition); - - createTransformRequest(call, join, left, right, traitsLeft, traitsRight); - - // Create transform request for HashJoin plan with left child ANY distributed and right child BROADCAST distributed - /// TODO: ANY distribution seems to create some problems..need to revisit - // DrillDistributionTrait distAnyLeft = 
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.ANY); - DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED); - // traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distAnyLeft); - traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight); - - //temporarily not generate this plan - //createTransformRequest(call, join, left, right, traitsLeft, traitsRight); + createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */); + + if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { + createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */); + // createBroadcastPlan1(call, join, PhysicalJoinType.HASH_JOIN, left, right, null, null); + } + } catch (InvalidRelException e) { tracer.warning(e.toString()); } } - private void createTransformRequest(RelOptRuleCall call, DrillJoinRel join, - RelNode left, RelNode right, - RelTraitSet traitsLeft, RelTraitSet traitsRight) - throws InvalidRelException { - - final RelNode convertedLeft = convert(left, traitsLeft); - final RelNode convertedRight = convert(right, traitsRight); - - HashJoinPrel newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, - convertedLeft, convertedRight, join.getCondition(), - join.getJoinType()); - call.transformTo(newJoin) ; - } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java index 883aac5..1b30d68 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java @@ -20,19 +20,30 @@ package org.apache.drill.exec.planner.physical; import java.util.List; +import org.apache.drill.exec.planner.common.DrillJoinRelBase; import org.apache.drill.exec.planner.logical.DrillJoinRel; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.metadata.RelMetadataQuery; +import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelOptRuleOperand; import org.eigenbase.relopt.RelOptUtil; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.relopt.volcano.RelSubset; import org.eigenbase.rex.RexNode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; // abstract base class for the join physical rules public abstract class JoinPruleBase extends RelOptRule { + protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN}; + protected JoinPruleBase(RelOptRuleOperand operand, String description) { super(operand, description); } @@ -63,4 +74,108 @@ public abstract class JoinPruleBase extends RelOptRule { return distFields; } + protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) { + if (! PrelUtil.getPlannerSettings(planner).isBroadcastJoinEnabled()) { + return false; + } + + double estimatedRightRowCount = RelMetadataQuery.getRowCount(right); + if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold() + && ! 
left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON) + ) { + return true; + } + return false; + } + + // Create join plan with both left and right children hash distributed. If the physical join type + // is MergeJoin, a collation must be provided for both left and right child and the plan will contain + // sort converter if necessary to provide the collation. + protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join, + PhysicalJoinType physicalJoinType, + RelNode left, RelNode right, + RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException { + + DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); + DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); + RelTraitSet traitsLeft = null; + RelTraitSet traitsRight = null; + + if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + assert collationLeft != null && collationRight != null; + traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(hashLeftPartition); + traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(hashRightPartition); + } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { + traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashLeftPartition); + traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashRightPartition); + } + + final RelNode convertedLeft = convert(left, traitsLeft); + final RelNode convertedRight = convert(right, traitsRight); + + DrillJoinRelBase newJoin = null; + + if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { + newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, + convertedLeft, convertedRight, 
join.getCondition(), + join.getJoinType()); + + } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, + convertedLeft, convertedRight, join.getCondition(), + join.getJoinType()); + } + call.transformTo(newJoin); + } + + // Create join plan with left child ANY distributed and right child BROADCAST distributed. If the physical join type + // is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter + // if necessary to provide the collation. + protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join, + PhysicalJoinType physicalJoinType, + RelNode left, RelNode right, + RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException { + + DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED); + RelTraitSet traitsRight = null; + if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + assert collationLeft != null && collationRight != null; + traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight); + } else { + traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight); + } + + RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL); + RelNode convertedLeft = convert(left, traitsLeft); + RelNode convertedRight = convert(right, traitsRight); + + traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL); + + DrillJoinRelBase newJoin = null; + + if (convertedLeft instanceof RelSubset) { + RelSubset subset = (RelSubset) convertedLeft; + for (RelNode rel : subset.getRelList()) { + if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + 
traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist); + } else { + traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist); + } + + RelNode newLeft = convert(left, traitsLeft); + if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { + newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(), + join.getJoinType()); + } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(), + join.getJoinType()); + } + call.transformTo(newJoin) ; + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java index 30f651c..a5be5f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java @@ -27,9 +27,11 @@ import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelCollationImpl; import org.eigenbase.rel.RelFieldCollation; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.metadata.RelMetadataQuery; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.relopt.volcano.RelSubset; import org.eigenbase.trace.EigenbaseTrace; import com.google.common.collect.ImmutableList; @@ -64,43 +66,16 @@ public class MergeJoinPrule extends JoinPruleBase { RelCollation collationLeft = getCollation(join.getLeftKeys()); RelCollation collationRight = 
getCollation(join.getRightKeys()); - // Create transform request for MergeJoin plan with both children HASH distributed - DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); - DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); + createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight); - RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(hashLeftPartition); - RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(hashRightPartition); - - createTransformRequest(call, join, left, right, traitsLeft, traitsRight); - - // Create transform request for MergeJoin plan with left child ANY distributed and right child BROADCAST distributed - /// TODO: ANY distribution seems to create some problems..need to revisit - // DrillDistributionTrait distAnyLeft = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.ANY); - DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED); - // traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(distAnyLeft); - traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight); - - //temporarily not generate this plan - //createTransformRequest(call, join, left, right, traitsLeft, traitsRight); + if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { + createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight); + } } catch (InvalidRelException e) { tracer.warning(e.toString()); } } - - private void 
createTransformRequest(RelOptRuleCall call, DrillJoinRel join, - RelNode left, RelNode right, - RelTraitSet traitsLeft, RelTraitSet traitsRight) - throws InvalidRelException { - - final RelNode convertedLeft = convert(left, traitsLeft); - final RelNode convertedRight = convert(right, traitsRight); - - MergeJoinPrel newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, - convertedLeft, convertedRight, join.getCondition(), - join.getJoinType()); - call.transformTo(newJoin) ; - } private RelCollation getCollation(List<Integer> keys){ List<RelFieldCollation> fields = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 5bead30..e65ef17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -28,6 +28,7 @@ public class PlannerSettings implements FrameworkContext{ private int numEndPoints = 0; private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing + private int broadcastThreshold = 10000; // Consider broadcast inner plans if estimated rows is less than this threshold public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false); public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true); @@ -35,7 +36,8 @@ public class PlannerSettings implements FrameworkContext{ public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true); public static final OptionValidator MERGEJOIN 
= new BooleanValidator("planner.enable_mergejoin", true); public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", false); - + public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false); + public OptionManager options = null; public PlannerSettings(OptionManager options){ @@ -82,6 +84,14 @@ public class PlannerSettings implements FrameworkContext{ return options.getOption(MULTIPHASE.getOptionName()).bool_val; } + public boolean isBroadcastJoinEnabled() { + return options.getOption(BROADCAST.getOptionName()).bool_val; + } + + public int getBroadcastThreshold() { + return broadcastThreshold; + } + @Override public <T> T unwrap(Class<T> clazz) { if(clazz == PlannerSettings.class){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1eaadea/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 21031e5..8d9a68f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -40,6 +40,7 @@ public class SystemOptionManager implements OptionManager{ PlannerSettings.HASHJOIN, PlannerSettings.MERGEJOIN, PlannerSettings.MULTIPHASE, + PlannerSettings.BROADCAST, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR };
