Repository: hive Updated Branches: refs/heads/master 4904ab786 -> 22af0eff0
HIVE-16065: Vectorization: Wrong Key/Value information used by Vectorizer (Matt McCline, reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22af0eff Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22af0eff Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22af0eff Branch: refs/heads/master Commit: 22af0eff0461d2b91a643276c15870f7901d3119 Parents: 4904ab7 Author: Matt McCline <mmccl...@hortonworks.com> Authored: Mon Mar 6 11:05:15 2017 -0800 Committer: Matt McCline <mmccl...@hortonworks.com> Committed: Mon Mar 6 11:05:15 2017 -0800 ---------------------------------------------------------------------- .../hive/ql/exec/tez/ReduceRecordProcessor.java | 2 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 8 ++- .../hive/ql/optimizer/physical/Vectorizer.java | 76 ++++++++++++++------ .../apache/hadoop/hive/ql/plan/BaseWork.java | 10 +++ .../apache/hadoop/hive/ql/plan/ReduceWork.java | 31 -------- .../clientpositive/spark/vectorized_ptf.q.out | 8 +-- .../spark/vectorized_shufflejoin.q.out | 2 +- 7 files changed, 76 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 2d06545..3fb9fb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -300,7 +300,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode(); sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getVectorizedRowBatchCtx()); + redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum()); ois[tag] = sources[tag].getObjectInspector(); } http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 8cd49c5..ad8b9e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -121,11 +121,14 @@ public class ReduceRecordSource implements RecordSource { private final GroupIterator groupIterator = new GroupIterator(); + private long vectorizedVertexNum; + void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag, - VectorizedRowBatchCtx batchContext) + VectorizedRowBatchCtx batchContext, long vectorizedVertexNum) throws Exception { + this.vectorizedVertexNum = vectorizedVertexNum; ObjectInspector keyObjectInspector; this.reducer = reducer; @@ -476,7 +479,8 @@ public class ReduceRecordSource implements RecordSource { + StringUtils.stringifyException(e2) + " ]"; } throw new HiveException("Hive Runtime Error while processing vector batch (tag=" - + tag + ") " + rowString, e); + + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ") " + + rowString, e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index f09bfa4..50eda15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -201,6 +201,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.NullStructSerDe; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -220,6 +221,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hive.common.util.AnnotationUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ReflectionUtil; +import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Preconditions; @@ -307,6 +309,8 @@ public class Vectorizer implements PhysicalPlanResolver { currentBaseWork.setNotVectorizedReason(null); } + private long vectorizedVertexNum = -1; + public Vectorizer() { /* @@ -604,6 +608,8 @@ public class Vectorizer implements PhysicalPlanResolver { VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); vectorTaskColumnInfo.assume(); + mapWork.setVectorizedVertexNum(++vectorizedVertexNum); + boolean ret; try { ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark); @@ -1137,6 +1143,8 @@ public class Vectorizer implements PhysicalPlanResolver { VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); vectorTaskColumnInfo.assume(); + reduceWork.setVectorizedVertexNum(++vectorizedVertexNum); + boolean ret; try { ret = validateReduceWork(reduceWork, vectorTaskColumnInfo); @@ -1164,38 +1172,56 @@ public class Vectorizer implements PhysicalPlanResolver { ArrayList<String> reduceColumnNames = new ArrayList<String>(); ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>(); + if (reduceWork.getNeedsTagging()) { + setNodeIssue("Tagging not supported"); + return false; + } + try { - // Check key ObjectInspector. - ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector(); - if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) { - setNodeIssue("Key object inspector missing or not StructObjectInspector"); - return false; + TableDesc keyTableDesc = reduceWork.getKeyDesc(); + if (LOG.isDebugEnabled()) { + LOG.debug("Using reduce tag " + reduceWork.getTag()); } - StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector; - List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs(); + TableDesc valueTableDesc = reduceWork.getTagToValueDesc().get(reduceWork.getTag()); - if (reduceWork.getNeedsTagging()) { - setNodeIssue("Tez doesn't use tagging"); + Deserializer keyDeserializer = + ReflectionUtils.newInstance( + keyTableDesc.getDeserializerClass(), null); + SerDeUtils.initializeSerDe(keyDeserializer, null, keyTableDesc.getProperties(), null); + ObjectInspector keyObjectInspector = keyDeserializer.getObjectInspector(); + if (keyObjectInspector == null) { + setNodeIssue("Key object inspector null"); return false; } - - // Check value ObjectInspector. - ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector(); - if (valueObjectInspector == null || - !(valueObjectInspector instanceof StructObjectInspector)) { - setNodeIssue("Value object inspector missing or not StructObjectInspector"); + if (!(keyObjectInspector instanceof StructObjectInspector)) { + setNodeIssue("Key object inspector not StructObjectInspector"); return false; } - StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; - List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs(); + StructObjectInspector keyStructObjectInspector = (StructObjectInspector) keyObjectInspector; + List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs(); for (StructField field: keyFields) { reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); } - for (StructField field: valueFields) { - reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); - reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); + + Deserializer valueDeserializer = + ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + SerDeUtils.initializeSerDe(valueDeserializer, null, valueTableDesc.getProperties(), null); + ObjectInspector valueObjectInspector = valueDeserializer.getObjectInspector(); + if (valueObjectInspector != null) { + if (!(valueObjectInspector instanceof StructObjectInspector)) { + setNodeIssue("Value object inspector not StructObjectInspector"); + return false; + } + StructObjectInspector valueStructObjectInspector = (StructObjectInspector) valueObjectInspector; + List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs(); + + for (StructField field: valueFields) { + reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); + reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); + } } } catch (Exception e) { throw new SemanticException(e); @@ -1520,6 +1546,10 @@ public class Vectorizer implements PhysicalPlanResolver { if (op instanceof TableScanOperator) { if (taskVectorizationContext == null) { taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo); + if (LOG.isInfoEnabled()) { + LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); + LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString()); + } } vContext = taskVectorizationContext; } else { @@ -1584,8 +1614,10 @@ public class Vectorizer implements PhysicalPlanResolver { currentOperator = op; if (op.getParentOperators().size() == 0) { - LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); - + if (LOG.isInfoEnabled()) { + LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); + LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString()); + } vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.allColumnNames, hiveConf); taskVectorizationContext = vContext; http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 286ee3b..0984df7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -89,6 +89,8 @@ public abstract class BaseWork extends AbstractOperatorDesc { private boolean allNative; private boolean usesVectorUDFAdaptor; + protected long vectorizedVertexNum; + protected boolean llapMode = false; protected boolean uberMode = false; @@ -183,6 +185,14 @@ public abstract class BaseWork extends AbstractOperatorDesc { return returnSet; } + public void setVectorizedVertexNum(long vectorizedVertexNum) { + this.vectorizedVertexNum = vectorizedVertexNum; + } + + public long getVectorizedVertexNum() { + return vectorizedVertexNum; + } + // ----------------------------------------------------------------------------------------------- public void setVectorizationExamined(boolean vectorizationExamined) { http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index f4ab2a0..ee784dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -112,37 +112,6 @@ public class ReduceWork extends BaseWork { return keyDesc; } - private ObjectInspector getObjectInspector(TableDesc desc) { - ObjectInspector objectInspector; - try { - Deserializer deserializer = ReflectionUtil.newInstance(desc - .getDeserializerClass(), null); - SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null); - objectInspector = deserializer.getObjectInspector(); - } catch (Exception e) { - return null; - } - return objectInspector; - } - - public ObjectInspector getKeyObjectInspector() { - if (keyObjectInspector == null) { - keyObjectInspector = getObjectInspector(keyDesc); - } - return keyObjectInspector; - } - - // Only works when not tagging. - public ObjectInspector getValueObjectInspector() { - if (needsTagging) { - return null; - } - if (valueObjectInspector == null) { - valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag)); - } - return valueObjectInspector; - } - public List<TableDesc> getTagToValueDesc() { return tagToValueDesc; } http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out b/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out index d4c2228..b1b820e 100644 --- a/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out +++ b/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out @@ -570,7 +570,7 @@ STAGE PLANS: Reduce Vectorization: enabled: true enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true - notVectorizedReason: Tez doesn't use tagging + notVectorizedReason: Tagging not supported vectorized: false Reduce Operator Tree: Join Operator @@ -1996,7 +1996,7 @@ STAGE PLANS: Reduce Vectorization: enabled: true enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true - notVectorizedReason: Tez doesn't use tagging + notVectorizedReason: Tagging not supported vectorized: false Reduce Operator Tree: Join Operator @@ -2268,7 +2268,7 @@ STAGE PLANS: Reduce Vectorization: enabled: true enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true - notVectorizedReason: Tez doesn't use tagging + notVectorizedReason: Tagging not supported vectorized: false Reduce Operator Tree: Join Operator @@ -4062,7 +4062,7 @@ STAGE PLANS: Reduce Vectorization: enabled: true enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true - notVectorizedReason: Tez doesn't use tagging + notVectorizedReason: Tagging not supported vectorized: false Reduce Operator Tree: Join Operator http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out index 3b8b636..9aec2aa 100644 --- a/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out +++ b/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out @@ -111,7 +111,7 @@ STAGE PLANS: Reduce Vectorization: enabled: true enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true - notVectorizedReason: Tez doesn't use tagging + notVectorizedReason: Tagging not supported vectorized: false Reduce Operator Tree: Join Operator