Repository: hive
Updated Branches:
  refs/heads/branch-1 b555cc331 -> 9285349fd
HIVE-11119 : Spark reduce vectorization doesn't account for scratch columns (Ashutosh Chauhan via Xuefu Zhang)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9285349f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9285349f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9285349f

Branch: refs/heads/branch-1
Commit: 9285349fdd0a3a7da6ba19e40b3f97b6015d44a0
Parents: b555cc3
Author: Ashutosh Chauhan <hashut...@apache.org>
Authored: Fri Jun 26 12:07:53 2015 -0700
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Tue Jun 30 06:01:56 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/spark/SparkReduceRecordHandler.java | 29 ++++----------
 .../hive/ql/exec/tez/ReduceRecordSource.java    | 33 ++++------------
 .../ql/exec/vector/VectorizedBatchUtil.java     | 41 ++++++++++++++------
 3 files changed, 44 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9285349f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 23d6a5d..ac5e3ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -47,7 +48,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -70,7 +71,6 @@ import org.apache.hadoop.util.StringUtils;
 public class SparkReduceRecordHandler extends SparkRecordHandler {
 
   private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class);
-  private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
   // Input value serde needs to be an array to support different SerDe
   // for different tags
@@ -153,8 +153,10 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
 
         /* vectorization only works with struct object inspectors */
         valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
-        batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
-          valueStructInspectors[tag]);
+        ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
+            constructVectorizedRowBatch(keyStructInspector,
+                valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
+        batches[tag] = pair.getFirst();
         final int totalColumns = keysColumnOffset
           + valueStructInspectors[tag].getAllStructFieldRefs().size();
         valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -163,24 +165,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
         valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
           .genVectorStructExpressionWritables(valueStructInspectors[tag])));
 
-        /*
-         * The row object inspector used by ReduceWork needs to be a
-         * **standard** struct object inspector, not just any struct object
-         * inspector.
-         */
-        ArrayList<String> colNames = new ArrayList<String>();
-        List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-        for (StructField field : fields) {
-          colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-          ois.add(field.getFieldObjectInspector());
-        }
-        fields = valueStructInspectors[tag].getAllStructFieldRefs();
-        for (StructField field : fields) {
-          colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-          ois.add(field.getFieldObjectInspector());
-        }
-        rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-          colNames, ois);
+        rowObjectInspector[tag] = pair.getSecond();
       } else {
         ois.add(keyObjectInspector);
         ois.add(valueObjectInspector[tag]);
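
(Not part of the patch: a minimal sketch of the failure mode the subject line
describes. The class name and the two-column layout below are invented for
illustration; only the VectorizedRowBatch API is real.)

  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

  public class ScratchColumnSketch {
    public static void main(String[] args) {
      // A batch sized only for the key and value columns, as the old code did.
      VectorizedRowBatch batch = new VectorizedRowBatch(2);
      batch.cols[0] = new LongColumnVector(); // KEY._col0
      batch.cols[1] = new LongColumnVector(); // VALUE._col0

      // A vectorized expression writes its result to a scratch column appended
      // after the row columns -- index 2 here -- but batch.cols has length 2,
      // so touching it throws ArrayIndexOutOfBoundsException.
      batch.cols[2] = new LongColumnVector();
    }
  }
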
http://git-wip-us.apache.org/repos/asf/hive/blob/9285349f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index cdabe3a..7d79e87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -51,7 +51,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -95,7 +95,6 @@ public class ReduceRecordSource implements RecordSource {
 
   private VectorDeserializeRow valueLazyBinaryDeserializeToRow;
 
-  private VectorizedRowBatchCtx batchContext;
   private VectorizedRowBatch batch;
 
   // number of columns pertaining to keys in a vectorized row batch
@@ -117,7 +116,7 @@
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private Iterable<Object> valueWritables;
-  
+
   private final GroupIterator groupIterator = new GroupIterator();
 
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
@@ -169,26 +168,10 @@
           .asList(VectorExpressionWriterFactory
               .genVectorStructExpressionWritables(valueStructInspectors)));
 
-      /*
-       * The row object inspector used by ReduceWork needs to be a **standard**
-       * struct object inspector, not just any struct object inspector.
-       */
-      ArrayList<String> colNames = new ArrayList<String>();
-      List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-      for (StructField field: fields) {
-        colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-        ois.add(field.getFieldObjectInspector());
-      }
-      fields = valueStructInspectors.getAllStructFieldRefs();
-      for (StructField field: fields) {
-        colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-        ois.add(field.getFieldObjectInspector());
-      }
-      rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
-      batchContext = new VectorizedRowBatchCtx();
-      batchContext.init(vectorScratchColumnTypeMap, (StructObjectInspector) rowObjectInspector);
-      batch = batchContext.createVectorizedRowBatch();
+      ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
+          VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
+      rowObjectInspector = pair.getSecond();
+      batch = pair.getFirst();
 
       // Setup vectorized deserialization for the key and value.
       BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -237,7 +220,7 @@ public class ReduceRecordSource implements RecordSource {
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
-  
+
   @Override
   public final boolean isGrouped() {
     return vectorized;
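
(For reference: a sketch of the call pattern the Spark and Tez reduce paths now
share, using the variable names from the surrounding init code.)

  ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
      VectorizedBatchUtil.constructVectorizedRowBatch(
          keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
  batch = pair.getFirst();               // batch is now sized for scratch columns too
  rowObjectInspector = pair.getSecond(); // standard struct OI required by ReduceWork
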
http://git-wip-us.apache.org/repos/asf/hive/blob/9285349f/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 99cb620..3780113 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -24,13 +24,16 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -192,21 +195,35 @@ public class VectorizedBatchUtil {
 
   /**
    * Create VectorizedRowBatch from key and value object inspectors
-   *
+   * The row object inspector used by ReduceWork needs to be a **standard**
+   * struct object inspector, not just any struct object inspector.
    * @param keyInspector
    * @param valueInspector
-   * @return VectorizedRowBatch
+   * @param vectorScratchColumnTypeMap
+   * @return VectorizedRowBatch, OI
    * @throws HiveException
    */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
+      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
       throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(keyInspector, cvList);
-    allocateColumnVector(valueInspector, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    result.cols = cvList.toArray(result.cols);
-    return result;
+
+    ArrayList<String> colNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    fields = valueInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
+    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
+    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
   }
 
   /**
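
(A sketch of what the reworked helper produces, assuming one key column, one
value column, and a single scratch column of type bigint recorded by the
vectorizer; keyOi, valueOi, and the map literal are hypothetical. The batch
width follows from VectorizedRowBatchCtx allocating row plus scratch columns,
which is the point of this fix.)

  Map<Integer, String> scratchTypes = new HashMap<Integer, String>();
  scratchTypes.put(2, "bigint"); // scratch column appended after the two row columns

  ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
      VectorizedBatchUtil.constructVectorizedRowBatch(keyOi, valueOi, scratchTypes);

  // Field names are prefixed KEY. / VALUE.; the batch carries three column
  // vectors (two row columns plus the scratch column) instead of two.
  VectorizedRowBatch batch = pair.getFirst();
  StandardStructObjectInspector rowOi = pair.getSecond();
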
@@ -559,7 +576,7 @@ public class VectorizedBatchUtil {
     for(StructField field : fields) {
       TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
           field.getFieldObjectInspector().getTypeName());
-      ObjectInspector standardWritableObjectInspector = 
+      ObjectInspector standardWritableObjectInspector =
           TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
       oids.add(standardWritableObjectInspector);
       columnNames.add(field.getFieldName());
@@ -634,7 +651,7 @@
     for (int i = start; i < start + length; i++) {
       char ch = (char) bytes[i];
       if (ch < ' ' || ch > '~') {
-        sb.append(String.format("\\%03d", (int) (bytes[i] & 0xff)));
+        sb.append(String.format("\\%03d", bytes[i] & 0xff));
       } else {
         sb.append(ch);
       }
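
(Side note on the final hunk: the dropped (int) cast was redundant, since the
& operator already promotes its byte operand to int and the mask zero-extends
it. For example:)

  byte b = (byte) 0xC3;     // -61 as a signed byte
  int unsigned = b & 0xff;  // 195; '&' promotes both operands to int
  System.out.println(String.format("\\%03d", unsigned)); // prints \195
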