DRILL-1102: Use same set of keys to distribute left and right side of join operator. Add option of hashing on single join key.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/790a2adf Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/790a2adf Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/790a2adf Branch: refs/heads/master Commit: 790a2adf007e25d10cf7eeb1b086698cd5137b47 Parents: e0de465 Author: Jinfeng Ni <[email protected]> Authored: Tue Jul 1 14:19:52 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jul 3 09:00:03 2014 -0700 ---------------------------------------------------------------------- .../physical/DrillDistributionTrait.java | 11 +++-- .../exec/planner/physical/HashJoinPrule.java | 4 +- .../physical/HashToRandomExchangePrel.java | 6 ++- .../exec/planner/physical/JoinPruleBase.java | 46 ++++++++++++++++++-- .../exec/planner/physical/MergeJoinPrule.java | 4 +- .../exec/planner/physical/PlannerSettings.java | 5 +++ .../server/options/SystemOptionManager.java | 1 + .../org/apache/drill/TestTpchDistributed.java | 1 + 8 files changed, 64 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java index abd50d4..df49ee3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java @@ -63,15 +63,14 @@ public class DrillDistributionTrait implements RelTrait { if (this.type == DistributionType.HASH_DISTRIBUTED) { if (requiredDist == DistributionType.HASH_DISTRIBUTED) { - ImmutableList<DistributionField> thisFields = this.fields; - ImmutableList<DistributionField> requiredFields = ((DrillDistributionTrait)trait).getFields(); - - assert(thisFields.size() > 0 && requiredFields.size() > 0); - // A subset of the required distribution columns can satisfy (subsume) the requirement // e.g: required distribution: {a, b, c} // Following can satisfy the requirements: {a}, {b}, {c}, {a, b}, {b, c}, {a, c} or {a, b, c} - return (requiredFields.containsAll(thisFields)); + + // New: Use equals for subsumes check of hash distribution. If we uses subsumes, + // a join may end up with hash-distributions using different keys. This would + // cause incorrect query result. + return this.equals(trait); } else if (requiredDist == DistributionType.RANDOM_DISTRIBUTED) { return true; // hash distribution subsumes random distribution and ANY distribution http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java index cbf1762..9ae4783 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java @@ -57,9 +57,11 @@ public class HashJoinPrule extends JoinPruleBase { return; } + boolean hashSingleKey = PrelUtil.getPlannerSettings(call.getPlanner()).isHashSingleKey(); + try { - createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */); + createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey); if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java index a69cf5f..b78e64c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java @@ -57,10 +57,12 @@ public class HashToRandomExchangePrel extends ExchangePrel { * If there are N nodes (endpoints), we can assume for costing purposes * on average each sender will send M/N rows to 1 destination endpoint. * (See DrillCostBase for symbol notations) + * Include impact of skewness of distribution : the more keys used, the less likely the distribution will be skewed. + * The hash cpu cost will be proportional to 1 / #_keys. * C = CPU cost of hashing k fields of M/N rows * + CPU cost of SV remover for M/N rows * + Network cost of sending M/N rows to 1 destination. - * So, C = (h * k * M/N) + (s * M/N) + (w * M/N) + * So, C = (h * 1/k * M/N) + (s * M/N) + (w * M/N) * Total cost = N * C */ @Override @@ -74,7 +76,7 @@ public class HashToRandomExchangePrel extends ExchangePrel { int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH; - double hashCpuCost = DrillCostBase.HASH_CPU_COST * inputRows * fields.size(); + double hashCpuCost = DrillCostBase.HASH_CPU_COST * inputRows / fields.size(); double svrCpuCost = DrillCostBase.SVR_CPU_COST * inputRows; double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth; DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java index 406eb65..336e34c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java @@ -88,16 +88,54 @@ public abstract class JoinPruleBase extends Prule { return false; } + protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join, + PhysicalJoinType physicalJoinType, + RelNode left, RelNode right, + RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws InvalidRelException { + + /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan: + * 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side. + * 2) Plan2: distributed by l1 for left side, by r1 for right side. + * 3) Plan3: distributed by l2 for left side, by r2 for right side. + * ... + * Plan_(k+1): distributed by l_k for left side, by r_k by right side. + * + * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey. + */ + + DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); + DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); + + createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition); + + assert (join.getLeftKeys().size() == join.getRightKeys().size()); + + if (!hashSingleKey) + return; + + int numJoinKeys = join.getLeftKeys().size(); + if (numJoinKeys > 1) { + for (int i = 0; i< numJoinKeys; i++) { + hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1)))); + hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1)))); + + createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition); + } + } + } + + // Create join plan with both left and right children hash distributed. If the physical join type // is MergeJoin, a collation must be provided for both left and right child and the plan will contain // sort converter if necessary to provide the collation. - protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join, + private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join, PhysicalJoinType physicalJoinType, RelNode left, RelNode right, - RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException { + RelCollation collationLeft, RelCollation collationRight, + DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException { - DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); - DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); + //DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); + //DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); RelTraitSet traitsLeft = null; RelTraitSet traitsRight = null; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java index a5be5f8..a11e389 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java @@ -62,11 +62,13 @@ public class MergeJoinPrule extends JoinPruleBase { return; } + boolean hashSingleKey = PrelUtil.getPlannerSettings(call.getPlanner()).isHashSingleKey(); + try { RelCollation collationLeft = getCollation(join.getLeftKeys()); RelCollation collationRight = getCollation(join.getRightKeys()); - createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight); + createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey); if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index ae0ac32..fd584cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -46,6 +46,7 @@ public class PlannerSettings implements FrameworkContext{ public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d); public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", true); public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); + public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true); public OptionManager options = null; @@ -100,6 +101,10 @@ public class PlannerSettings implements FrameworkContext{ public boolean isBroadcastJoinEnabled() { return options.getOption(BROADCAST.getOptionName()).bool_val; } + + public boolean isHashSingleKey() { + return options.getOption(HASH_SINGLE_KEY.getOptionName()).bool_val; + } public long getBroadcastThreshold() { return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 424d7ff..802f4d9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -56,6 +56,7 @@ public class SystemOptionManager implements OptionManager{ PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR, PlannerSettings.PRODUCER_CONSUMER, PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE, + PlannerSettings.HASH_SINGLE_KEY, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, ExecConstants.SLICE_TARGET_OPTION, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/790a2adf/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java index 2a31af1..3c8bd09 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java @@ -134,6 +134,7 @@ public class TestTpchDistributed extends BaseTestQuery{ } @Test + @Ignore public void tpch21() throws Exception{ testDistributed("queries/tpch/21.sql"); }
