HIVE-11303: Getting Tez LimitExceededException after dag execution on large query (Jason Dere, reviewed by Vikram Dixit)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/04d54f61
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/04d54f61
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/04d54f61

Branch: refs/heads/master
Commit: 04d54f61c9f56906160936751e772080c079498c
Parents: 9904162
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jul 21 14:03:12 2015 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jul 21 14:03:12 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../test/resources/testconfiguration.properties |   4 +
 .../apache/hadoop/hive/ql/exec/JoinUtil.java    |  87 +-
 .../hadoop/hive/ql/exec/MapJoinOperator.java    |   2 +-
 .../apache/hadoop/hive/ql/exec/Operator.java    |   6 +
 .../hive/ql/exec/tez/KeyValuesAdapter.java      |  47 ++
 .../hive/ql/exec/tez/KeyValuesFromKeyValue.java |  90 +++
 .../ql/exec/tez/KeyValuesFromKeyValues.java     |  48 ++
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  11 +-
 .../hive/ql/exec/tez/ReduceRecordSource.java    |  12 +-
 .../ql/exec/vector/VectorMapJoinOperator.java   |   1 -
 .../mapjoin/VectorMapJoinCommonOperator.java    |   1 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   | 213 +++--
 .../hive/ql/optimizer/MapJoinProcessor.java     |  44 +-
 .../ql/optimizer/ReduceSinkMapJoinProc.java     |  84 +-
 .../hadoop/hive/ql/parse/GenTezProcContext.java |  12 +
 .../hadoop/hive/ql/parse/GenTezUtils.java       |  23 +-
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |  81 +-
 .../apache/hadoop/hive/ql/plan/BaseWork.java    |   2 +-
 .../hive/ql/plan/CommonMergeJoinDesc.java       |   4 +
 .../hadoop/hive/ql/plan/ExprNodeDescUtils.java  | 115 +++
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java |  11 +
 .../apache/hadoop/hive/ql/plan/ReduceWork.java  |   2 +-
 .../clientpositive/tez_dynpart_hashjoin_1.q     | 101 +++
 .../clientpositive/tez_dynpart_hashjoin_2.q     |  83 ++
 .../tez_vector_dynpart_hashjoin_1.q             | 102 +++
 .../tez_vector_dynpart_hashjoin_2.q             |  84 ++
 .../tez/tez_dynpart_hashjoin_1.q.out            | 791 ++++++++++++++++++
 .../tez/tez_dynpart_hashjoin_2.q.out            | 564 +++++++++++++
 .../tez/tez_vector_dynpart_hashjoin_1.q.out     | 804 +++++++++++++++++++
 .../tez/tez_vector_dynpart_hashjoin_2.q.out     | 570 +++++++++++++
 31 files changed, 3899 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 39477d6..33b67dd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1046,6 +1046,9 @@ public class HiveConf extends Configuration {
         "job, process those skewed keys. The same key need not be skewed for 
all the tables, and so,\n" +
         "the follow-up map-reduce job (for the skewed keys) would be much 
faster, since it would be a\n" +
         "map-join."),
+    HIVEDYNAMICPARTITIONHASHJOIN("hive.optimize.dynamic.partition.hashjoin", false,
+        "Whether to enable dynamically partitioned hash join optimization. \n" +
+        "This setting is also dependent on enabling hive.auto.convert.join"),
     HIVECONVERTJOIN("hive.auto.convert.join", true,
         "Whether Hive enables the optimization about converting common join 
into mapjoin based on the input file size"),
     
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", 
true,

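For context, the new flag is only consulted together with hive.auto.convert.join; the sketch below (class and method names hypothetical, not part of this patch) shows the combined check that the optimizer code added later in this commit performs:

import org.apache.hadoop.hive.conf.HiveConf;

// Minimal sketch: dynamically partitioned hash join is only attempted when both
// hive.auto.convert.join and hive.optimize.dynamic.partition.hashjoin are enabled.
public class DynPartHashJoinFlagCheck {
  static boolean isEnabled(HiveConf conf) {
    return conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
        && conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN);
  }
}
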
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index 97715fc..fbde465 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -334,6 +334,10 @@ minitez.query.files=bucket_map_join_tez1.q,\
   tez_dml.q,\
   tez_fsstat.q,\
   tez_insert_overwrite_local_directory_1.q,\
+  tez_dynpart_hashjoin_1.q,\
+  tez_dynpart_hashjoin_2.q,\
+  tez_vector_dynpart_hashjoin_1.q,\
+  tez_vector_dynpart_hashjoin_2.q,\
   tez_join_hash.q,\
   tez_join_result_complex.q,\
   tez_join_tests.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
index 7b57550..0aaa51a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.Reporter;
@@ -65,8 +66,21 @@ public class JoinUtil {
 
     int iterate = Math.min(exprEntries.length, inputObjInspector.length);
     for (byte alias = 0; alias < iterate; alias++) {
+      ObjectInspector inputOI = inputObjInspector[alias];
+
+      // For vectorized reduce-side operators getting inputs from a reduce 
sink,
+      // the row object inspector will get a flattened version of the object 
inspector
+      // where the nested key/value structs are replaced with a single struct:
+      // Example: { key: { reducesinkkey0:int }, value: { _col0:int, 
_col1:int, .. } }
+      // Would get converted to the following for a vectorized input:
+      //   { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, 
.. }
+      // The ExprNodeEvaluator initialization below gets broken with the flattened
+      // object inspectors, so convert it back to a form that contains the
+      // nested key/value structs.
+      inputOI = unflattenObjInspector(inputOI);
+
       if (alias == (byte) posBigTableAlias ||
-          exprEntries[alias] == null || inputObjInspector[alias] == null) {
+          exprEntries[alias] == null || inputOI == null) {
         // skip the driver and directly loadable tables
         continue;
       }
@@ -74,7 +88,7 @@ public class JoinUtil {
       List<ExprNodeEvaluator> exprList = exprEntries[alias];
       List<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
       for (int i = 0; i < exprList.size(); i++) {
-        fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias]));
+        fieldOIList.add(exprList.get(i).initialize(inputOI));
       }
       result[alias] = fieldOIList;
     }
@@ -350,4 +364,73 @@ public class JoinUtil {
     rc.setTableDesc(tblDesc);
     return rc;
   }
+
+  private static String KEY_FIELD_PREFIX = (Utilities.ReduceField.KEY + 
".").toLowerCase();
+  private static String VALUE_FIELD_PREFIX = (Utilities.ReduceField.VALUE + 
".").toLowerCase();
+
+  /**
+   * Create a new struct object inspector for the list of struct fields, first 
removing the
+   * prefix from the field name.
+   * @param fields
+   * @param prefixToRemove
+   * @return
+   */
+  private static ObjectInspector createStructFromFields(List<StructField> 
fields, String prefixToRemove) {
+    int prefixLength = prefixToRemove.length() + 1; // also remove the '.' 
after the prefix
+    ArrayList<String> fieldNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    for (StructField field : fields) {
+      fieldNames.add(field.getFieldName().substring(prefixLength));
+      fieldOIs.add(field.getFieldObjectInspector());
+    }
+    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, 
fieldOIs);
+  }
+
+  /**
+   * Checks the input object inspector to see if it is in the form of a flattened struct
+   * like the ones generated by a vectorized reduce sink input:
+   *   { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. }
+   * If so, then it creates an "unflattened" struct that contains nested 
key/value
+   * structs:
+   *   { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } }
+   *
+   * @param oi
+   * @return unflattened object inspector if unflattening is needed,
+   *         otherwise the original object inspector
+   */
+  private static ObjectInspector unflattenObjInspector(ObjectInspector oi) {
+    if (oi instanceof StructObjectInspector) {
+      // Check if all fields start with "key." or "value."
+      // If so, then unflatten by adding an additional level of nested key and 
value structs
+      // Example: { "key.reducesinkkey0":int, "key.reducesinkkey1": int, 
"value._col6":int }
+      // Becomes
+      //   { "key": { "reducesinkkey0":int, "reducesinkkey1":int }, "value": { 
"_col6":int } }
+      ArrayList<StructField> keyFields = new ArrayList<StructField>();
+      ArrayList<StructField> valueFields = new ArrayList<StructField>();
+      for (StructField field : ((StructObjectInspector) 
oi).getAllStructFieldRefs()) {
+        String fieldNameLower = field.getFieldName().toLowerCase();
+        if (fieldNameLower.startsWith(KEY_FIELD_PREFIX)) {
+          keyFields.add(field);
+        } else if (fieldNameLower.startsWith(VALUE_FIELD_PREFIX)) {
+          valueFields.add(field);
+        } else {
+          // Not a flattened struct, no need to unflatten
+          return oi;
+        }
+      }
+
+      // All field names are of the form "key." or "value."
+      // Create key/value structs and add the respective fields to each one
+      ArrayList<ObjectInspector> reduceFieldOIs = new 
ArrayList<ObjectInspector>();
+      reduceFieldOIs.add(createStructFromFields(keyFields, 
Utilities.ReduceField.KEY.toString()));
+      reduceFieldOIs.add(createStructFromFields(valueFields, 
Utilities.ReduceField.VALUE.toString()));
+
+      // Finally create the outer struct to contain the key, value structs
+      return ObjectInspectorFactory.getStandardStructObjectInspector(
+          Utilities.reduceFieldNameList,
+          reduceFieldOIs);
+    }
+
+    return oi;
+  }
 }

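To make the flattened and nested object-inspector shapes concrete, here is a small standalone sketch (class name hypothetical, not part of the patch) that builds both forms described in the comments above using the same ObjectInspectorFactory API:

import java.util.Arrays;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UnflattenShapeExample {
  public static void main(String[] args) {
    ObjectInspector intOI = PrimitiveObjectInspectorFactory.javaIntObjectInspector;

    // Flattened form produced for a vectorized reduce-sink input:
    //   { 'key.reducesinkkey0':int, 'value._col0':int }
    ObjectInspector flattened = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key.reducesinkkey0", "value._col0"),
        Arrays.asList(intOI, intOI));

    // Nested form that unflattenObjInspector() reconstructs:
    //   { key: { reducesinkkey0:int }, value: { _col0:int } }
    ObjectInspector keyOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("reducesinkkey0"), Arrays.asList(intOI));
    ObjectInspector valueOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("_col0"), Arrays.asList(intOI));
    ObjectInspector nested = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key", "value"), Arrays.asList(keyOI, valueOI));

    System.out.println(flattened.getTypeName());
    System.out.println(nested.getTypeName());
  }
}
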
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index a40f0a9..1b9d7ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -154,7 +154,7 @@ public class MapJoinOperator extends 
AbstractMapJoinOperator<MapJoinDesc> implem
     final ExecMapperContext mapContext = getExecContext();
     final MapredContext mrContext = MapredContext.get();
 
-    if (!conf.isBucketMapJoin()) {
+    if (!conf.isBucketMapJoin() && !conf.isDynamicPartitionHashJoin()) {
       /*
        * The issue with caching in case of bucket map join is that different 
tasks
        * process different buckets and if the container is reused to join a 
different bucket,

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index d7f1b42..0f02737 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -1354,4 +1354,10 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
       return childOperators;
     }
   }
+
+  public void removeParents() {
+    for (Operator<?> parent : new 
ArrayList<Operator<?>>(getParentOperators())) {
+      removeParent(parent);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java
new file mode 100644
index 0000000..8f706fe
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+/**
+ * Key-values interface for the Reader used by ReduceRecordSource
+ */
+public interface KeyValuesAdapter {
+  /**
+   * Get the key for the current record
+   * @return
+   * @throws IOException
+   */
+  Object getCurrentKey() throws IOException;
+
+  /**
+   * Get the values for the current record
+   * @return
+   * @throws IOException
+   */
+  Iterable<Object> getCurrentValues() throws IOException;
+
+  /**
+   * Move to the next record
+   * @return true if successful, false if there are no more records to process
+   * @throws IOException
+   */
+  boolean next() throws IOException;
+}
\ No newline at end of file

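The adapter is consumed the same way regardless of which kind of edge delivers the records; a hypothetical consumer (not part of the patch) looks like this:

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.tez.KeyValuesAdapter;

public class KeyValuesAdapterUsage {
  // Iterate every key and its values, mirroring the loop that ReduceRecordSource drives.
  static void drain(KeyValuesAdapter adapter) throws IOException {
    while (adapter.next()) {
      Object key = adapter.getCurrentKey();
      for (Object value : adapter.getCurrentValues()) {
        System.out.println(key + " -> " + value);   // placeholder for real row processing
      }
    }
  }
}
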
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java
new file mode 100644
index 0000000..51cdeca
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Provides a key/values (note the plural values) interface out of a 
KeyValueReader,
+ * needed by ReduceRecordSource when reading input from a key/value source.
+ */
+public class KeyValuesFromKeyValue implements KeyValuesAdapter {
+  protected KeyValueReader reader;
+  protected ValueIterator<Object> valueIterator =
+      new ValueIterator<Object>();
+
+  private static class ValueIterator<T> implements Iterator<T>, Iterable<T> {
+
+    protected boolean hasNextValue = false;
+    protected T value = null;
+
+    @Override
+    public boolean hasNext() {
+      return hasNextValue;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNextValue) {
+        throw new NoSuchElementException();
+      }
+      hasNextValue = false;
+      return value;
+    }
+
+    void reset(T value) {
+      this.value = value;
+      hasNextValue = true;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return this;
+    }
+  }
+
+  public KeyValuesFromKeyValue(KeyValueReader reader) {
+    this.reader = reader;
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return reader.getCurrentKey();
+  }
+
+  @Override
+  public Iterable<Object> getCurrentValues() throws IOException {
+    Object obj = reader.getCurrentValue();
+    valueIterator.reset(obj);
+    return valueIterator;
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    return reader.next();
+  }
+}
\ No newline at end of file

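Because a KeyValueReader delivers one value per record, the adapter surfaces each record as a key with a single-element Iterable, so code written against the grouped interface runs unchanged; a hypothetical usage sketch (not part of the patch):

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.tez.KeyValuesFromKeyValue;
import org.apache.tez.runtime.library.api.KeyValueReader;

public class KeyValuesFromKeyValueUsage {
  static void drain(KeyValueReader kvReader) throws IOException {
    KeyValuesFromKeyValue adapted = new KeyValuesFromKeyValue(kvReader);
    while (adapted.next()) {
      // getCurrentValues() iterates exactly once for the current record.
      for (Object value : adapted.getCurrentValues()) {
        System.out.println(adapted.getCurrentKey() + " -> " + value);
      }
    }
  }
}
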
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java
new file mode 100644
index 0000000..b027bce
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Provides a key/values interface out of a KeyValuesReader for use by 
ReduceRecordSource.
+ */
+public class KeyValuesFromKeyValues implements KeyValuesAdapter {
+  protected KeyValuesReader reader;
+
+  public KeyValuesFromKeyValues(KeyValuesReader reader) {
+    this.reader = reader;
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return reader.getCurrentKey();
+  }
+
+  @Override
+  public Iterable<Object> getCurrentValues() throws IOException {
+    return reader.getCurrentValues();
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    return reader.next();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 545d7c6..d649672 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -50,6 +50,7 @@ import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -129,10 +130,11 @@ public class ReduceRecordProcessor  extends 
RecordProcessor{
         tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork);
       }
 
-      bigTablePosition = (byte) reduceWork.getTag();
       ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps);
     }
 
+    bigTablePosition = (byte) reduceWork.getTag();
+
     ObjectInspector[] mainWorkOIs = null;
     ((TezContext) MapredContext.get()).setInputs(inputs);
     ((TezContext) 
MapredContext.get()).setTezProcessorContext(processorContext);
@@ -227,10 +229,13 @@ public class ReduceRecordProcessor  extends 
RecordProcessor{
     reducer.setParentOperators(null); // clear out any parents as reducer is 
the root
 
     TableDesc keyTableDesc = redWork.getKeyDesc();
-    KeyValuesReader reader = (KeyValuesReader) 
inputs.get(inputName).getReader();
+    Reader reader = inputs.get(inputName).getReader();
 
     sources[tag] = new ReduceRecordSource();
-    sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), 
keyTableDesc,
+    // Only the big table input source should be vectorized (if applicable)
+    // Note this behavior may have to change if we ever implement a vectorized 
merge join
+    boolean vectorizedRecordSource = (tag == bigTablePosition) && 
redWork.getVectorMode();
+    sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, 
keyTableDesc,
         valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
         redWork.getVectorScratchColumnTypeMap());
     ois[tag] = sources[tag].getObjectInspector();

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 20f6dba..89f7572 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -107,7 +109,7 @@ public class ReduceRecordSource implements RecordSource {
   /* this is only used in the error code path */
   private List<VectorExpressionWriter> valueStringWriters;
 
-  private KeyValuesReader reader;
+  private KeyValuesAdapter reader;
 
   private boolean handleGroupKey;
 
@@ -120,7 +122,7 @@ public class ReduceRecordSource implements RecordSource {
   private final GroupIterator groupIterator = new GroupIterator();
 
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc 
keyTableDesc,
-      TableDesc valueTableDesc, KeyValuesReader reader, boolean 
handleGroupKey, byte tag,
+      TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte 
tag,
       Map<Integer, String> vectorScratchColumnTypeMap)
       throws Exception {
 
@@ -129,7 +131,11 @@ public class ReduceRecordSource implements RecordSource {
     this.reducer = reducer;
     this.vectorized = vectorized;
     this.keyTableDesc = keyTableDesc;
-    this.reader = reader;
+    if (reader instanceof KeyValueReader) {
+      this.reader = new KeyValuesFromKeyValue((KeyValueReader) reader);
+    } else {
+      this.reader = new KeyValuesFromKeyValues((KeyValuesReader) reader);
+    }
     this.handleGroupKey = handleGroupKey;
     this.tag = tag;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index e9bd44a..9bd811c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -100,7 +100,6 @@ public class VectorMapJoinOperator extends 
VectorMapJoinBaseOperator {
 
   @Override
   public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-
     // Use a final variable to properly parameterize the 
processVectorInspector closure.
     // Using a member variable in the closure will not do the right thing...
     final int parameterizePosBigTable = conf.getPosBigTable();

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 4c8c4b1..87ebcf2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -551,6 +551,7 @@ public abstract class VectorMapJoinCommonOperator extends 
MapJoinOperator implem
 
   @Override
   protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+
     Collection<Future<?>> result = super.initializeOp(hconf);
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 5a87bd6..e3acdfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -93,9 +93,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       if (retval == null) {
         return retval;
       } else {
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false);
-        return null;
+        fallbackToReduceSideJoin(joinOp, context);
       }
     }
 
@@ -103,27 +101,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // exact number of buckets. Else choose the largest number of estimated
     // reducers from the parent operators.
     int numBuckets = -1;
-    int estimatedBuckets = -1;
     if 
(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ))
 {
-      for (Operator<? extends OperatorDesc>parentOp : 
joinOp.getParentOperators()) {
-        if (parentOp.getOpTraits().getNumBuckets() > 0) {
-          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
-              parentOp.getOpTraits().getNumBuckets() : numBuckets;
-        }
-
-        if (parentOp instanceof ReduceSinkOperator) {
-          ReduceSinkOperator rs = (ReduceSinkOperator) parentOp;
-          estimatedBuckets = (estimatedBuckets < 
rs.getConf().getNumReducers()) ?
-              rs.getConf().getNumReducers() : estimatedBuckets;
-        }
-      }
-
-      if (numBuckets <= 0) {
-        numBuckets = estimatedBuckets;
-        if (numBuckets <= 0) {
-          numBuckets = 1;
-        }
-      }
+      numBuckets = estimateNumBuckets(joinOp, true);
     } else {
       numBuckets = 1;
     }
@@ -136,7 +115,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       } else {
         // only case is full outer join with SMB enabled which is not 
possible. Convert to regular
         // join.
-        convertJoinSMBJoin(joinOp, context, 0, 0, false);
+        fallbackToReduceSideJoin(joinOp, context);
         return null;
       }
     }
@@ -155,20 +134,18 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
 
-    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, 
mapJoinConversionPos);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, 
mapJoinConversionPos, true);
     // map join operator by default has no bucket cols and num of reduce sinks
     // reduced by 1
-    mapJoinOp
-.setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
     mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : 
mapJoinOp.getChildOperators()) {
-      setAllChildrenTraitsToNull(childOp);
+      setAllChildrenTraits(childOp, mapJoinOp.getOpTraits());
     }
 
     return null;
@@ -180,7 +157,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == 
false) {
-      convertJoinSMBJoin(joinOp, context, 0, 0, false);
+      fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
     Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -209,8 +186,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // contains aliases from sub-query
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
 
@@ -220,8 +196,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      fallbackToReduceSideJoin(joinOp, context);
     }
     return null;
   }
@@ -317,16 +292,16 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
   }
 
-  private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> 
currentOp) {
+  private void setAllChildrenTraits(Operator<? extends OperatorDesc> 
currentOp, OpTraits opTraits) {
     if (currentOp instanceof ReduceSinkOperator) {
       return;
     }
-    currentOp.setOpTraits(new OpTraits(null, -1, null));
+    currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), 
opTraits.getNumBuckets(), opTraits.getSortCols()));
     for (Operator<? extends OperatorDesc> childOp : 
currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof 
GroupByOperator)) {
         break;
       }
-      setAllChildrenTraitsToNull(childOp);
+      setAllChildrenTraits(childOp, opTraits);
     }
   }
 
@@ -338,7 +313,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       return false;
     }
 
-    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, 
bigTablePosition);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, 
bigTablePosition, true);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
 
@@ -633,7 +608,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
    */
 
   public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, 
OptimizeTezProcContext context,
-      int bigTablePosition) throws SemanticException {
+      int bigTablePosition, boolean removeReduceSink) throws SemanticException 
{
     // bail on mux operator because currently the mux operator masks the emit 
keys
     // of the constituent reduce sinks.
     for (Operator<? extends OperatorDesc> parentOp : 
joinOp.getParentOperators()) {
@@ -646,45 +621,49 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     MapJoinOperator mapJoinOp =
         MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp,
             joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
-            joinOp.getConf().getMapAliases(), bigTablePosition, true);
+            joinOp.getConf().getMapAliases(), bigTablePosition, true, 
removeReduceSink);
     mapJoinOp.getConf().setHybridHashJoin(HiveConf.getBoolVar(context.conf,
-      HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
+        HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
 
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
-      for (Operator<?> p : parentBigTableOp.getParentOperators()) {
-        // we might have generated a dynamic partition operator chain. Since
-        // we're removing the reduce sink we need do remove that too.
-        Set<Operator<?>> dynamicPartitionOperators = new 
HashSet<Operator<?>>();
-        Map<Operator<?>, AppMasterEventOperator> opEventPairs = new 
HashMap<>();
-        for (Operator<?> c : p.getChildOperators()) {
-          AppMasterEventOperator event = findDynamicPartitionBroadcast(c);
-          if (event != null) {
-            dynamicPartitionOperators.add(c);
-            opEventPairs.put(c, event);
+      if (removeReduceSink) {
+        for (Operator<?> p : parentBigTableOp.getParentOperators()) {
+          // we might have generated a dynamic partition operator chain. Since
+          // we're removing the reduce sink we need to remove that too.
+          Set<Operator<?>> dynamicPartitionOperators = new 
HashSet<Operator<?>>();
+          Map<Operator<?>, AppMasterEventOperator> opEventPairs = new 
HashMap<>();
+          for (Operator<?> c : p.getChildOperators()) {
+            AppMasterEventOperator event = findDynamicPartitionBroadcast(c);
+            if (event != null) {
+              dynamicPartitionOperators.add(c);
+              opEventPairs.put(c, event);
+            }
           }
-        }
-        for (Operator<?> c : dynamicPartitionOperators) {
-          if (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
-              
!context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) {
-            p.removeChild(c);
-            // at this point we've found the fork in the op pipeline that has 
the pruning as a child plan.
-            LOG.info("Disabling dynamic pruning for: "
-                + ((DynamicPruningEventDesc) 
opEventPairs.get(c).getConf()).getTableScan().getName()
-                + ". Need to be removed together with reduce sink");
+          for (Operator<?> c : dynamicPartitionOperators) {
+            if (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
+                
!context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) {
+              p.removeChild(c);
+              // at this point we've found the fork in the op pipeline that 
has the pruning as a child plan.
+              LOG.info("Disabling dynamic pruning for: "
+                  + ((DynamicPruningEventDesc) 
opEventPairs.get(c).getConf()).getTableScan().getName()
+                  + ". Need to be removed together with reduce sink");
+            }
+          }
+          for (Operator<?> op : dynamicPartitionOperators) {
+            context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op));
           }
         }
-        for (Operator<?> op : dynamicPartitionOperators) {
-          context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op));
+
+        mapJoinOp.getParentOperators().remove(bigTablePosition);
+        if 
(!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0))))
 {
+          mapJoinOp.getParentOperators().add(bigTablePosition,
+              parentBigTableOp.getParentOperators().get(0));
         }
+        
parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
       }
-      mapJoinOp.getParentOperators().remove(bigTablePosition);
-      if 
(!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0))))
 {
-        mapJoinOp.getParentOperators().add(bigTablePosition,
-            parentBigTableOp.getParentOperators().get(0));
-      }
-      
parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
+
       for (Operator<? extends OperatorDesc>op : 
mapJoinOp.getParentOperators()) {
         if (!(op.getChildOperators().contains(mapJoinOp))) {
           op.getChildOperators().add(mapJoinOp);
@@ -720,4 +699,100 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     return null;
   }
+
+  /**
+   * Estimate the number of buckets in the join, using the parent operators' 
OpTraits and/or
+   * parent operators' number of reducers
+   * @param joinOp
+   * @param useOpTraits  Whether OpTraits should be used for the estimate.
+   * @return
+   */
+  private static int estimateNumBuckets(JoinOperator joinOp, boolean 
useOpTraits) {
+    int numBuckets = -1;
+    int estimatedBuckets = -1;
+
+    for (Operator<? extends OperatorDesc>parentOp : 
joinOp.getParentOperators()) {
+      if (parentOp.getOpTraits().getNumBuckets() > 0) {
+        numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+            parentOp.getOpTraits().getNumBuckets() : numBuckets;
+      }
+
+      if (parentOp instanceof ReduceSinkOperator) {
+        ReduceSinkOperator rs = (ReduceSinkOperator) parentOp;
+        estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
+            rs.getConf().getNumReducers() : estimatedBuckets;
+      }
+    }
+
+    if (!useOpTraits) {
+      // Ignore the value we got from OpTraits.
+      // The logic below will fall back to the estimate from numReducers
+      numBuckets = -1;
+    }
+
+    if (numBuckets <= 0) {
+      numBuckets = estimatedBuckets;
+      if (numBuckets <= 0) {
+        numBuckets = 1;
+      }
+    }
+
+    return numBuckets;
+  }
+
+  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, 
OptimizeTezProcContext context)
+    throws SemanticException {
+    // Attempt dynamic partitioned hash join
+    // Since we don't have big table index yet, must start with estimate of 
numReducers
+    int numReducers = estimateNumBuckets(joinOp, false);
+    LOG.info("Try dynamic partitioned hash join with estimated " + numReducers 
+ " reducers");
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers);
+    if (bigTablePos >= 0) {
+      // Now that we have the big table index, get real numReducers value 
based on big table RS
+      ReduceSinkOperator bigTableParentRS =
+          (ReduceSinkOperator) (joinOp.getParentOperators().get(bigTablePos));
+      numReducers = bigTableParentRS.getConf().getNumReducers();
+      LOG.debug("Real big table reducers = " + numReducers);
+
+      MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, 
bigTablePos, false);
+      if (mapJoinOp != null) {
+        LOG.info("Selected dynamic partitioned hash join");
+        mapJoinOp.getConf().setDynamicPartitionHashJoin(true);
+        // Set OpTraits for dynamically partitioned hash join:
+        // bucketColNames: Re-use previous joinOp's bucketColNames. Parent 
operators should be
+        //   reduce sink, which should have bucket columns based on the join 
keys.
+        // numBuckets: set to number of reducers
+        // sortCols: This is an unsorted join - no sort cols
+        OpTraits opTraits = new OpTraits(
+            joinOp.getOpTraits().getBucketColNames(),
+            numReducers,
+            null);
+        mapJoinOp.setOpTraits(opTraits);
+        mapJoinOp.setStatistics(joinOp.getStatistics());
+        // propagate this change till the next RS
+        for (Operator<? extends OperatorDesc> childOp : 
mapJoinOp.getChildOperators()) {
+          setAllChildrenTraits(childOp, mapJoinOp.getOpTraits());
+        }
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private void fallbackToReduceSideJoin(JoinOperator joinOp, 
OptimizeTezProcContext context)
+      throws SemanticException {
+    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
+        
context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
+      if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+        return;
+      }
+    }
+
+    // we are just converting to a common merge join operator. The shuffle
+    // join in map-reduce case.
+    int pos = 0; // it doesn't matter which position we use in this case.
+    LOG.info("Fallback to common merge join operator");
+    convertJoinSMBJoin(joinOp, context, pos, 0, false);
+  }
 }

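The bucket estimate added above prefers OpTraits information and otherwise falls back to the largest parent ReduceSink reducer count, defaulting to 1; a standalone restatement with made-up numbers (helper class and values hypothetical, not part of the patch):

public class BucketEstimateExample {
  // Mirrors estimateNumBuckets(): use the OpTraits bucket count when allowed and positive,
  // else the largest parent reducer count, else 1.
  static int estimate(int opTraitsBuckets, int maxParentReducers, boolean useOpTraits) {
    int numBuckets = useOpTraits ? opTraitsBuckets : -1;
    if (numBuckets <= 0) {
      numBuckets = (maxParentReducers > 0) ? maxParentReducers : 1;
    }
    return numBuckets;
  }

  public static void main(String[] args) {
    // No bucketing info, but the big-table ReduceSink was planned with 20 reducers:
    System.out.println(estimate(-1, 20, false));   // prints 20
  }
}
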
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 4d84f0f..f8f2b7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -89,6 +92,7 @@ public class MapJoinProcessor implements Transform {
   // (column type + column name). The column name is not really used anywhere, 
but it
   // needs to be passed. Use the string defined below for that.
   private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
+  private static final Log LOG = 
LogFactory.getLog(MapJoinProcessor.class.getName());
 
   public MapJoinProcessor() {
   }
@@ -356,11 +360,18 @@ public class MapJoinProcessor implements Transform {
   public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
       JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> 
mapAliases,
       int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+    return convertJoinOpMapJoinOp(hconf, op, leftInputJoin, baseSrc, 
mapAliases,
+        mapJoinPos, noCheckOuterJoin, true);
+  }
+
+  public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
+      JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> 
mapAliases,
+      int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren)
+          throws SemanticException {
 
     MapJoinDesc mapJoinDescriptor =
         getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, mapAliases,
-                mapJoinPos, noCheckOuterJoin);
-
+            mapJoinPos, noCheckOuterJoin, adjustParentsChildren);
     // reduce sink row resolver used to generate map join op
     RowSchema outputRS = op.getSchema();
 
@@ -1025,7 +1036,7 @@ public class MapJoinProcessor implements Transform {
 
   public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
       JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> 
mapAliases,
-      int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+      int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren) 
throws SemanticException {
     JoinDesc desc = op.getConf();
     JoinCondDesc[] condns = desc.getConds();
     Byte[] tagOrder = desc.getTagOrder();
@@ -1072,6 +1083,26 @@ public class MapJoinProcessor implements Transform {
     // get the join keys from old parent ReduceSink operators
     Map<Byte, List<ExprNodeDesc>> keyExprMap = pair.getSecond();
 
+    if (!adjustParentsChildren) {
+      // Since we did not remove reduce sink parents, keep the original value 
expressions
+      newValueExprs = valueExprs;
+
+      // Join key exprs are represented in terms of the original table columns;
+      // we need to convert these to the generated column names we can see in the Join operator
+      Map<Byte, List<ExprNodeDesc>> newKeyExprMap = new HashMap<Byte, 
List<ExprNodeDesc>>();
+      for (Map.Entry<Byte, List<ExprNodeDesc>> mapEntry : 
keyExprMap.entrySet()) {
+        Byte pos = mapEntry.getKey();
+        ReduceSinkOperator rsParent = 
oldReduceSinkParentOps.get(pos.byteValue());
+        List<ExprNodeDesc> keyExprList =
+            ExprNodeDescUtils.resolveJoinKeysAsRSColumns(mapEntry.getValue(), 
rsParent);
+        if (keyExprList == null) {
+          throw new SemanticException("Error resolving join keys");
+        }
+        newKeyExprMap.put(pos, keyExprList);
+      }
+      keyExprMap = newKeyExprMap;
+    }
+
     // construct valueTableDescs and valueFilteredTableDescs
     List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
     List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
@@ -1163,4 +1194,11 @@ public class MapJoinProcessor implements Transform {
 
     return mapJoinDescriptor;
   }
+
+  public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+      JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> 
mapAliases,
+      int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+    return getMapJoinDesc(hconf, op, leftInputJoin, baseSrc,
+        mapAliases, mapJoinPos, noCheckOuterJoin, true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index bca91dd..b546838 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hive.ql.optimizer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -58,11 +61,13 @@ import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
+import com.google.common.collect.Sets;
+
 import static 
org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED;
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
 
-  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+  private final static Log LOG = 
LogFactory.getLog(ReduceSinkMapJoinProc.class.getName());
 
   /* (non-Javadoc)
    * This processor addresses the RS-MJ case that occurs in tez on the 
small/hash
@@ -79,7 +84,40 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
     GenTezProcContext context = (GenTezProcContext) procContext;
     MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
 
-    if (stack.size() < 2 || !(stack.get(stack.size() - 2) instanceof 
ReduceSinkOperator)) {
+    // remember the original parent list before we start modifying it.
+    if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
+      List<Operator<?>> parents = new 
ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
+      context.mapJoinParentMap.put(mapJoinOp, parents);
+    }
+
+    boolean isBigTable = stack.size() < 2
+        || !(stack.get(stack.size() - 2) instanceof ReduceSinkOperator);
+
+    ReduceSinkOperator parentRS = null;
+    if (!isBigTable) {
+      parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2);
+
+      // For dynamic partitioned hash join, the big table will also be coming 
from a ReduceSinkOperator
+      // Check for this condition.
+      // TODO: use indexOf(), or parentRS.getTag()?
+      isBigTable =
+          (mapJoinOp.getParentOperators().indexOf(parentRS) == 
mapJoinOp.getConf().getPosBigTable());
+    }
+
+    if (mapJoinOp.getConf().isDynamicPartitionHashJoin() &&
+        
!context.mapJoinToUnprocessedSmallTableReduceSinks.containsKey(mapJoinOp)) {
+      // Initialize set of unprocessed small tables
+      Set<ReduceSinkOperator> rsSet = Sets.newIdentityHashSet();
+      for (int pos = 0; pos < mapJoinOp.getParentOperators().size(); ++pos) {
+        if (pos == mapJoinOp.getConf().getPosBigTable()) {
+          continue;
+        }
+        rsSet.add((ReduceSinkOperator) 
mapJoinOp.getParentOperators().get(pos));
+      }
+      context.mapJoinToUnprocessedSmallTableReduceSinks.put(mapJoinOp, rsSet);
+    }
+
+    if (isBigTable) {
       context.currentMapJoinOperators.add(mapJoinOp);
       return null;
     }
@@ -87,14 +125,29 @@ public class ReduceSinkMapJoinProc implements 
NodeProcessor {
     context.preceedingWork = null;
     context.currentRootOperator = null;
 
-    ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 
2);
+    return processReduceSinkToHashJoin(parentRS, mapJoinOp, context);
+  }
+
+  public static BaseWork getMapJoinParentWork(GenTezProcContext context, 
Operator<?> parentRS) {
+    BaseWork parentWork;
+    if (context.unionWorkMap.containsKey(parentRS)) {
+      parentWork = context.unionWorkMap.get(parentRS);
+    } else {
+      assert context.childToWorkMap.get(parentRS).size() == 1;
+      parentWork = context.childToWorkMap.get(parentRS).get(0);
+    }
+    return parentWork;
+  }
+
+  public static Object processReduceSinkToHashJoin(ReduceSinkOperator 
parentRS, MapJoinOperator mapJoinOp,
+      GenTezProcContext context) throws SemanticException {
     // remove the tag for in-memory side of mapjoin
     parentRS.getConf().setSkipTag(true);
     parentRS.setSkipTag(true);
-    // remember the original parent list before we start modifying it.
-    if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
-      List<Operator<?>> parents = new 
ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
-      context.mapJoinParentMap.put(mapJoinOp, parents);
+
+    // Mark this small table as being processed
+    if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+      
context.mapJoinToUnprocessedSmallTableReduceSinks.get(mapJoinOp).remove(parentRS);
     }
 
     List<BaseWork> mapJoinWork = null;
@@ -109,13 +162,7 @@ public class ReduceSinkMapJoinProc implements 
NodeProcessor {
      *
      */
     mapJoinWork = context.mapJoinWorkMap.get(mapJoinOp);
-    BaseWork parentWork;
-    if (context.unionWorkMap.containsKey(parentRS)) {
-      parentWork = context.unionWorkMap.get(parentRS);
-    } else {
-      assert context.childToWorkMap.get(parentRS).size() == 1;
-      parentWork = context.childToWorkMap.get(parentRS).get(0);
-    }
+    BaseWork parentWork = getMapJoinParentWork(context, parentRS);
 
     // set the link between mapjoin and parent vertex
     int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
@@ -161,6 +208,11 @@ public class ReduceSinkMapJoinProc implements 
NodeProcessor {
           keyCount /= bucketCount;
           tableSize /= bucketCount;
         }
+      } else if (joinConf.isDynamicPartitionHashJoin()) {
+        // For dynamic partitioned hash join, assume the table is split evenly among the reduce tasks.
+        bucketCount = parentRS.getConf().getNumReducers();
+        keyCount /= bucketCount;
+        tableSize /= bucketCount;
       }
     }
     LOG.info("Mapjoin " + mapJoinOp + ", pos: " + pos + " --> " + 
parentWork.getName() + " ("
@@ -218,6 +270,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor 
{
           edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
         }
       }
+    } else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+      edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
     }
     TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
 
@@ -232,7 +286,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor 
{
         }
 
         ReduceSinkOperator r = null;
-        if (parentRS.getConf().getOutputName() != null) {
+        if (context.connectedReduceSinks.contains(parentRS)) {
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");
           // we've already set this one up. Need to clone for the next work.
           r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(

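The per-task sizing added for the dynamic partitioned hash join divides the small-table estimates by the number of reduce tasks over which the table is partitioned, since each reduce task only receives its own partition; a worked example with assumed numbers (class and values hypothetical):

public class DynPartHashJoinSizingExample {
  public static void main(String[] args) {
    long keyCount = 1_000_000L;   // assumed small-table distinct-key estimate
    long tableSize = 256L << 20;  // assumed small-table size in bytes (256 MiB)
    int numReducers = 16;         // parentRS.getConf().getNumReducers() in the patch

    keyCount /= numReducers;      // 62,500 keys expected per reduce task
    tableSize /= numReducers;     // 16,777,216 bytes expected per reduce task
    System.out.println("per-task keys=" + keyCount + ", bytes=" + tableSize);
  }
}
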
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index f474eae..9334c73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -107,6 +108,10 @@ public class GenTezProcContext implements NodeProcessorCtx{
   // map that says which mapjoin belongs to which work item
   public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
 
+  // Mapping of reducesink to mapjoin operators
+  // Only used for dynamic partitioned hash joins (mapjoin operator in the 
reducer)
+  public final Map<Operator<?>, MapJoinOperator> smallTableParentToMapJoinMap;
+
   // a map to keep track of which root generated which work
   public final Map<Operator<?>, BaseWork> rootToWorkMap;
 
@@ -151,6 +156,11 @@ public class GenTezProcContext implements NodeProcessorCtx{
   // remember the connections between ts and event
   public final Map<TableScanOperator, List<AppMasterEventOperator>> 
tsToEventMap;
 
+  // When processing dynamic partitioned hash joins, some of the small tables 
may not get processed
+  // before the mapjoin's parents are removed during GenTezWork.process(). 
This is to keep
+  // track of which small tables haven't been processed yet.
+  public Map<MapJoinOperator, Set<ReduceSinkOperator>> 
mapJoinToUnprocessedSmallTableReduceSinks;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -167,6 +177,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, TezEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
+    this.smallTableParentToMapJoinMap = new LinkedHashMap<Operator<?>, MapJoinOperator>();
     this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();
@@ -188,6 +199,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
     this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
     this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>();
     this.currentMergeJoinOperator = null;
+    this.mapJoinToUnprocessedSmallTableReduceSinks = new HashMap<MapJoinOperator, Set<ReduceSinkOperator>>();
 
     rootTasks.add(currentTask);
   }

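A small standalone sketch of how the two new bookkeeping structures are intended to interact (plain strings stand in for the MapJoinOperator/ReduceSinkOperator keys): each small-table ReduceSink of a dynamic partitioned hash join is registered against its MapJoin, and is taken out of the "unprocessed" set once the ReduceSinkMapJoinProc-style processing has run for it; the removal itself is presumed to happen in code only partly shown in this excerpt.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch only; string keys stand in for the operator objects.
    final class DynPartHashJoinBookkeepingSketch {
      final Map<String, String> smallTableParentToMapJoin = new LinkedHashMap<String, String>();
      final Map<String, Set<String>> mapJoinToUnprocessedSmallTableRS = new HashMap<String, Set<String>>();

      // Called when a small-table ReduceSink of the MapJoin has not been handled yet.
      void register(String mapJoin, String smallTableRS) {
        smallTableParentToMapJoin.put(smallTableRS, mapJoin);
        Set<String> pending = mapJoinToUnprocessedSmallTableRS.get(mapJoin);
        if (pending == null) {
          pending = new LinkedHashSet<String>();
          mapJoinToUnprocessedSmallTableRS.put(mapJoin, pending);
        }
        pending.add(smallTableRS);
      }

      // Called once the ReduceSink-to-hash-join processing has run for that ReduceSink.
      boolean markProcessed(String mapJoin, String smallTableRS) {
        Set<String> pending = mapJoinToUnprocessedSmallTableRS.get(mapJoin);
        return pending != null && pending.remove(smallTableRS);
      }
    }
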
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 93ad145..a9d1f8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -130,12 +131,13 @@ public class GenTezUtils {
     tezWork.add(reduceWork);
 
     TezEdgeProperty edgeProp;
+    EdgeType edgeType = determineEdgeType(context.preceedingWork, reduceWork);
     if (reduceWork.isAutoReduceParallelism()) {
       edgeProp =
-          new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+          new TezEdgeProperty(context.conf, edgeType, true,
               reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
     } else {
-      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+      edgeProp = new TezEdgeProperty(edgeType);
     }
 
     tezWork.connect(
@@ -470,4 +472,21 @@ public class GenTezUtils {
 
     curr.removeChild(child);
   }
+
+  public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork followingWork) {
+    if (followingWork instanceof ReduceWork) {
+      // Ideally there should be a better way to determine that the followingWork contains
+      // a dynamic partitioned hash join, but in some cases (createReduceWork()) it looks like
+      // the work must be created/connected first, before the GenTezProcContext can be updated
+      // with the mapjoin/work relationship.
+      ReduceWork reduceWork = (ReduceWork) followingWork;
+      if (reduceWork.getReducer() instanceof MapJoinOperator) {
+        MapJoinOperator joinOp = (MapJoinOperator) reduceWork.getReducer();
+        if (joinOp.getConf().isDynamicPartitionHashJoin()) {
+          return EdgeType.CUSTOM_SIMPLE_EDGE;
+        }
+      }
+    }
+    return EdgeType.SIMPLE_EDGE;
+  }
 }
\ No newline at end of file

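The decision in determineEdgeType() boils down to: if the following work is a ReduceWork whose root operator is a MapJoin flagged as a dynamic partitioned hash join, connect it with CUSTOM_SIMPLE_EDGE (roughly, an unsorted partitioned shuffle); otherwise keep the default sorted SIMPLE_EDGE. A condensed standalone restatement (the enum and boolean flags are stand-ins, not the Hive types):

    // Condensed restatement of the new edge-type rule; not the Hive classes.
    final class EdgeTypeRuleSketch {
      enum Edge { SIMPLE_EDGE, CUSTOM_SIMPLE_EDGE }

      static Edge edgeFor(boolean followingIsReduceWork,
                          boolean reducerIsMapJoin,
                          boolean dynamicPartitionHashJoin) {
        if (followingIsReduceWork && reducerIsMapJoin && dynamicPartitionHashJoin) {
          return Edge.CUSTOM_SIMPLE_EDGE;  // hash-join reducer: partitioned, unsorted input
        }
        return Edge.SIMPLE_EDGE;           // default: sorted shuffle edge
      }
    }
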
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6b3e19d..c4e0413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -165,8 +166,11 @@ public class GenTezWork implements NodeProcessor {
       mergeJoinWork.addMergedWork(work, null, context.leafOperatorToFollowingWork);
       Operator<? extends OperatorDesc> parentOp =
           getParentFromStack(context.currentMergeJoinOperator, stack);
+      // Set the big table position. Both the reduce work and merge join operator
+      // should be set with the same value.
       int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
       work.setTag(pos);
+      context.currentMergeJoinOperator.getConf().setBigTablePosition(pos);
       tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
       for (BaseWork parentWork : tezWork.getParents(work)) {
         TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
@@ -190,6 +194,50 @@ public class GenTezWork implements NodeProcessor {
     // remember which mapjoin operator links with which work
     if (!context.currentMapJoinOperators.isEmpty()) {
       for (MapJoinOperator mj: context.currentMapJoinOperators) {
+        // For dynamic partitioned hash join, ReduceSinkMapJoinProc rule may not get run for all
+        // of the ReduceSink parents, because the parents of the MapJoin operator get
+        // removed later on in this method. Keep track of the parent to mapjoin mapping
+        // so we can later run the same logic that is run in ReduceSinkMapJoinProc.
+        if (mj.getConf().isDynamicPartitionHashJoin()) {
+          // Since this is a dynamic partitioned hash join, the work for this join should be a ReduceWork
+          ReduceWork reduceWork = (ReduceWork) work;
+          int bigTablePosition = mj.getConf().getPosBigTable();
+          reduceWork.setTag(bigTablePosition);
+
+          // Use context.mapJoinParentMap to get the original RS parents, because
+          // the MapJoin's parents may have been replaced by dummy operator.
+          List<Operator<?>> mapJoinOriginalParents = context.mapJoinParentMap.get(mj);
+          if (mapJoinOriginalParents == null) {
+            throw new SemanticException("Unexpected error - context.mapJoinParentMap did not have an entry for " + mj);
+          }
+          for (int pos = 0; pos < mapJoinOriginalParents.size(); ++pos) {
+            // This processing only needs to happen for the small tables
+            if (pos == bigTablePosition) {
+              continue;
+            }
+            Operator<?> parentOp = mapJoinOriginalParents.get(pos);
+            context.smallTableParentToMapJoinMap.put(parentOp, mj);
+
+            ReduceSinkOperator parentRS = (ReduceSinkOperator) parentOp;
+
+            // TableDesc needed for dynamic partitioned hash join
+            GenMapRedUtils.setKeyAndValueDesc(reduceWork, parentRS);
+
+            // For small table RS parents that have already been processed, we need to
+            // add the tag to the RS work to the reduce work that contains this map join.
+            // This was not being done for normal mapjoins, where the small table typically
+            // has its ReduceSink parent removed.
+            if (!context.mapJoinToUnprocessedSmallTableReduceSinks.get(mj).contains(parentRS)) {
+              // This reduce sink has been processed already, so the work for the parentRS exists
+              BaseWork parentWork = ReduceSinkMapJoinProc.getMapJoinParentWork(context, parentRS);
+              int tag = parentRS.getConf().getTag();
+              tag = (tag == -1 ? 0 : tag);
+              reduceWork.getTagToInput().put(tag, parentWork.getName());
+            }
+
+          }
+        }
+
         LOG.debug("Processing map join: " + mj);
         // remember the mapping in case we scan another branch of the
         // mapjoin later
@@ -369,15 +417,44 @@ public class GenTezWork implements NodeProcessor {
         // remember the output name of the reduce sink
         rs.getConf().setOutputName(rWork.getName());
 
+        // For dynamic partitioned hash join, run the ReduceSinkMapJoinProc logic for any
+        // ReduceSink parents that we missed.
+        MapJoinOperator mj = context.smallTableParentToMapJoinMap.get(rs);
+        if (mj != null) {
+          // Only need to run the logic for tables we missed
+          if (context.mapJoinToUnprocessedSmallTableReduceSinks.get(mj).contains(rs)) {
+            // ReduceSinkMapJoinProc logic does not work unless the ReduceSink is connected as
+            // a parent of the MapJoin, but at this point we have already removed all of the
+            // parents from the MapJoin.
+            // Try temporarily adding the RS as a parent
+            ArrayList<Operator<?>> tempMJParents = new ArrayList<Operator<?>>();
+            tempMJParents.add(rs);
+            mj.setParentOperators(tempMJParents);
+            // ReduceSink also needs MapJoin as child
+            List<Operator<?>> rsChildren = rs.getChildOperators();
+            rsChildren.add(mj);
+
+            // Since the MapJoin has had all of its other parents removed at this point,
+            // it would be bad here if processReduceSinkToHashJoin() tries to do anything
+            // with the RS parent based on its position in the list of parents.
+            ReduceSinkMapJoinProc.processReduceSinkToHashJoin(rs, mj, context);
+
+            // Remove any parents from MapJoin again
+            mj.removeParents();
+            // TODO: do we also need to remove the MapJoin from the list of RS's children?
+          }
+        }
+
         if (!context.connectedReduceSinks.contains(rs)) {
           // add dependency between the two work items
           TezEdgeProperty edgeProp;
+          EdgeType edgeType = utils.determineEdgeType(work, followingWork);
           if (rWork.isAutoReduceParallelism()) {
             edgeProp =
-                new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+                new TezEdgeProperty(context.conf, edgeType, true,
                     rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
           } else {
-            edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+            edgeProp = new TezEdgeProperty(edgeType);
           }
           tezWork.connect(work, followingWork, edgeProp);
           context.connectedReduceSinks.add(rs);

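The hunk above temporarily re-attaches a small-table ReduceSink as the MapJoin's only parent so that ReduceSinkMapJoinProc.processReduceSinkToHashJoin() can run, then disconnects it again. A generic sketch of that attach/process/restore pattern (not the exact sequence used in the patch, which also replaces the parent list and later calls removeParents()):

    import java.util.List;

    // Generic illustration of the attach/process/restore pattern; not Hive code.
    final class TemporaryParentSketch {
      static <OP> void withTemporaryParent(List<OP> parents, OP parent, Runnable processing) {
        parents.add(parent);        // temporarily connect the ReduceSink as a parent
        try {
          processing.run();         // run the shared ReduceSink -> hash join logic
        } finally {
          parents.remove(parent);   // restore the disconnected state afterwards
        }
      }
    }
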
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index fa697ef..d574c5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -47,7 +47,7 @@ public abstract class BaseWork extends AbstractOperatorDesc {
   // Their function is mainly as root ops to give the mapjoin the correct
   // schema info.
   List<HashTableDummyOperator> dummyOps;
-  int tag;
+  int tag = 0;
   private final List<String> sortColNames = new ArrayList<String>();
 
   private MapredLocalWork mrLocalWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
index f9c34cb..cce9bc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
@@ -45,4 +45,8 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable {
   public int getBigTablePosition() {
     return mapJoinConversionPos;
   }
+
+  public void setBigTablePosition(int pos) {
+    mapJoinConversionPos = pos;
+  }
 }

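The new setter exists so that GenTezWork can keep the merge join descriptor and its work in sync; as in the GenTezWork hunk further up, both are derived from the same tag value:

      int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
      work.setTag(pos);
      context.currentMergeJoinOperator.getConf().setBigTablePosition(pos);
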
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index fb3c4a3..e291a48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
@@ -278,6 +279,59 @@ public class ExprNodeDescUtils {
     throw new SemanticException("Met multiple parent operators");
   }
 
+  public static List<ExprNodeDesc> resolveJoinKeysAsRSColumns(List<ExprNodeDesc> sourceList,
+      Operator<?> reduceSinkOp) {
+    ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(sourceList.size());
+    for (ExprNodeDesc source : sourceList) {
+      ExprNodeDesc newExpr = resolveJoinKeysAsRSColumns(source, reduceSinkOp);
+      if (newExpr == null) {
+        return null;
+      }
+      result.add(newExpr);
+    }
+    return result;
+  }
+
+  /**
+   * Join keys are expressions based on the select operator. Resolve the expressions so they
+   * are based on the ReduceSink operator
+   *   SEL -> RS -> JOIN
+   * @param source
+   * @param reduceSinkOp
+   * @return
+   */
+  public static ExprNodeDesc resolveJoinKeysAsRSColumns(ExprNodeDesc source, Operator<?> reduceSinkOp) {
+    // Assuming this is only being done for join keys. As a result we shouldn't have to recursively
+    // check any nested child expressions, because the result of the expression should exist as an
+    // output column of the ReduceSink operator
+    if (source == null) {
+      return null;
+    }
+
+    // columnExprMap has the reverse of what we need - a mapping of the internal column names
+    // to the ExprNodeDesc from the previous operation.
+    // Find the key/value where the ExprNodeDesc value matches the column we are searching for.
+    // The key portion of the entry will be the internal column name for the join key expression.
+    for (Map.Entry<String, ExprNodeDesc> mapEntry : reduceSinkOp.getColumnExprMap().entrySet()) {
+      if (mapEntry.getValue().isSame(source)) {
+        String columnInternalName = mapEntry.getKey();
+        if (source instanceof ExprNodeColumnDesc) {
+          // The join key is a table column. Create the ExprNodeDesc based on this column.
+          ColumnInfo columnInfo = reduceSinkOp.getSchema().getColumnInfo(columnInternalName);
+          return new ExprNodeColumnDesc(columnInfo);
+        } else {
+          // Join key expression is likely some expression involving functions/operators, so there
+          // is no actual table column for this. But the ReduceSink operator should still have an
+          // output column corresponding to this expression, using the columnInternalName.
+          // TODO: does tableAlias matter for this kind of expression?
+          return new ExprNodeColumnDesc(source.getTypeInfo(), columnInternalName, "", false);
+        }
+      }
+    }
+
+    return null;  // Couldn't find reference to expression
+  }
+
   public static ExprNodeDesc[] extractComparePair(ExprNodeDesc expr1, ExprNodeDesc expr2) {
     expr1 = extractConstant(expr1);
     expr2 = extractConstant(expr2);
@@ -483,4 +537,65 @@ public class ExprNodeDescUtils {
 
     return exprColLst;
   }  
+
+  public static List<ExprNodeDesc> flattenExprList(List<ExprNodeDesc> sourceList) {
+    ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(sourceList.size());
+    for (ExprNodeDesc source : sourceList) {
+      result.add(flattenExpr(source));
+    }
+    return result;
+  }
+
+  /**
+   * A normal reduce operator's rowObjectInspector looks like a struct containing
+   *  nested key/value structs that contain the column values:
+   *  { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } }
+   *
+   * While the rowObjectInspector looks the same for vectorized queries during
+   * compilation time, within the tasks at query execution the rowObjectInspector
+   * has changed to a flatter structure without nested key/value structs:
+   *  { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. }
+   *
+   * Trying to fetch 'key.reducesinkkey0' by name from the list of flattened
+   * ObjectInspectors does not work because the '.' gets interpreted as a field member,
+   * even though it is a flattened list of column values.
+   * This workaround converts the column name referenced in the ExprNodeDesc
+   * from a nested field name (key.reducesinkkey0) to key_reducesinkkey0,
+   * simply by replacing '.' with '_'.
+   * @param source
+   * @return
+   */
+  public static ExprNodeDesc flattenExpr(ExprNodeDesc source) {
+    if (source instanceof ExprNodeGenericFuncDesc) {
+      // all children expression should be resolved
+      ExprNodeGenericFuncDesc function = (ExprNodeGenericFuncDesc) source.clone();
+      List<ExprNodeDesc> newChildren = flattenExprList(function.getChildren());
+      for (ExprNodeDesc newChild : newChildren) {
+        if (newChild == null) {
+          // Could not resolve all of the function children, fail
+          return null;
+        }
+      }
+      function.setChildren(newChildren);
+      return function;
+    }
+    if (source instanceof ExprNodeColumnDesc) {
+      ExprNodeColumnDesc column = (ExprNodeColumnDesc) source;
+      // Create a new ColumnInfo, replacing STRUCT.COLUMN with STRUCT_COLUMN
+      String newColumn = column.getColumn().replace('.', '_');
+      return new ExprNodeColumnDesc(source.getTypeInfo(), newColumn, column.getTabAlias(), false);
+    }
+    if (source instanceof ExprNodeFieldDesc) {
+      // field expression should be resolved
+      ExprNodeFieldDesc field = (ExprNodeFieldDesc) source.clone();
+      ExprNodeDesc fieldDesc = flattenExpr(field.getDesc());
+      if (fieldDesc == null) {
+        return null;
+      }
+      field.setDesc(fieldDesc);
+      return field;
+    }
+    // constant or null expr, just return
+    return source;
+  }
 }

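A minimal standalone illustration of the column-name flattening that the flattenExpr() javadoc above describes: a nested reduce-side reference such as key.reducesinkkey0 is rewritten to key_reducesinkkey0 so it can be resolved against the flattened vectorized row schema. Only the name rewrite is modeled here, not the ExprNodeDesc tree walk.

    // Models only the '.' -> '_' column-name rewrite described in flattenExpr(); not Hive code.
    final class FlattenColumnNameSketch {
      static String flatten(String nestedColumnName) {
        return nestedColumnName.replace('.', '_');
      }

      public static void main(String[] args) {
        System.out.println(flatten("key.reducesinkkey0"));  // key_reducesinkkey0
        System.out.println(flatten("value._col0"));         // value__col0
      }
    }
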
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index cee9100..e27b89b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -29,6 +29,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
 /**
  * Map Join operator Descriptor implementation.
  *
@@ -71,6 +73,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
   protected boolean genJoinKeys = true;
 
   private boolean isHybridHashJoin;
+  private boolean isDynamicPartitionHashJoin = false;
 
   // Extra parameters only for vectorization.
   private VectorMapJoinDesc vectorDesc;
@@ -369,4 +372,12 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
   public boolean getGenJoinKeys() {
     return genJoinKeys;
   }
+
+  public boolean isDynamicPartitionHashJoin() {
+    return isDynamicPartitionHashJoin;
+  }
+
+  public void setDynamicPartitionHashJoin(boolean isDistributedHashJoin) {
+    this.isDynamicPartitionHashJoin = isDistributedHashJoin;
+  }
 }

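The new isDynamicPartitionHashJoin flag is what the planner changes earlier in this patch key off: determineEdgeType() returns CUSTOM_SIMPLE_EDGE when joinOp.getConf().isDynamicPartitionHashJoin() is true, and ReduceSinkMapJoinProc scales its size estimates per reduce task under the same condition. The code that decides to run the hash join in the reducer (not shown in this excerpt) would mark the descriptor with the new setter; the variable name below is only illustrative:

      someMapJoinDesc.setDynamicPartitionHashJoin(true);
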
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index a78a92e..020d6de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -136,7 +136,7 @@ public class ReduceWork extends BaseWork {
       return null;
     }
     if (valueObjectInspector == null) {
-      valueObjectInspector = getObjectInspector(tagToValueDesc.get(0));
+      valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag));
     }
     return valueObjectInspector;
   }

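This one-line change matters for dynamic partitioned hash joins: GenTezWork now sets the reduce work's tag to the big table position, so the input driving the reducer is no longer guaranteed to be tag 0 and the value ObjectInspector must be looked up by that tag. A simplified standalone sketch of the lookup (plain strings stand in for TableDesc/ObjectInspector):

    import java.util.Arrays;
    import java.util.List;

    // Simplified stand-in for the tag-based value lookup; not Hive code.
    final class TagToValueSketch {
      static String valueDescFor(List<String> tagToValueDesc, int bigTableTag) {
        return tagToValueDesc.get(bigTableTag);  // previously hard-coded to index 0
      }

      public static void main(String[] args) {
        List<String> descs = Arrays.asList("smallTableValueDesc", "bigTableValueDesc");
        System.out.println(valueDescFor(descs, 1));  // bigTableValueDesc
      }
    }
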
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q
new file mode 100644
index 0000000..e3325c4
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q
@@ -0,0 +1,101 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- First try with regular mergejoin
+explain
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=200000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+
+-- Try with dynamically partitioned hashjoin
+explain
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q
new file mode 100644
index 0000000..af4e2b8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q
@@ -0,0 +1,83 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- Multiple tables, and change the order of the big table (alltypesorc)
+-- First try with regular mergejoin
+explain
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+
+-- noconditionaltask.size needs to be low enough that entire filtered table results do not fit in one task's hash table
+-- Try with dynamically partitioned hash join 
+explain
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+-- Try different order of tables
+explain
+select
+  a.*
+from
+  src b,
+  alltypesorc a,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+  a.*
+from
+  src b,
+  alltypesorc a,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
new file mode 100644
index 0000000..65fee16
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
@@ -0,0 +1,102 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- First try with regular mergejoin
+explain
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=200000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+set hive.vectorized.execution.enabled=true;
+
+-- Try with dynamically partitioned hashjoin
+explain
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+  *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+  count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+  a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+  a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;

http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q
new file mode 100644
index 0000000..606f455
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q
@@ -0,0 +1,84 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- Multiple tables, and change the order of the big table (alltypesorc)
+-- First try with regular mergejoin
+explain
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+set hive.vectorized.execution.enabled=true;
+
+-- noconditionaltask.size needs to be low enough that entire filtered table results do not fit in one task's hash table
+-- Try with dynamically partitioned hash join 
+explain
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+  a.*
+from
+  alltypesorc a,
+  src b,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+-- Try different order of tables
+explain
+select
+  a.*
+from
+  src b,
+  alltypesorc a,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+  a.*
+from
+  src b,
+  alltypesorc a,
+  src c
+where
+  a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+  and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
