Repository: hive
Updated Branches:
  refs/heads/master 4904ab786 -> 22af0eff0


HIVE-16065: Vectorization: Wrong Key/Value information used by Vectorizer (Matt 
McCline, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22af0eff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22af0eff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22af0eff

Branch: refs/heads/master
Commit: 22af0eff0461d2b91a643276c15870f7901d3119
Parents: 4904ab7
Author: Matt McCline <mmccl...@hortonworks.com>
Authored: Mon Mar 6 11:05:15 2017 -0800
Committer: Matt McCline <mmccl...@hortonworks.com>
Committed: Mon Mar 6 11:05:15 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  2 +-
 .../hive/ql/exec/tez/ReduceRecordSource.java    |  8 ++-
 .../hive/ql/optimizer/physical/Vectorizer.java  | 76 ++++++++++++++------
 .../apache/hadoop/hive/ql/plan/BaseWork.java    | 10 +++
 .../apache/hadoop/hive/ql/plan/ReduceWork.java  | 31 --------
 .../clientpositive/spark/vectorized_ptf.q.out   |  8 +--
 .../spark/vectorized_shufflejoin.q.out          |  2 +-
 7 files changed, 76 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 2d06545..3fb9fb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -300,7 +300,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     boolean vectorizedRecordSource = (tag == bigTablePosition) && 
redWork.getVectorMode();
     sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, 
keyTableDesc,
         valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorizedRowBatchCtx());
+        redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum());
     ois[tag] = sources[tag].getObjectInspector();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 8cd49c5..ad8b9e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -121,11 +121,14 @@ public class ReduceRecordSource implements RecordSource {
 
   private final GroupIterator groupIterator = new GroupIterator();
 
+  private long vectorizedVertexNum;
+
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc 
keyTableDesc,
       TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte 
tag,
-      VectorizedRowBatchCtx batchContext)
+      VectorizedRowBatchCtx batchContext, long vectorizedVertexNum)
       throws Exception {
 
+    this.vectorizedVertexNum = vectorizedVertexNum;
     ObjectInspector keyObjectInspector;
 
     this.reducer = reducer;
@@ -476,7 +479,8 @@ public class ReduceRecordSource implements RecordSource {
             + StringUtils.stringifyException(e2) + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing vector 
batch (tag="
-          + tag + ") " + rowString, e);
+          + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ") " +
+          rowString, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index f09bfa4..50eda15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -201,6 +201,7 @@ import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -220,6 +221,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hive.common.util.AnnotationUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -307,6 +309,8 @@ public class Vectorizer implements PhysicalPlanResolver {
     currentBaseWork.setNotVectorizedReason(null);
   }
 
+  private long vectorizedVertexNum = -1;
+
   public Vectorizer() {
 
     /*
@@ -604,6 +608,8 @@ public class Vectorizer implements PhysicalPlanResolver {
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
       vectorTaskColumnInfo.assume();
 
+      mapWork.setVectorizedVertexNum(++vectorizedVertexNum);
+
       boolean ret;
       try {
         ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
@@ -1137,6 +1143,8 @@ public class Vectorizer implements PhysicalPlanResolver {
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
       vectorTaskColumnInfo.assume();
 
+      reduceWork.setVectorizedVertexNum(++vectorizedVertexNum);
+
       boolean ret;
       try {
         ret = validateReduceWork(reduceWork, vectorTaskColumnInfo);
@@ -1164,38 +1172,56 @@ public class Vectorizer implements PhysicalPlanResolver 
{
       ArrayList<String> reduceColumnNames = new ArrayList<String>();
       ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
 
+      if (reduceWork.getNeedsTagging()) {
+        setNodeIssue("Tagging not supported");
+        return false;
+      }
+
       try {
-        // Check key ObjectInspector.
-        ObjectInspector keyObjectInspector = 
reduceWork.getKeyObjectInspector();
-        if (keyObjectInspector == null || !(keyObjectInspector instanceof 
StructObjectInspector)) {
-          setNodeIssue("Key object inspector missing or not 
StructObjectInspector");
-          return false;
+        TableDesc keyTableDesc = reduceWork.getKeyDesc();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Using reduce tag " + reduceWork.getTag());
         }
-        StructObjectInspector keyStructObjectInspector = 
(StructObjectInspector)keyObjectInspector;
-        List<? extends StructField> keyFields = 
keyStructObjectInspector.getAllStructFieldRefs();
+        TableDesc valueTableDesc = 
reduceWork.getTagToValueDesc().get(reduceWork.getTag());
 
-        if (reduceWork.getNeedsTagging()) {
-          setNodeIssue("Tez doesn't use tagging");
+        Deserializer keyDeserializer =
+            ReflectionUtils.newInstance(
+                keyTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(keyDeserializer, null, 
keyTableDesc.getProperties(), null);
+        ObjectInspector keyObjectInspector = 
keyDeserializer.getObjectInspector();
+        if (keyObjectInspector == null) {
+          setNodeIssue("Key object inspector null");
           return false;
         }
-
-        // Check value ObjectInspector.
-        ObjectInspector valueObjectInspector = 
reduceWork.getValueObjectInspector();
-        if (valueObjectInspector == null ||
-                !(valueObjectInspector instanceof StructObjectInspector)) {
-          setNodeIssue("Value object inspector missing or not 
StructObjectInspector");
+        if (!(keyObjectInspector instanceof StructObjectInspector)) {
+          setNodeIssue("Key object inspector not StructObjectInspector");
           return false;
         }
-        StructObjectInspector valueStructObjectInspector = 
(StructObjectInspector)valueObjectInspector;
-        List<? extends StructField> valueFields = 
valueStructObjectInspector.getAllStructFieldRefs();
+        StructObjectInspector keyStructObjectInspector = 
(StructObjectInspector) keyObjectInspector;
+        List<? extends StructField> keyFields = 
keyStructObjectInspector.getAllStructFieldRefs();
 
         for (StructField field: keyFields) {
           reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + 
field.getFieldName());
           
reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
         }
-        for (StructField field: valueFields) {
-          reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + 
field.getFieldName());
-          
reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+
+        Deserializer valueDeserializer =
+            ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null, 
valueTableDesc.getProperties(), null);
+        ObjectInspector valueObjectInspector = 
valueDeserializer.getObjectInspector();
+        if (valueObjectInspector != null) {
+          if (!(valueObjectInspector instanceof StructObjectInspector)) {
+            setNodeIssue("Value object inspector not StructObjectInspector");
+            return false;
+          }
+          StructObjectInspector valueStructObjectInspector = 
(StructObjectInspector) valueObjectInspector;
+          List<? extends StructField> valueFields = 
valueStructObjectInspector.getAllStructFieldRefs();
+
+          for (StructField field: valueFields) {
+            reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." 
+ field.getFieldName());
+            
reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+          }
         }
       } catch (Exception e) {
         throw new SemanticException(e);
@@ -1520,6 +1546,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
           taskVectorizationContext = getVectorizationContext(op.getName(), 
vectorTaskColumnInfo);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("MapWorkVectorizationNodeProcessor process 
vectorizedVertexNum " + vectorizedVertexNum + " mapColumnNames " + 
vectorTaskColumnInfo.allColumnNames.toString());
+            LOG.info("MapWorkVectorizationNodeProcessor process 
vectorizedVertexNum " + vectorizedVertexNum + " mapTypeInfos " + 
vectorTaskColumnInfo.allTypeInfos.toString());
+          }
         }
         vContext = taskVectorizationContext;
       } else {
@@ -1584,8 +1614,10 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       currentOperator = op;
       if (op.getParentOperators().size() == 0) {
-        LOG.info("ReduceWorkVectorizationNodeProcessor process 
reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
-
+        if (LOG.isInfoEnabled()) {
+          LOG.info("ReduceWorkVectorizationNodeProcessor process 
vectorizedVertexNum " + vectorizedVertexNum + " reduceColumnNames " + 
vectorTaskColumnInfo.allColumnNames.toString());
+          LOG.info("ReduceWorkVectorizationNodeProcessor process 
vectorizedVertexNum " + vectorizedVertexNum + " reduceTypeInfos " + 
vectorTaskColumnInfo.allTypeInfos.toString());
+        }
         vContext = new VectorizationContext("__Reduce_Shuffle__", 
vectorTaskColumnInfo.allColumnNames, hiveConf);
         taskVectorizationContext = vContext;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 286ee3b..0984df7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -89,6 +89,8 @@ public abstract class BaseWork extends AbstractOperatorDesc {
   private boolean allNative;
   private boolean usesVectorUDFAdaptor;
 
+  protected long vectorizedVertexNum;
+
   protected boolean llapMode = false;
   protected boolean uberMode = false;
 
@@ -183,6 +185,14 @@ public abstract class BaseWork extends 
AbstractOperatorDesc {
     return returnSet;
   }
 
+  public void setVectorizedVertexNum(long vectorizedVertexNum) {
+    this.vectorizedVertexNum = vectorizedVertexNum;
+  }
+
+  public long getVectorizedVertexNum() {
+    return vectorizedVertexNum;
+  }
+
   // 
-----------------------------------------------------------------------------------------------
 
   public void setVectorizationExamined(boolean vectorizationExamined) {

http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index f4ab2a0..ee784dc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -112,37 +112,6 @@ public class ReduceWork extends BaseWork {
      return keyDesc;
   }
 
-  private ObjectInspector getObjectInspector(TableDesc desc) {
-    ObjectInspector objectInspector;
-    try {
-      Deserializer deserializer = ReflectionUtil.newInstance(desc
-                .getDeserializerClass(), null);
-      SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), 
null);
-      objectInspector = deserializer.getObjectInspector();
-    } catch (Exception e) {
-      return null;
-    }
-    return objectInspector;
-  }
-
-  public ObjectInspector getKeyObjectInspector() {
-    if (keyObjectInspector == null) {
-      keyObjectInspector = getObjectInspector(keyDesc);
-    }
-    return keyObjectInspector;
-  }
-
-  // Only works when not tagging.
-  public ObjectInspector getValueObjectInspector() {
-    if (needsTagging) {
-      return null;
-    }
-    if (valueObjectInspector == null) {
-      valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag));
-    }
-    return valueObjectInspector;
-  }
-
   public List<TableDesc> getTagToValueDesc() {
     return tagToValueDesc;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out 
b/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out
index d4c2228..b1b820e 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out
@@ -570,7 +570,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled 
IS true, hive.execution.engine spark IN [tez, spark] IS true
-                notVectorizedReason: Tez doesn't use tagging
+                notVectorizedReason: Tagging not supported
                 vectorized: false
             Reduce Operator Tree:
               Join Operator
@@ -1996,7 +1996,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled 
IS true, hive.execution.engine spark IN [tez, spark] IS true
-                notVectorizedReason: Tez doesn't use tagging
+                notVectorizedReason: Tagging not supported
                 vectorized: false
             Reduce Operator Tree:
               Join Operator
@@ -2268,7 +2268,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled 
IS true, hive.execution.engine spark IN [tez, spark] IS true
-                notVectorizedReason: Tez doesn't use tagging
+                notVectorizedReason: Tagging not supported
                 vectorized: false
             Reduce Operator Tree:
               Join Operator
@@ -4062,7 +4062,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled 
IS true, hive.execution.engine spark IN [tez, spark] IS true
-                notVectorizedReason: Tez doesn't use tagging
+                notVectorizedReason: Tagging not supported
                 vectorized: false
             Reduce Operator Tree:
               Join Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/22af0eff/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out 
b/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out
index 3b8b636..9aec2aa 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out
@@ -111,7 +111,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled 
IS true, hive.execution.engine spark IN [tez, spark] IS true
-                notVectorizedReason: Tez doesn't use tagging
+                notVectorizedReason: Tagging not supported
                 vectorized: false
             Reduce Operator Tree:
               Join Operator

Reply via email to