Repository: hive Updated Branches: refs/heads/master 2b9f2f5e2 -> 6a2993912
HIVE-10559: IndexOutOfBoundsException with RemoveDynamicPruningBySize (Wei Zhang via Gunther Hagleitner) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6a299391 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6a299391 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6a299391 Branch: refs/heads/master Commit: 6a2993912730702edb68dd37acee7ec0af4e6d19 Parents: 2b9f2f5 Author: Gunther Hagleitner <gunt...@apache.org> Authored: Thu May 14 15:48:34 2015 -0700 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Thu May 14 15:48:34 2015 -0700 ---------------------------------------------------------------------- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 25 ++-- .../optimizer/RemoveDynamicPruningBySize.java | 19 ++- .../hadoop/hive/ql/parse/GenTezUtils.java | 16 +++ .../hive/ql/parse/OptimizeTezProcContext.java | 14 +++ .../hadoop/hive/ql/parse/TezCompiler.java | 19 ++- .../dynamic_partition_pruning_2.q | 17 +++ .../tez/dynamic_partition_pruning_2.q.out | 116 +++++++++++++++++++ 7 files changed, 196 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index d42b643..bcffdbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -636,13 +636,26 @@ public class ConvertJoinMapJoin implements NodeProcessor { // we might have generated a dynamic partition operator chain. Since // we're removing the reduce sink we need to remove that too. 
Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>(); + Map<Operator<?>, AppMasterEventOperator> opEventPairs = new HashMap<>(); for (Operator<?> c : p.getChildOperators()) { - if (hasDynamicPartitionBroadcast(c)) { + AppMasterEventOperator event = findDynamicPartitionBroadcast(c); + if (event != null) { dynamicPartitionOperators.add(c); + opEventPairs.put(c, event); } } for (Operator<?> c : dynamicPartitionOperators) { - p.removeChild(c); + if (context.pruningOpsRemovedByPriorOpt.isEmpty() || + !context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) { + p.removeChild(c); + // at this point we've found the fork in the op pipeline that has the pruning as a child plan. + LOG.info("Disabling dynamic pruning for: " + + ((DynamicPruningEventDesc) opEventPairs.get(c).getConf()).getTableScan().getName() + + ". Need to be removed together with reduce sink"); + } + } + for (Operator<?> op : dynamicPartitionOperators) { + context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op)); } } mapJoinOp.getParentOperators().remove(bigTablePosition); @@ -662,15 +675,13 @@ public class ConvertJoinMapJoin implements NodeProcessor { return mapJoinOp; } - private boolean hasDynamicPartitionBroadcast(Operator<?> parent) { - boolean hasDynamicPartitionPruning = false; + private AppMasterEventOperator findDynamicPartitionBroadcast(Operator<?> parent) { for (Operator<?> op : parent.getChildOperators()) { while (op != null) { if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { // found dynamic partition pruning operator - hasDynamicPartitionPruning = true; - break; + return (AppMasterEventOperator)op; } if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) { // crossing reduce sink or file sink means the pruning isn't for this parent. 
@@ -686,6 +697,6 @@ public class ConvertJoinMapJoin implements NodeProcessor { } } - return hasDynamicPartitionPruning; + return null; } } http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java index 4803959..5d01311 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java @@ -24,10 +24,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; @@ -52,20 +52,15 @@ public class RemoveDynamicPruningBySize implements NodeProcessor { AppMasterEventDesc desc = event.getConf(); if (desc.getStatistics().getDataSize() > context.conf - .getLongVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) { - Operator<?> child = event; - Operator<?> curr = event; - - while (curr.getChildOperators().size() <= 1) { - child = curr; - curr = curr.getParentOperators().get(0); - } - // at this point we've found the fork in the op pipeline that has the - // pruning as a child plan. 
+ .getLongVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE) && + (context.pruningOpsRemovedByPriorOpt.isEmpty() || + !context.pruningOpsRemovedByPriorOpt.contains(event))) { + context.pruningOpsRemovedByPriorOpt.add(event); + GenTezUtils.getUtils().removeBranch(event); + // at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " + ((DynamicPruningEventDesc) desc).getTableScan().getName() + ". Expected data size is too big: " + desc.getStatistics().getDataSize()); - curr.removeChild(child); } return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 241e9d7..0edfc5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -461,4 +461,20 @@ public class GenTezUtils { findRoots(p, ops); } } + + /** + * Remove an operator branch. When we see a fork, we know it's time to do the removal. 
+ * @param event the leaf node of the branch to be removed + */ + public void removeBranch(AppMasterEventOperator event) { + Operator<?> child = event; + Operator<?> curr = event; + + while (curr.getChildOperators().size() <= 1) { + child = curr; + curr = curr.getParentOperators().get(0); + } + + curr.removeChild(child); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java index ee71971..e58eb66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java @@ -49,6 +49,19 @@ public class OptimizeTezProcContext implements NodeProcessorCtx{ public final Set<ReadEntity> inputs; public final Set<WriteEntity> outputs; + /* Two of the optimization rules, ConvertJoinMapJoin and RemoveDynamicPruningBySize, are put into + stats-dependent optimizations and run together in TezCompiler. There's no guarantee which one + runs first, but in either case, the prior one may have removed a chain which the latter one is + not aware of. So we need to remember the leaf node(s) of that chain so it can be skipped. + + For example, as ConvertJoinMapJoin is removing the reduce sink, it may also have removed a + dynamic partition pruning operator chain. However, RemoveDynamicPruningBySize doesn't know this + and still tries to traverse that removed chain which will cause an NPE. + + This may also happen when RemoveDynamicPruningBySize happens first. 
+ */ + public HashSet<AppMasterEventOperator> pruningOpsRemovedByPriorOpt; + public final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>(); @@ -66,6 +79,7 @@ public class OptimizeTezProcContext implements NodeProcessorCtx{ this.parseContext = parseContext; this.inputs = inputs; this.outputs = outputs; + this.pruningOpsRemovedByPriorOpt = new HashSet<AppMasterEventOperator>(); } public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) { http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index ea12990..56707af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -149,7 +149,7 @@ public class TezCompiler extends TaskCompiler { if (component.size() != 1) { LOG.info("Found cycle in operator plan..."); cycleFree = false; - removeEventOperator(component); + removeEventOperator(component, procCtx); break; } } @@ -157,7 +157,7 @@ public class TezCompiler extends TaskCompiler { } } - private void removeEventOperator(Set<Operator<?>> component) { + private void removeEventOperator(Set<Operator<?>> component, OptimizeTezProcContext context) { AppMasterEventOperator victim = null; for (Operator<?> o : component) { if (o instanceof AppMasterEventOperator) { @@ -169,20 +169,17 @@ public class TezCompiler extends TaskCompiler { } } - Operator<?> child = victim; - Operator<?> curr = victim; - - while (curr.getChildOperators().size() <= 1) { - child = curr; - curr = curr.getParentOperators().get(0); + if (victim == null || + (!context.pruningOpsRemovedByPriorOpt.isEmpty() && + context.pruningOpsRemovedByPriorOpt.contains(victim))) { + return; } - // at this point we've found 
the fork in the op pipeline that has the - // pruning as a child plan. + GenTezUtils.getUtils().removeBranch(victim); + // at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString() + ". Needed to break cyclic dependency"); - curr.removeChild(child); } // Tarjan's algo http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q index 5a7f113..a4e84b1 100644 --- a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q +++ b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q @@ -118,3 +118,20 @@ SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar'; SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'foo' UNION ALL SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar'; + +set hive.tez.dynamic.partition.pruning.max.event.size=1000000; +set hive.tez.dynamic.partition.pruning.max.data.size=10000; +-- Dynamic partition pruning will be removed as data size exceeds the limit; +-- and for self join on partitioning column, it should not fail (HIVE-10559). 
+explain +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +; + +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out b/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out index 8c8531c..8b0b81d 100644 --- a/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out +++ b/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out @@ -963,3 +963,119 @@ POSTHOOK: Input: default@dim_shops 4 5 6 +PREHOOK: query: -- Dynamic partition pruning will be removed as data size exceeds the limit; +-- and for self join on partitioning column, it should not fail (HIVE-10559). +explain +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +PREHOOK: type: QUERY +POSTHOOK: query: -- Dynamic partition pruning will be removed as data size exceeds the limit; +-- and for self join on partitioning column, it should not fail (HIVE-10559). 
+explain +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE + HybridGraceHashJoin: true + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: s1 + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + 
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +2000000