Repository: hive
Updated Branches:
  refs/heads/master 6dc245241 -> cb866e894


HIVE-16821: Vectorization: support Explain Analyze in vectorized mode (Gopal V, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb866e89
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb866e89
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb866e89

Branch: refs/heads/master
Commit: cb866e894bc5cf536ab3ba7b0e1542e8dbda7932
Parents: 6dc2452
Author: Gopal V <gop...@apache.org>
Authored: Wed Jan 24 22:35:26 2018 -0800
Committer: Gopal V <gop...@apache.org>
Committed: Wed Jan 24 22:35:26 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Operator.java    | 46 +++++++++-------
 .../hadoop/hive/ql/exec/TableScanOperator.java  | 35 +++++++-----
 .../VectorReduceSinkCommonOperator.java         |  1 +
 .../optimizer/physical/PhysicalOptimizer.java   |  3 +-
 .../hadoop/hive/ql/parse/TezCompiler.java       |  3 +-
 .../hive/ql/parse/spark/SparkCompiler.java      |  3 +-
 .../queries/clientpositive/explainanalyze_3.q   |  1 +
 .../clientpositive/tez/explainanalyze_3.q.out   | 56 ++++++++++----------
 8 files changed, 83 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 2462938..199b181 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -113,9 +113,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   private boolean useBucketizedHiveInputFormat;
 
   // Data structures specific for vectorized operators.
-  private int size;
-  private boolean selectedInUse;
-  private int[] selected;
+  private transient boolean multiChildren;
+  private transient int[] selected;
 
   // dummy operator (for not increasing seqId)
   protected Operator(String name, CompilationOpContext cContext) {
@@ -129,8 +128,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     abortOp = new AtomicBoolean(false);
-    // Initializing data structures for vectorization
-    selected = new int[VectorizedRowBatch.DEFAULT_SIZE];
   }
 
   public Operator(CompilationOpContext cContext) {
@@ -323,6 +320,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     // String className = this.getClass().getName();
 
     this.done = false;
+    this.runTimeNumRows = 0; // initializeOp can be overridden
+    // Initializing data structures for vectorForward
+    this.selected = new int[VectorizedRowBatch.DEFAULT_SIZE];
     if (state == State.INIT) {
       return;
     }
@@ -345,6 +345,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     for (int i = 0; i < childOperatorsArray.length; i++) {
       childOperatorsArray[i] = childOperators.get(i);
     }
+    multiChildren = childOperatorsArray.length > 1;
     childOperatorsTag = new int[childOperatorsArray.length];
     for (int i = 0; i < childOperatorsArray.length; i++) {
       List<Operator<? extends OperatorDesc>> parentOperators =
@@ -487,7 +488,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
     rootInitializeCalled = true;
-    runTimeNumRows = 0;
   }
 
   /**
@@ -704,6 +704,12 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     // call the operator specific close routine
     closeOp(abort);
 
+    // closeOp can be overriden
+    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
+      publishRunTimeStats();
+    }
+    this.runTimeNumRows = 0;
+
     reporter = null;
 
     try {
@@ -733,10 +739,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
    * should overwrite this funtion for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
-    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
-      publishRunTimeStats();
-    }
-    runTimeNumRows = 0;
   }
 
   private boolean jobCloseDone = false;
@@ -894,26 +896,32 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     forward(row, rowInspector, false);
   }
 
+  protected void forward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
+      throws HiveException {
+    forward(vrg, rowInspector, true);
+  }
+
   protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized)
       throws HiveException {
-    if (isVectorized && getNumChild() > 1) {
+    if (isVectorized) {
       vectorForward((VectorizedRowBatch) row, rowInspector);
-      return;
+    } else {
+      baseForward(row, rowInspector);
     }
-    baseForward(row, rowInspector);
   }
 
   private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
       throws HiveException {
-    runTimeNumRows++;
+    this.runTimeNumRows += vrg.count();
     if (getDone()) {
       return;
     }
 
     // Data structures to store original values
-    size = vrg.size;
-    selectedInUse = vrg.selectedInUse;
-    if (vrg.selectedInUse) {
+    final int size = vrg.size;
+    final boolean selectedInUse = vrg.selectedInUse;
+    final boolean saveState = (selectedInUse && multiChildren);
+    if (saveState) {
       System.arraycopy(vrg.selected, 0, selected, 0, size);
     }
 
@@ -927,7 +935,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
         // Restore original values
         vrg.size = size;
         vrg.selectedInUse = selectedInUse;
-        if (vrg.selectedInUse) {
+        if (saveState) {
           System.arraycopy(selected, 0, vrg.selected, 0, size);
         }
       }
@@ -941,7 +949,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
 
   private void baseForward(Object row, ObjectInspector rowInspector)
       throws HiveException {
-    runTimeNumRows++;
+    this.runTimeNumRows++;
     if (getDone()) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 7e5c724..0799181 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -115,18 +115,7 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
   @Override
   public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0) {
-      if (vectorized) {
-        VectorizedRowBatch batch = (VectorizedRowBatch) row;
-        if (currCount >= rowLimit) {
-          setDone(true);
-          return;
-        }
-        if (currCount + batch.size > rowLimit) {
-          batch.size = rowLimit - currCount;
-        }
-        currCount += batch.size;
-      } else if (currCount++ >= rowLimit) {
-        setDone(true);
+      if (checkSetDone(row, tag)) {
         return;
       }
     }
@@ -136,6 +125,28 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
     forward(row, inputObjInspectors[tag], vectorized);
   }
 
+  private boolean checkSetDone(Object row, int tag) {
+    if (row instanceof VectorizedRowBatch) {
+      // We need to check with 'instanceof' instead of just checking
+      // vectorized because the row can be a VectorizedRowBatch when
+      // FetchOptimizer kicks in even if the operator pipeline is not
+      // vectorized
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+      if (currCount >= rowLimit) {
+        setDone(true);
+        return true;
+      }
+      if (currCount + batch.size > rowLimit) {
+        batch.size = rowLimit - currCount;
+      }
+      currCount += batch.size;
+    } else if (currCount++ >= rowLimit) {
+      setDone(true);
+      return true;
+    }
+    return false;
+  }
+
   // Change the table partition for collecting stats
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index 3a6d8c1..8dd7cfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -405,6 +405,7 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
     if (LOG.isInfoEnabled()) {
       LOG.info(toString() + ": records written - " + numRows);
     }
+    this.runTimeNumRows = numRows;
     recordCounter.set(numRows);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 5571826..a64a498 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -88,8 +88,7 @@ public class PhysicalOptimizer {
     // Vectorization should be the last optimization, because it doesn't modify the plan
     // or any operators. It makes a very low level transformation to the expressions to
     // run in the vectorized mode.
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && pctx.getContext().getExplainAnalyze() == null) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       resolvers.add(new Vectorizer());
     }
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 6e274d1..f9a6386 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -669,8 +669,7 @@ public class TezCompiler extends TaskCompiler {
       LOG.debug("Skipping llap pre-vectorization pass");
     }
 
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && ctx.getExplainAnalyze() == null) {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       physicalCtx = new Vectorizer().resolve(physicalCtx);
     } else {
       LOG.debug("Skipping vectorization");

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 5220281..08e7f43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -588,8 +588,7 @@ public class SparkCompiler extends TaskCompiler {
       LOG.debug("Skipping cross product analysis");
     }
 
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && ctx.getExplainAnalyze() == null) {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       (new Vectorizer()).resolve(physicalCtx);
     } else {
       LOG.debug("Skipping vectorization");

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/test/queries/clientpositive/explainanalyze_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/explainanalyze_3.q b/ql/src/test/queries/clientpositive/explainanalyze_3.q
index d7773fc..8011124 100644
--- a/ql/src/test/queries/clientpositive/explainanalyze_3.q
+++ b/ql/src/test/queries/clientpositive/explainanalyze_3.q
@@ -12,6 +12,7 @@ set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
 set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.vectorized.execution.enabled=true;
+set hive.llap.io.enabled=false;
 
 explain analyze select key, value
 FROM srcpart LATERAL VIEW explode(array(1,2,3)) myTable AS myCol;

http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out b/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
index 444ff12..a1ea37a 100644
--- a/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
@@ -350,10 +350,10 @@ Stage-3
         Stage-2
           Dependency Collection{}
             Stage-1
-              Map 1
-              File Output Operator [FS_2]
+              Map 1 vectorized
+              File Output Operator [FS_4]
                 table:{"name:":"default.src_autho_test"}
-                Select Operator [SEL_1] (rows=500/500 width=178)
+                Select Operator [SEL_3] (rows=500/500 width=178)
                   Output:["_col0","_col1"]
                   TableScan [TS_0] (rows=500/500 width=178)
                     
default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
@@ -610,15 +610,15 @@ Stage-0
   Fetch Operator
     limit:5
     Stage-1
-      Reducer 2
-      File Output Operator [FS_5]
-        Limit [LIM_4] (rows=5/5 width=178)
+      Reducer 2 vectorized
+      File Output Operator [FS_10]
+        Limit [LIM_9] (rows=5/5 width=178)
           Number of rows:5
-          Select Operator [SEL_3] (rows=500/5 width=178)
+          Select Operator [SEL_8] (rows=500/5 width=178)
             Output:["_col0","_col1"]
-          <-Map 1 [SIMPLE_EDGE]
-            SHUFFLE [RS_2]
-              Select Operator [SEL_1] (rows=500/500 width=178)
+          <-Map 1 [SIMPLE_EDGE] vectorized
+            SHUFFLE [RS_7]
+              Select Operator [SEL_6] (rows=500/500 width=178)
                 Output:["_col0","_col1"]
                 TableScan [TS_0] (rows=500/500 width=178)
                   
default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
@@ -672,19 +672,19 @@ Stage-3
                       File Output Operator [FS_5]
                         Group By Operator [GBY_3] (rows=1/1 width=2760)
                           
Output:["_col0","_col1","_col2","_col3","_col4"],aggregations:["compute_stats(VALUE._col0,
 'hll')","compute_stats(VALUE._col2, 'hll')","compute_stats(VALUE._col3, 
'hll')","compute_stats(VALUE._col4, 'hll')","compute_stats(VALUE._col5, 'hll')"]
-                        <-Map 1 [CUSTOM_SIMPLE_EDGE]
-                          File Output Operator [FS_3]
+                        <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized
+                          File Output Operator [FS_8]
                             table:{"name:":"default.orc_merge5"}
-                            Select Operator [SEL_2] (rows=1/3 width=352)
+                            Select Operator [SEL_7] (rows=1/3 width=352)
                               Output:["_col0","_col1","_col2","_col3","_col4"]
-                              Filter Operator [FIL_4] (rows=1/3 width=352)
+                              Filter Operator [FIL_6] (rows=1/3 width=352)
                                 predicate:(userid <= 13)
                                 TableScan [TS_0] (rows=1/15000 width=352)
                                   
default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"]
-                          PARTITION_ONLY_SHUFFLE [RS_2]
-                            Select Operator [SEL_1] (rows=1/3 width=352)
+                          PARTITION_ONLY_SHUFFLE [RS_7]
+                            Select Operator [SEL_6] (rows=1/3 width=352)
                               
Output:["userid","string1","subtype","decimal1","ts"]
-                               Please refer to the previous Select Operator 
[SEL_2]
+                               Please refer to the previous Select Operator 
[SEL_7]
             Stage-4(CONDITIONAL)
               File Merge
                  Please refer to the previous Stage-8(CONDITIONAL CHILD TASKS: 
Stage-5, Stage-4, Stage-6)
@@ -845,24 +845,24 @@ Stage-0
   Fetch Operator
     limit:-1
     Stage-1
-      Map 2
-      File Output Operator [FS_10]
-        Select Operator [SEL_9] (rows=391/480 width=186)
+      Map 2 vectorized
+      File Output Operator [FS_34]
+        Select Operator [SEL_33] (rows=391/480 width=186)
           Output:["_col0","_col1","_col2"]
-          Map Join Operator [MAPJOIN_25] (rows=391/480 width=186)
-            
BucketMapJoin:true,Conds:RS_6._col0=SEL_5._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"]
-          <-Map 1 [CUSTOM_EDGE]
-            MULTICAST [RS_6]
+          Map Join Operator [MAPJOIN_32] (rows=391/480 width=186)
+            
BucketMapJoin:true,Conds:RS_29._col0=SEL_31._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"]
+          <-Map 1 [CUSTOM_EDGE] vectorized
+            MULTICAST [RS_29]
               PartitionCols:_col0
-              Select Operator [SEL_2] (rows=242/242 width=95)
+              Select Operator [SEL_28] (rows=242/242 width=95)
                 Output:["_col0","_col1"]
-                Filter Operator [FIL_13] (rows=242/242 width=95)
+                Filter Operator [FIL_27] (rows=242/242 width=95)
                   predicate:key is not null
                   TableScan [TS_0] (rows=242/242 width=95)
                     
default@tab,a,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
-          <-Select Operator [SEL_5] (rows=500/500 width=95)
+          <-Select Operator [SEL_31] (rows=500/500 width=95)
               Output:["_col0","_col1"]
-              Filter Operator [FIL_14] (rows=500/500 width=95)
+              Filter Operator [FIL_30] (rows=500/500 width=95)
                 predicate:key is not null
                 TableScan [TS_3] (rows=500/500 width=95)
                   
default@tab_part,b,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]

Reply via email to