HIVE-16065: Vectorization: Wrong Key/Value information used by Vectorizer (Matt McCline, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/154c58b5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/154c58b5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/154c58b5 Branch: refs/heads/branch-2.2 Commit: 154c58b56163130a07f152a9bfe37cc7d6f2e6b5 Parents: 2f3c949 Author: Matt McCline <mmccl...@hortonworks.com> Authored: Thu Mar 2 18:40:24 2017 -0800 Committer: Owen O'Malley <omal...@apache.org> Committed: Tue Mar 28 15:27:58 2017 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/tez/ReduceRecordProcessor.java | 2 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 8 ++- .../hive/ql/optimizer/physical/Vectorizer.java | 74 ++++++++++++++------ .../apache/hadoop/hive/ql/plan/BaseWork.java | 10 +++ .../apache/hadoop/hive/ql/plan/ReduceWork.java | 31 -------- 5 files changed, 70 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/154c58b5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 2d06545..3fb9fb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -300,7 +300,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode(); sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getVectorizedRowBatchCtx()); + redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum()); ois[tag] = sources[tag].getObjectInspector(); } http://git-wip-us.apache.org/repos/asf/hive/blob/154c58b5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 7e41b7a..342e1ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -120,11 +120,14 @@ public class ReduceRecordSource implements RecordSource { private final GroupIterator groupIterator = new GroupIterator(); + private long vectorizedVertexNum; + void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag, - VectorizedRowBatchCtx batchContext) + VectorizedRowBatchCtx batchContext, long vectorizedVertexNum) throws Exception { + this.vectorizedVertexNum = vectorizedVertexNum; ObjectInspector keyObjectInspector; this.reducer = reducer; @@ -471,7 +474,8 @@ public class ReduceRecordSource implements RecordSource { + StringUtils.stringifyException(e2) + " ]"; } throw new HiveException("Hive Runtime Error while processing vector batch (tag=" - + tag + ") " + rowString, e); + + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ") " + + rowString, e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/154c58b5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 864b783..a30923f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -168,6 +168,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.NullStructSerDe; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -182,6 +183,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Preconditions; @@ -242,6 +244,8 @@ public class Vectorizer implements PhysicalPlanResolver { HiveVectorAdaptorUsageMode hiveVectorAdaptorUsageMode; + private long vectorizedVertexNum = -1; + public Vectorizer() { /* @@ -486,6 +490,7 @@ public class Vectorizer implements PhysicalPlanResolver { private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); + mapWork.setVectorizedVertexNum(++vectorizedVertexNum); boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez); if (ret) { vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez); @@ -935,6 +940,7 @@ public class Vectorizer implements PhysicalPlanResolver { private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException { VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); + reduceWork.setVectorizedVertexNum(++vectorizedVertexNum); boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez); if (ret) { vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez); @@ -947,36 +953,56 @@ public class Vectorizer implements PhysicalPlanResolver { ArrayList<String> reduceColumnNames = new ArrayList<String>(); ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>(); + if (reduceWork.getNeedsTagging()) { + LOG.info("Tagging not supported"); + return false; + } + try { - // Check key ObjectInspector. - ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector(); - if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) { - return false; + TableDesc keyTableDesc = reduceWork.getKeyDesc(); + if (LOG.isDebugEnabled()) { + LOG.debug("Using reduce tag " + reduceWork.getTag()); } - StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector; - List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs(); - - // Tez doesn't use tagging... - if (reduceWork.getNeedsTagging()) { + TableDesc valueTableDesc = reduceWork.getTagToValueDesc().get(reduceWork.getTag()); + + Deserializer keyDeserializer = + ReflectionUtils.newInstance( + keyTableDesc.getDeserializerClass(), null); + SerDeUtils.initializeSerDe(keyDeserializer, null, keyTableDesc.getProperties(), null); + ObjectInspector keyObjectInspector = keyDeserializer.getObjectInspector(); + if (keyObjectInspector == null) { + LOG.info("Key object inspector null"); return false; } - - // Check value ObjectInspector. - ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector(); - if (valueObjectInspector == null || - !(valueObjectInspector instanceof StructObjectInspector)) { + if (!(keyObjectInspector instanceof StructObjectInspector)) { + LOG.info("Key object inspector not StructObjectInspector"); return false; } - StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; - List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs(); + StructObjectInspector keyStructObjectInspector = (StructObjectInspector) keyObjectInspector; + List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs(); for (StructField field: keyFields) { reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); } - for (StructField field: valueFields) { - reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); - reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); + + Deserializer valueDeserializer = + ReflectionUtils.newInstance( + valueTableDesc.getDeserializerClass(), null); + SerDeUtils.initializeSerDe(valueDeserializer, null, valueTableDesc.getProperties(), null); + ObjectInspector valueObjectInspector = valueDeserializer.getObjectInspector(); + if (valueObjectInspector != null) { + if (!(valueObjectInspector instanceof StructObjectInspector)) { + LOG.info("Value object inspector not StructObjectInspector"); + return false; + } + StructObjectInspector valueStructObjectInspector = (StructObjectInspector) valueObjectInspector; + List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs(); + + for (StructField field: valueFields) { + reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); + reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); + } } } catch (Exception e) { throw new SemanticException(e); @@ -1254,6 +1280,10 @@ public class Vectorizer implements PhysicalPlanResolver { if (op instanceof TableScanOperator) { if (taskVectorizationContext == null) { taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo); + if (LOG.isInfoEnabled()) { + LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); + LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString()); + } } vContext = taskVectorizationContext; } else { @@ -1320,8 +1350,10 @@ public class Vectorizer implements PhysicalPlanResolver { boolean saveRootVectorOp = false; if (op.getParentOperators().size() == 0) { - LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); - + if (LOG.isInfoEnabled()) { + LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); + LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString()); + } vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.allColumnNames, hiveConf); taskVectorizationContext = vContext; http://git-wip-us.apache.org/repos/asf/hive/blob/154c58b5/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 8c341fc..990da12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -73,6 +73,8 @@ public abstract class BaseWork extends AbstractOperatorDesc { protected boolean useVectorizedInputFileFormat; + protected long vectorizedVertexNum; + protected boolean llapMode = false; protected boolean uberMode = false; @@ -167,6 +169,14 @@ public abstract class BaseWork extends AbstractOperatorDesc { return returnSet; } + public void setVectorizedVertexNum(long vectorizedVertexNum) { + this.vectorizedVertexNum = vectorizedVertexNum; + } + + public long getVectorizedVertexNum() { + return vectorizedVertexNum; + } + // ----------------------------------------------------------------------------------------------- /* http://git-wip-us.apache.org/repos/asf/hive/blob/154c58b5/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index 72fc4ca..1910ebd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -103,37 +103,6 @@ public class ReduceWork extends BaseWork { return keyDesc; } - private ObjectInspector getObjectInspector(TableDesc desc) { - ObjectInspector objectInspector; - try { - Deserializer deserializer = ReflectionUtil.newInstance(desc - .getDeserializerClass(), null); - SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null); - objectInspector = deserializer.getObjectInspector(); - } catch (Exception e) { - return null; - } - return objectInspector; - } - - public ObjectInspector getKeyObjectInspector() { - if (keyObjectInspector == null) { - keyObjectInspector = getObjectInspector(keyDesc); - } - return keyObjectInspector; - } - - // Only works when not tagging. - public ObjectInspector getValueObjectInspector() { - if (needsTagging) { - return null; - } - if (valueObjectInspector == null) { - valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag)); - } - return valueObjectInspector; - } - public List<TableDesc> getTagToValueDesc() { return tagToValueDesc; }