Repository: hive
Updated Branches:
  refs/heads/master 2b9f2f5e2 -> 6a2993912


HIVE-10559: IndexOutOfBoundsException with RemoveDynamicPruningBySize (Wei 
Zhang via Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6a299391
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6a299391
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6a299391

Branch: refs/heads/master
Commit: 6a2993912730702edb68dd37acee7ec0af4e6d19
Parents: 2b9f2f5
Author: Gunther Hagleitner <gunt...@apache.org>
Authored: Thu May 14 15:48:34 2015 -0700
Committer: Gunther Hagleitner <gunt...@apache.org>
Committed: Thu May 14 15:48:34 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  25 ++--
 .../optimizer/RemoveDynamicPruningBySize.java   |  19 ++-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |  16 +++
 .../hive/ql/parse/OptimizeTezProcContext.java   |  14 +++
 .../hadoop/hive/ql/parse/TezCompiler.java       |  19 ++-
 .../dynamic_partition_pruning_2.q               |  17 +++
 .../tez/dynamic_partition_pruning_2.q.out       | 116 +++++++++++++++++++
 7 files changed, 196 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index d42b643..bcffdbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -636,13 +636,26 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         // we might have generated a dynamic partition operator chain. Since
         // we're removing the reduce sink we need do remove that too.
         Set<Operator<?>> dynamicPartitionOperators = new 
HashSet<Operator<?>>();
+        Map<Operator<?>, AppMasterEventOperator> opEventPairs = new 
HashMap<>();
         for (Operator<?> c : p.getChildOperators()) {
-          if (hasDynamicPartitionBroadcast(c)) {
+          AppMasterEventOperator event = findDynamicPartitionBroadcast(c);
+          if (event != null) {
             dynamicPartitionOperators.add(c);
+            opEventPairs.put(c, event);
           }
         }
         for (Operator<?> c : dynamicPartitionOperators) {
-          p.removeChild(c);
+          if (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
+              
!context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) {
+            p.removeChild(c);
+            // at this point we've found the fork in the op pipeline that has 
the pruning as a child plan.
+            LOG.info("Disabling dynamic pruning for: "
+                + ((DynamicPruningEventDesc) 
opEventPairs.get(c).getConf()).getTableScan().getName()
+                + ". Need to be removed together with reduce sink");
+          }
+        }
+        for (Operator<?> op : dynamicPartitionOperators) {
+          context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op));
         }
       }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
@@ -662,15 +675,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return mapJoinOp;
   }
 
-  private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
-    boolean hasDynamicPartitionPruning = false;
+  private AppMasterEventOperator findDynamicPartitionBroadcast(Operator<?> 
parent) {
 
     for (Operator<?> op : parent.getChildOperators()) {
       while (op != null) {
         if (op instanceof AppMasterEventOperator && op.getConf() instanceof 
DynamicPruningEventDesc) {
           // found dynamic partition pruning operator
-          hasDynamicPartitionPruning = true;
-          break;
+          return (AppMasterEventOperator)op;
         }
         if (op instanceof ReduceSinkOperator || op instanceof 
FileSinkOperator) {
           // crossing reduce sink or file sink means the pruning isn't for 
this parent.
@@ -686,6 +697,6 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
     }
 
-    return hasDynamicPartitionPruning;
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
index 4803959..5d01311 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
@@ -24,10 +24,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -52,20 +52,15 @@ public class RemoveDynamicPruningBySize implements 
NodeProcessor {
     AppMasterEventDesc desc = event.getConf();
 
     if (desc.getStatistics().getDataSize() > context.conf
-        .getLongVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      Operator<?> child = event;
-      Operator<?> curr = event;
-
-      while (curr.getChildOperators().size() <= 1) {
-        child = curr;
-        curr = curr.getParentOperators().get(0);
-      }
-      // at this point we've found the fork in the op pipeline that has the
-      // pruning as a child plan.
+        .getLongVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE) &&
+        (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
+         !context.pruningOpsRemovedByPriorOpt.contains(event))) {
+      context.pruningOpsRemovedByPriorOpt.add(event);
+      GenTezUtils.getUtils().removeBranch(event);
+      // at this point we've found the fork in the op pipeline that has the 
pruning as a child plan.
       LOG.info("Disabling dynamic pruning for: "
           + ((DynamicPruningEventDesc) desc).getTableScan().getName()
           + ". Expected data size is too big: " + 
desc.getStatistics().getDataSize());
-      curr.removeChild(child);
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 241e9d7..0edfc5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -461,4 +461,20 @@ public class GenTezUtils {
       findRoots(p, ops);
     }
   }
+
+  /**
+   * Remove an operator branch. When we see a fork, we know it's time to do 
the removal.
+   * @param event the leaf node of which branch to be removed
+   */
+  public void removeBranch(AppMasterEventOperator event) {
+    Operator<?> child = event;
+    Operator<?> curr = event;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      curr = curr.getParentOperators().get(0);
+    }
+
+    curr.removeChild(child);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
index ee71971..e58eb66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
@@ -49,6 +49,19 @@ public class OptimizeTezProcContext implements 
NodeProcessorCtx{
   public final Set<ReadEntity> inputs;
   public final Set<WriteEntity> outputs;
 
+  /* Two of the optimization rules, ConvertJoinMapJoin and 
RemoveDynamicPruningBySize, are put into
+     stats dependent optimizations and run together in TezCompiler. There's no 
guarantee which one
+     runs first, but in either case, the prior one may have removed a chain 
which the latter one is
+     not aware of. So we need to remember the leaf node(s) of that chain so it 
can be skipped.
+
+     For example, as ConvertJoinMapJoin is removing the reduce sink, it may 
also have removed a
+     dynamic partition pruning operator chain. However, 
RemoveDynamicPruningBySize doesn't know this
+     and still tries to traverse that removed chain which will cause NPE.
+
+     This may also happen when RemoveDynamicPruningBySize happens first.
+    */
+  public HashSet<AppMasterEventOperator> pruningOpsRemovedByPriorOpt;
+
   public final Set<ReduceSinkOperator> visitedReduceSinks
     = new HashSet<ReduceSinkOperator>();
 
@@ -66,6 +79,7 @@ public class OptimizeTezProcContext implements 
NodeProcessorCtx{
     this.parseContext = parseContext;
     this.inputs = inputs;
     this.outputs = outputs;
+    this.pruningOpsRemovedByPriorOpt = new HashSet<AppMasterEventOperator>();
   }
 
   public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {

http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ea12990..56707af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -149,7 +149,7 @@ public class TezCompiler extends TaskCompiler {
         if (component.size() != 1) {
           LOG.info("Found cycle in operator plan...");
           cycleFree = false;
-          removeEventOperator(component);
+          removeEventOperator(component, procCtx);
           break;
         }
       }
@@ -157,7 +157,7 @@ public class TezCompiler extends TaskCompiler {
     }
   }
 
-  private void removeEventOperator(Set<Operator<?>> component) {
+  private void removeEventOperator(Set<Operator<?>> component, 
OptimizeTezProcContext context) {
     AppMasterEventOperator victim = null;
     for (Operator<?> o : component) {
       if (o instanceof AppMasterEventOperator) {
@@ -169,20 +169,17 @@ public class TezCompiler extends TaskCompiler {
       }
     }
 
-    Operator<?> child = victim;
-    Operator<?> curr = victim;
-
-    while (curr.getChildOperators().size() <= 1) {
-      child = curr;
-      curr = curr.getParentOperators().get(0);
+    if (victim == null ||
+        (!context.pruningOpsRemovedByPriorOpt.isEmpty() &&
+         context.pruningOpsRemovedByPriorOpt.contains(victim))) {
+      return;
     }
 
-    // at this point we've found the fork in the op pipeline that has the
-    // pruning as a child plan.
+    GenTezUtils.getUtils().removeBranch(victim);
+    // at this point we've found the fork in the op pipeline that has the 
pruning as a child plan.
     LOG.info("Disabling dynamic pruning for: "
         + ((DynamicPruningEventDesc) 
victim.getConf()).getTableScan().toString()
         + ". Needed to break cyclic dependency");
-    curr.removeChild(child);
   }
 
   // Tarjan's algo

http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q 
b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
index 5a7f113..a4e84b1 100644
--- a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
+++ b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
@@ -118,3 +118,20 @@ SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = 
id AND label = 'bar';
 SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'foo'
 UNION ALL
 SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar';
+
+set hive.tez.dynamic.partition.pruning.max.event.size=1000000;
+set hive.tez.dynamic.partition.pruning.max.data.size=10000;
+-- Dynamic partition pruning will be removed as data size exceeds the limit;
+-- and for self join on partitioning column, it should not fail (HIVE-10559).
+explain
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+;
+
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6a299391/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out 
b/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
index 8c8531c..8b0b81d 100644
--- a/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
+++ b/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
@@ -963,3 +963,119 @@ POSTHOOK: Input: default@dim_shops
 4
 5
 6
+PREHOOK: query: -- Dynamic partition pruning will be removed as data size 
exceeds the limit;
+-- and for self join on partitioning column, it should not fail (HIVE-10559).
+explain
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Dynamic partition pruning will be removed as data size 
exceeds the limit;
+-- and for self join on partitioning column, it should not fail (HIVE-10559).
+explain
+select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Map 1 <- Map 3 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: s1
+                  filterExpr: ds is not null (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: 
COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: ds (type: string)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: 
COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col0 (type: string)
+                        1 _col0 (type: string)
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 2200 Data size: 23372 Basic stats: 
COMPLETE Column stats: NONE
+                      HybridGraceHashJoin: true
+                      Group By Operator
+                        aggregations: count()
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: s1
+                  filterExpr: ds is not null (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: 
COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: ds (type: string)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: 
COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: 
COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*)
+from srcpart s1,
+     srcpart s2
+where s1.ds = s2.ds
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2000000

Reply via email to