Repository: hive Updated Branches: refs/heads/master 6dc245241 -> cb866e894
HIVE-16821: Vectorization: support Explain Analyze in vectorized mode (Gopal V, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb866e89 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb866e89 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb866e89 Branch: refs/heads/master Commit: cb866e894bc5cf536ab3ba7b0e1542e8dbda7932 Parents: 6dc2452 Author: Gopal V <gop...@apache.org> Authored: Wed Jan 24 22:35:26 2018 -0800 Committer: Gopal V <gop...@apache.org> Committed: Wed Jan 24 22:35:26 2018 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/Operator.java | 46 +++++++++------- .../hadoop/hive/ql/exec/TableScanOperator.java | 35 +++++++----- .../VectorReduceSinkCommonOperator.java | 1 + .../optimizer/physical/PhysicalOptimizer.java | 3 +- .../hadoop/hive/ql/parse/TezCompiler.java | 3 +- .../hive/ql/parse/spark/SparkCompiler.java | 3 +- .../queries/clientpositive/explainanalyze_3.q | 1 + .../clientpositive/tez/explainanalyze_3.q.out | 56 ++++++++++---------- 8 files changed, 83 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 2462938..199b181 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -113,9 +113,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C private boolean useBucketizedHiveInputFormat; // Data structures specific for vectorized operators. 
- private int size; - private boolean selectedInUse; - private int[] selected; + private transient boolean multiChildren; + private transient int[] selected; // dummy operator (for not increasing seqId) protected Operator(String name, CompilationOpContext cContext) { @@ -129,8 +128,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C childOperators = new ArrayList<Operator<? extends OperatorDesc>>(); parentOperators = new ArrayList<Operator<? extends OperatorDesc>>(); abortOp = new AtomicBoolean(false); - // Initializing data structures for vectorization - selected = new int[VectorizedRowBatch.DEFAULT_SIZE]; } public Operator(CompilationOpContext cContext) { @@ -323,6 +320,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C // String className = this.getClass().getName(); this.done = false; + this.runTimeNumRows = 0; // initializeOp can be overridden + // Initializing data structures for vectorForward + this.selected = new int[VectorizedRowBatch.DEFAULT_SIZE]; if (state == State.INIT) { return; } @@ -345,6 +345,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C for (int i = 0; i < childOperatorsArray.length; i++) { childOperatorsArray[i] = childOperators.get(i); } + multiChildren = childOperatorsArray.length > 1; childOperatorsTag = new int[childOperatorsArray.length]; for (int i = 0; i < childOperatorsArray.length; i++) { List<Operator<? 
extends OperatorDesc>> parentOperators = @@ -487,7 +488,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C protected void initializeOp(Configuration hconf) throws HiveException { this.hconf = hconf; rootInitializeCalled = true; - runTimeNumRows = 0; } /** @@ -704,6 +704,12 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C // call the operator specific close routine closeOp(abort); + // closeOp can be overridden + if (conf != null && conf.getRuntimeStatsTmpDir() != null) { + publishRunTimeStats(); + } + this.runTimeNumRows = 0; + reporter = null; try { @@ -733,10 +739,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C * should overwrite this funtion for their specific cleanup routine. */ protected void closeOp(boolean abort) throws HiveException { - if (conf != null && conf.getRuntimeStatsTmpDir() != null) { - publishRunTimeStats(); - } - runTimeNumRows = 0; } private boolean jobCloseDone = false; @@ -894,26 +896,32 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C forward(row, rowInspector, false); } + protected void forward(VectorizedRowBatch vrg, ObjectInspector rowInspector) + throws HiveException { + forward(vrg, rowInspector, true); + } + protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized) throws HiveException { - if (isVectorized && getNumChild() > 1) { + if (isVectorized) { vectorForward((VectorizedRowBatch) row, rowInspector); - return; + } else { + baseForward(row, rowInspector); } - baseForward(row, rowInspector); } private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector) throws HiveException { - runTimeNumRows++; + this.runTimeNumRows += vrg.count(); if (getDone()) { return; } // Data structures to store original values - size = vrg.size; - selectedInUse = vrg.selectedInUse; - if (vrg.selectedInUse) { + final int size = vrg.size; + final boolean selectedInUse = 
vrg.selectedInUse; + final boolean saveState = (selectedInUse && multiChildren); + if (saveState) { System.arraycopy(vrg.selected, 0, selected, 0, size); } @@ -927,7 +935,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C // Restore original values vrg.size = size; vrg.selectedInUse = selectedInUse; - if (vrg.selectedInUse) { + if (saveState) { System.arraycopy(selected, 0, vrg.selected, 0, size); } } @@ -941,7 +949,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C private void baseForward(Object row, ObjectInspector rowInspector) throws HiveException { - runTimeNumRows++; + this.runTimeNumRows++; if (getDone()) { return; } http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 7e5c724..0799181 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -115,18 +115,7 @@ public class TableScanOperator extends Operator<TableScanDesc> implements @Override public void process(Object row, int tag) throws HiveException { if (rowLimit >= 0) { - if (vectorized) { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - if (currCount >= rowLimit) { - setDone(true); - return; - } - if (currCount + batch.size > rowLimit) { - batch.size = rowLimit - currCount; - } - currCount += batch.size; - } else if (currCount++ >= rowLimit) { - setDone(true); + if (checkSetDone(row, tag)) { return; } } @@ -136,6 +125,28 @@ public class TableScanOperator extends Operator<TableScanDesc> implements forward(row, inputObjInspectors[tag], vectorized); } + private boolean checkSetDone(Object row, int tag) { + if (row instanceof 
VectorizedRowBatch) { + // We need to check with 'instanceof' instead of just checking + // vectorized because the row can be a VectorizedRowBatch when + // FetchOptimizer kicks in even if the operator pipeline is not + // vectorized + VectorizedRowBatch batch = (VectorizedRowBatch) row; + if (currCount >= rowLimit) { + setDone(true); + return true; + } + if (currCount + batch.size > rowLimit) { + batch.size = rowLimit - currCount; + } + currCount += batch.size; + } else if (currCount++ >= rowLimit) { + setDone(true); + return true; + } + return false; + } + // Change the table partition for collecting stats @Override public void cleanUpInputFileChangedOp() throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index 3a6d8c1..8dd7cfe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -405,6 +405,7 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re if (LOG.isInfoEnabled()) { LOG.info(toString() + ": records written - " + numRows); } + this.runTimeNumRows = numRows; recordCounter.set(numRows); } http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index 
5571826..a64a498 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -88,8 +88,7 @@ public class PhysicalOptimizer { // Vectorization should be the last optimization, because it doesn't modify the plan // or any operators. It makes a very low level transformation to the expressions to // run in the vectorized mode. - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && pctx.getContext().getExplainAnalyze() == null) { + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { resolvers.add(new Vectorizer()); } if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) { http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 6e274d1..f9a6386 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -669,8 +669,7 @@ public class TezCompiler extends TaskCompiler { LOG.debug("Skipping llap pre-vectorization pass"); } - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && ctx.getExplainAnalyze() == null) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { physicalCtx = new Vectorizer().resolve(physicalCtx); } else { LOG.debug("Skipping vectorization"); http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 5220281..08e7f43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -588,8 +588,7 @@ public class SparkCompiler extends TaskCompiler { LOG.debug("Skipping cross product analysis"); } - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && ctx.getExplainAnalyze() == null) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { (new Vectorizer()).resolve(physicalCtx); } else { LOG.debug("Skipping vectorization"); http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/test/queries/clientpositive/explainanalyze_3.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/explainanalyze_3.q b/ql/src/test/queries/clientpositive/explainanalyze_3.q index d7773fc..8011124 100644 --- a/ql/src/test/queries/clientpositive/explainanalyze_3.q +++ b/ql/src/test/queries/clientpositive/explainanalyze_3.q @@ -12,6 +12,7 @@ set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.exec.dynamic.partition.mode=nonstrict; set hive.vectorized.execution.enabled=true; +set hive.llap.io.enabled=false; explain analyze select key, value FROM srcpart LATERAL VIEW explode(array(1,2,3)) myTable AS myCol; http://git-wip-us.apache.org/repos/asf/hive/blob/cb866e89/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out b/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out index 444ff12..a1ea37a 100644 --- a/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out +++ b/ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out @@ -350,10 +350,10 @@ Stage-3 Stage-2 Dependency Collection{} Stage-1 - Map 1 - File Output Operator 
[FS_2] + Map 1 vectorized + File Output Operator [FS_4] table:{"name:":"default.src_autho_test"} - Select Operator [SEL_1] (rows=500/500 width=178) + Select Operator [SEL_3] (rows=500/500 width=178) Output:["_col0","_col1"] TableScan [TS_0] (rows=500/500 width=178) default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"] @@ -610,15 +610,15 @@ Stage-0 Fetch Operator limit:5 Stage-1 - Reducer 2 - File Output Operator [FS_5] - Limit [LIM_4] (rows=5/5 width=178) + Reducer 2 vectorized + File Output Operator [FS_10] + Limit [LIM_9] (rows=5/5 width=178) Number of rows:5 - Select Operator [SEL_3] (rows=500/5 width=178) + Select Operator [SEL_8] (rows=500/5 width=178) Output:["_col0","_col1"] - <-Map 1 [SIMPLE_EDGE] - SHUFFLE [RS_2] - Select Operator [SEL_1] (rows=500/500 width=178) + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_7] + Select Operator [SEL_6] (rows=500/500 width=178) Output:["_col0","_col1"] TableScan [TS_0] (rows=500/500 width=178) default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"] @@ -672,19 +672,19 @@ Stage-3 File Output Operator [FS_5] Group By Operator [GBY_3] (rows=1/1 width=2760) Output:["_col0","_col1","_col2","_col3","_col4"],aggregations:["compute_stats(VALUE._col0, 'hll')","compute_stats(VALUE._col2, 'hll')","compute_stats(VALUE._col3, 'hll')","compute_stats(VALUE._col4, 'hll')","compute_stats(VALUE._col5, 'hll')"] - <-Map 1 [CUSTOM_SIMPLE_EDGE] - File Output Operator [FS_3] + <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized + File Output Operator [FS_8] table:{"name:":"default.orc_merge5"} - Select Operator [SEL_2] (rows=1/3 width=352) + Select Operator [SEL_7] (rows=1/3 width=352) Output:["_col0","_col1","_col2","_col3","_col4"] - Filter Operator [FIL_4] (rows=1/3 width=352) + Filter Operator [FIL_6] (rows=1/3 width=352) predicate:(userid <= 13) TableScan [TS_0] (rows=1/15000 width=352) default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"] - PARTITION_ONLY_SHUFFLE [RS_2] - Select 
Operator [SEL_1] (rows=1/3 width=352) + PARTITION_ONLY_SHUFFLE [RS_7] + Select Operator [SEL_6] (rows=1/3 width=352) Output:["userid","string1","subtype","decimal1","ts"] - Please refer to the previous Select Operator [SEL_2] + Please refer to the previous Select Operator [SEL_7] Stage-4(CONDITIONAL) File Merge Please refer to the previous Stage-8(CONDITIONAL CHILD TASKS: Stage-5, Stage-4, Stage-6) @@ -845,24 +845,24 @@ Stage-0 Fetch Operator limit:-1 Stage-1 - Map 2 - File Output Operator [FS_10] - Select Operator [SEL_9] (rows=391/480 width=186) + Map 2 vectorized + File Output Operator [FS_34] + Select Operator [SEL_33] (rows=391/480 width=186) Output:["_col0","_col1","_col2"] - Map Join Operator [MAPJOIN_25] (rows=391/480 width=186) - BucketMapJoin:true,Conds:RS_6._col0=SEL_5._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"] - <-Map 1 [CUSTOM_EDGE] - MULTICAST [RS_6] + Map Join Operator [MAPJOIN_32] (rows=391/480 width=186) + BucketMapJoin:true,Conds:RS_29._col0=SEL_31._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"] + <-Map 1 [CUSTOM_EDGE] vectorized + MULTICAST [RS_29] PartitionCols:_col0 - Select Operator [SEL_2] (rows=242/242 width=95) + Select Operator [SEL_28] (rows=242/242 width=95) Output:["_col0","_col1"] - Filter Operator [FIL_13] (rows=242/242 width=95) + Filter Operator [FIL_27] (rows=242/242 width=95) predicate:key is not null TableScan [TS_0] (rows=242/242 width=95) default@tab,a,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"] - <-Select Operator [SEL_5] (rows=500/500 width=95) + <-Select Operator [SEL_31] (rows=500/500 width=95) Output:["_col0","_col1"] - Filter Operator [FIL_14] (rows=500/500 width=95) + Filter Operator [FIL_30] (rows=500/500 width=95) predicate:key is not null TableScan [TS_3] (rows=500/500 width=95) default@tab_part,b,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]