Repository: hive
Updated Branches:
  refs/heads/branch-1 b555cc331 -> 9285349fd


HIVE-11119 : Spark reduce vectorization doesn't account for scratch columns (Ashutosh Chauhan via Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9285349f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9285349f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9285349f

Branch: refs/heads/branch-1
Commit: 9285349fdd0a3a7da6ba19e40b3f97b6015d44a0
Parents: b555cc3
Author: Ashutosh Chauhan <hashut...@apache.org>
Authored: Fri Jun 26 12:07:53 2015 -0700
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Tue Jun 30 06:01:56 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/spark/SparkReduceRecordHandler.java | 29 ++++----------
 .../hive/ql/exec/tez/ReduceRecordSource.java    | 33 ++++------------
 .../ql/exec/vector/VectorizedBatchUtil.java     | 41 ++++++++++++++------
 3 files changed, 44 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
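
Summary of the change: the reduce-side vectorization paths for Spark and Tez
built their VectorizedRowBatch from the key and value object inspectors alone,
so scratch columns assigned by the vectorizer had no backing ColumnVectors.
Batch construction now goes through VectorizedRowBatchCtx, which also allocates
the scratch columns described by the plan's scratch column type map. A minimal
sketch of the consolidated call site (setup elided; identifiers as in the diff
below):

  ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
      VectorizedBatchUtil.constructVectorizedRowBatch(
          keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
  VectorizedRowBatch batch = pair.getFirst();      // sized for key + value + scratch columns
  StructObjectInspector rowOI = pair.getSecond();  // the *standard* struct OI ReduceWork requires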


http://git-wip-us.apache.org/repos/asf/hive/blob/9285349f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 23d6a5d..ac5e3ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -47,7 +48,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -70,7 +71,6 @@ import org.apache.hadoop.util.StringUtils;
 public class SparkReduceRecordHandler extends SparkRecordHandler {
 
   private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class);
-  private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
   // Input value serde needs to be an array to support different SerDe
   // for different tags
@@ -153,8 +153,10 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           /* vectorization only works with struct object inspectors */
           valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
 
-          batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
-              valueStructInspectors[tag]);
+          ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
+              constructVectorizedRowBatch(keyStructInspector,
+              valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
+          batches[tag] = pair.getFirst();
           final int totalColumns = keysColumnOffset
               + valueStructInspectors[tag].getAllStructFieldRefs().size();
           valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -163,24 +165,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
               .genVectorStructExpressionWritables(valueStructInspectors[tag])));
 
-          /*
-           * The row object inspector used by ReduceWork needs to be a
-           * **standard** struct object inspector, not just any struct object
-           * inspector.
-           */
-          ArrayList<String> colNames = new ArrayList<String>();
-          List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-          for (StructField field : fields) {
-            colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-            ois.add(field.getFieldObjectInspector());
-          }
-          fields = valueStructInspectors[tag].getAllStructFieldRefs();
-          for (StructField field : fields) {
-            colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-            ois.add(field.getFieldObjectInspector());
-          }
-          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-              colNames, ois);
+          rowObjectInspector[tag] = pair.getSecond();
         } else {
           ois.add(keyObjectInspector);
           ois.add(valueObjectInspector[tag]);
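
The block removed above is what moved into VectorizedBatchUtil: ReduceWork needs
a *standard* struct object inspector whose column names are the key fields
prefixed with "KEY." and the value fields prefixed with "VALUE.". A hedged
illustration of that naming (the field names shown are hypothetical; the real
ones come from the compiled plan):

  // Utilities.ReduceField is the KEY/VALUE enum used for the prefixes.
  String keyCol   = Utilities.ReduceField.KEY.toString() + "." + keyField.getFieldName();     // e.g. "KEY.reducesinkkey0"
  String valueCol = Utilities.ReduceField.VALUE.toString() + "." + valueField.getFieldName(); // e.g. "VALUE._col0"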

http://git-wip-us.apache.org/repos/asf/hive/blob/9285349f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index cdabe3a..7d79e87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -51,7 +51,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -95,7 +95,6 @@ public class ReduceRecordSource implements RecordSource {
 
   private VectorDeserializeRow valueLazyBinaryDeserializeToRow;
 
-  private VectorizedRowBatchCtx batchContext;
   private VectorizedRowBatch batch;
 
   // number of columns pertaining to keys in a vectorized row batch
@@ -117,7 +116,7 @@ public class ReduceRecordSource implements RecordSource {
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private Iterable<Object> valueWritables;
-  
+
   private final GroupIterator groupIterator = new GroupIterator();
 
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
@@ -169,26 +168,10 @@ public class ReduceRecordSource implements RecordSource {
             .asList(VectorExpressionWriterFactory
                 .genVectorStructExpressionWritables(valueStructInspectors)));
 
-        /*
-         * The row object inspector used by ReduceWork needs to be a **standard**
-         * struct object inspector, not just any struct object inspector.
-         */
-        ArrayList<String> colNames = new ArrayList<String>();
-        List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-        for (StructField field: fields) {
-          colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-          ois.add(field.getFieldObjectInspector());
-        }
-        fields = valueStructInspectors.getAllStructFieldRefs();
-        for (StructField field: fields) {
-          colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-          ois.add(field.getFieldObjectInspector());
-        }
-        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
-        batchContext = new VectorizedRowBatchCtx();
-        batchContext.init(vectorScratchColumnTypeMap, (StructObjectInspector) rowObjectInspector);
-        batch = batchContext.createVectorizedRowBatch();
+        ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
+            VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
+        rowObjectInspector = pair.getSecond();
+        batch = pair.getFirst();
 
         // Setup vectorized deserialization for the key and value.
         BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -237,7 +220,7 @@ public class ReduceRecordSource implements RecordSource {
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
-  
+
   @Override
   public final boolean isGrouped() {
     return vectorized;
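
The batchContext field could be dropped here because VectorizedRowBatchCtx is
now used only transiently inside the shared helper. Its role, per the branch-1
API exercised in this diff, is roughly:

  VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx();
  ctx.init(vectorScratchColumnTypeMap, rowObjectInspector);  // row OI plus scratch column types
  VectorizedRowBatch batch = ctx.createVectorizedRowBatch(); // allocates row and scratch ColumnVectors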

http://git-wip-us.apache.org/repos/asf/hive/blob/9285349f/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 99cb620..3780113 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -24,13 +24,16 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -192,21 +195,35 @@ public class VectorizedBatchUtil {
 
   /**
    * Create VectorizedRowBatch from key and value object inspectors
-   *
+   * The row object inspector used by ReduceWork needs to be a **standard**
+   * struct object inspector, not just any struct object inspector.
    * @param keyInspector
    * @param valueInspector
-   * @return VectorizedRowBatch
+   * @param vectorScratchColumnTypeMap
+   * @return VectorizedRowBatch, OI
    * @throws HiveException
    */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
+      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
           throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(keyInspector, cvList);
-    allocateColumnVector(valueInspector, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    result.cols = cvList.toArray(result.cols);
-    return result;
+
+    ArrayList<String> colNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    fields = valueInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
+    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
+    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
   }
 
   /**
@@ -559,7 +576,7 @@ public class VectorizedBatchUtil {
     for(StructField field : fields) {
       TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
           field.getFieldObjectInspector().getTypeName());
-      ObjectInspector standardWritableObjectInspector = 
+      ObjectInspector standardWritableObjectInspector =
               TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
       oids.add(standardWritableObjectInspector);
       columnNames.add(field.getFieldName());
@@ -634,7 +651,7 @@ public class VectorizedBatchUtil {
     for (int i = start; i < start + length; i++) {
       char ch = (char) bytes[i];
       if (ch < ' ' || ch > '~') {
-        sb.append(String.format("\\%03d", (int) (bytes[i] & 0xff)));
+        sb.append(String.format("\\%03d", bytes[i] & 0xff));
       } else {
         sb.append(ch);
       }
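
For the record, the failure mode this patch addresses: the old
constructVectorizedRowBatch sized the batch for the key and value fields only,
so any vectorized expression writing to a scratch column indexed past that
range failed at runtime. An illustrative (made-up) example:

  // Before the fix: two declared columns, no scratch columns allocated.
  VectorizedRowBatch batch = new VectorizedRowBatch(2);
  int scratchColumn = 2; // hypothetical index assigned by the vectorizer
  // batch.cols[scratchColumn] -> ArrayIndexOutOfBoundsException prior to this change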
