HIVE-11303: Getting Tez LimitExceededException after dag execution on large query (Jason Dere, reviewed by Vikram Dixit)
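In short, this patch adds a dynamically partitioned hash join for Tez: when a join cannot be converted to a broadcast map join by size, and both hive.auto.convert.join and the new hive.optimize.dynamic.partition.hashjoin flag are enabled, the optimizer falls back to a reduce-side hash join (a MapJoinOperator running in the reducer, fed over a CUSTOM_SIMPLE_EDGE) instead of a shuffle merge join. A minimal sketch of enabling the feature programmatically, assuming only the HiveConf variables visible in the diff below (the new flag defaults to false):

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableDynPartHashJoin {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Prerequisite: automatic join conversion must stay enabled (true by default).
    conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN, true);
    // New flag added by this patch; disabled by default.
    conf.setBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN, true);
    // Session-level equivalent: set hive.optimize.dynamic.partition.hashjoin=true;
  }
}
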
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/04d54f61 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/04d54f61 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/04d54f61 Branch: refs/heads/master Commit: 04d54f61c9f56906160936751e772080c079498c Parents: 9904162 Author: Jason Dere <jd...@hortonworks.com> Authored: Tue Jul 21 14:03:12 2015 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Tue Jul 21 14:03:12 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../test/resources/testconfiguration.properties | 4 + .../apache/hadoop/hive/ql/exec/JoinUtil.java | 87 +- .../hadoop/hive/ql/exec/MapJoinOperator.java | 2 +- .../apache/hadoop/hive/ql/exec/Operator.java | 6 + .../hive/ql/exec/tez/KeyValuesAdapter.java | 47 ++ .../hive/ql/exec/tez/KeyValuesFromKeyValue.java | 90 +++ .../ql/exec/tez/KeyValuesFromKeyValues.java | 48 ++ .../hive/ql/exec/tez/ReduceRecordProcessor.java | 11 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 12 +- .../ql/exec/vector/VectorMapJoinOperator.java | 1 - .../mapjoin/VectorMapJoinCommonOperator.java | 1 + .../hive/ql/optimizer/ConvertJoinMapJoin.java | 213 +++-- .../hive/ql/optimizer/MapJoinProcessor.java | 44 +- .../ql/optimizer/ReduceSinkMapJoinProc.java | 84 +- .../hadoop/hive/ql/parse/GenTezProcContext.java | 12 + .../hadoop/hive/ql/parse/GenTezUtils.java | 23 +- .../apache/hadoop/hive/ql/parse/GenTezWork.java | 81 +- .../apache/hadoop/hive/ql/plan/BaseWork.java | 2 +- .../hive/ql/plan/CommonMergeJoinDesc.java | 4 + .../hadoop/hive/ql/plan/ExprNodeDescUtils.java | 115 +++ .../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 11 + .../apache/hadoop/hive/ql/plan/ReduceWork.java | 2 +- .../clientpositive/tez_dynpart_hashjoin_1.q | 101 +++ .../clientpositive/tez_dynpart_hashjoin_2.q | 83 ++ .../tez_vector_dynpart_hashjoin_1.q | 102 +++ .../tez_vector_dynpart_hashjoin_2.q | 84 ++ .../tez/tez_dynpart_hashjoin_1.q.out | 791 ++++++++++++++++++ .../tez/tez_dynpart_hashjoin_2.q.out | 564 +++++++++++++ .../tez/tez_vector_dynpart_hashjoin_1.q.out | 804 +++++++++++++++++++ .../tez/tez_vector_dynpart_hashjoin_2.q.out | 570 +++++++++++++ 31 files changed, 3899 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 39477d6..33b67dd 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1046,6 +1046,9 @@ public class HiveConf extends Configuration { "job, process those skewed keys. The same key need not be skewed for all the tables, and so,\n" + "the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a\n" + "map-join."), + HIVEDYNAMICPARTITIONHASHJOIN("hive.optimize.dynamic.partition.hashjoin", false, + "Whether to enable dynamically partitioned hash join optimization. 
\n" + + "This setting is also dependent on enabling hive.auto.convert.join"), HIVECONVERTJOIN("hive.auto.convert.join", true, "Whether Hive enables the optimization about converting common join into mapjoin based on the input file size"), HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true, http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 97715fc..fbde465 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -334,6 +334,10 @@ minitez.query.files=bucket_map_join_tez1.q,\ tez_dml.q,\ tez_fsstat.q,\ tez_insert_overwrite_local_directory_1.q,\ + tez_dynpart_hashjoin_1.q,\ + tez_dynpart_hashjoin_2.q,\ + tez_vector_dynpart_hashjoin_1.q,\ + tez_vector_dynpart_hashjoin_2.q,\ tez_join_hash.q,\ tez_join_result_complex.q,\ tez_join_tests.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 7b57550..0aaa51a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.Reporter; @@ -65,8 +66,21 @@ public class JoinUtil { int iterate = Math.min(exprEntries.length, inputObjInspector.length); for (byte alias = 0; alias < iterate; alias++) { + ObjectInspector inputOI = inputObjInspector[alias]; + + // For vectorized reduce-side operators getting inputs from a reduce sink, + // the row object inspector will get a flattened version of the object inspector + // where the nested key/value structs are replaced with a single struct: + // Example: { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } } + // Would get converted to the following for a vectorized input: + // { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. } + // The ExprNodeEvaluator initialzation below gets broken with the flattened + // object inpsectors, so convert it back to the a form that contains the + // nested key/value structs. 
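To make the comment above concrete, here is a small sketch (illustrative only, not part of the patch) that builds the flattened struct a vectorized reduce-sink input presents and the nested key/value form that unflattenObjInspector() below reconstructs, using the standard object inspector factories:

import java.util.Arrays;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UnflattenShapeSketch {
  public static void main(String[] args) {
    ObjectInspector intOI = PrimitiveObjectInspectorFactory.javaIntObjectInspector;

    // Flattened shape seen on the vectorized path: { 'key.reducesinkkey0':int, 'value._col0':int }
    StructObjectInspector flattened = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key.reducesinkkey0", "value._col0"),
        Arrays.asList(intOI, intOI));

    // Nested shape expected by ExprNodeEvaluator initialization:
    // { key: { reducesinkkey0:int }, value: { _col0:int } }
    StructObjectInspector keyStruct = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("reducesinkkey0"), Arrays.asList(intOI));
    StructObjectInspector valueStruct = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("_col0"), Arrays.asList(intOI));
    StructObjectInspector nested = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key", "value"),
        Arrays.<ObjectInspector>asList(keyStruct, valueStruct));

    System.out.println(flattened.getTypeName());
    System.out.println(nested.getTypeName());
  }
}
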
+ inputOI = unflattenObjInspector(inputOI); + if (alias == (byte) posBigTableAlias || - exprEntries[alias] == null || inputObjInspector[alias] == null) { + exprEntries[alias] == null || inputOI == null) { // skip the driver and directly loadable tables continue; } @@ -74,7 +88,7 @@ public class JoinUtil { List<ExprNodeEvaluator> exprList = exprEntries[alias]; List<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(); for (int i = 0; i < exprList.size(); i++) { - fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias])); + fieldOIList.add(exprList.get(i).initialize(inputOI)); } result[alias] = fieldOIList; } @@ -350,4 +364,73 @@ public class JoinUtil { rc.setTableDesc(tblDesc); return rc; } + + private static String KEY_FIELD_PREFIX = (Utilities.ReduceField.KEY + ".").toLowerCase(); + private static String VALUE_FIELD_PREFIX = (Utilities.ReduceField.VALUE + ".").toLowerCase(); + + /** + * Create a new struct object inspector for the list of struct fields, first removing the + * prefix from the field name. + * @param fields + * @param prefixToRemove + * @return + */ + private static ObjectInspector createStructFromFields(List<StructField> fields, String prefixToRemove) { + int prefixLength = prefixToRemove.length() + 1; // also remove the '.' after the prefix + ArrayList<String> fieldNames = new ArrayList<String>(); + ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); + for (StructField field : fields) { + fieldNames.add(field.getFieldName().substring(prefixLength)); + fieldOIs.add(field.getFieldObjectInspector()); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); + } + + /** + * Checks the input object inspector to see if it is in for form of a flattened struct + * like the ones generated by a vectorized reduce sink input: + * { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. } + * If so, then it creates an "unflattened" struct that contains nested key/value + * structs: + * { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } } + * + * @param oi + * @return unflattened object inspector if unflattening is needed, + * otherwise the original object inspector + */ + private static ObjectInspector unflattenObjInspector(ObjectInspector oi) { + if (oi instanceof StructObjectInspector) { + // Check if all fields start with "key." or "value." + // If so, then unflatten by adding an additional level of nested key and value structs + // Example: { "key.reducesinkkey0":int, "key.reducesinkkey1": int, "value._col6":int } + // Becomes + // { "key": { "reducesinkkey0":int, "reducesinkkey1":int }, "value": { "_col6":int } } + ArrayList<StructField> keyFields = new ArrayList<StructField>(); + ArrayList<StructField> valueFields = new ArrayList<StructField>(); + for (StructField field : ((StructObjectInspector) oi).getAllStructFieldRefs()) { + String fieldNameLower = field.getFieldName().toLowerCase(); + if (fieldNameLower.startsWith(KEY_FIELD_PREFIX)) { + keyFields.add(field); + } else if (fieldNameLower.startsWith(VALUE_FIELD_PREFIX)) { + valueFields.add(field); + } else { + // Not a flattened struct, no need to unflatten + return oi; + } + } + + // All field names are of the form "key." or "value." 
+ // Create key/value structs and add the respective fields to each one + ArrayList<ObjectInspector> reduceFieldOIs = new ArrayList<ObjectInspector>(); + reduceFieldOIs.add(createStructFromFields(keyFields, Utilities.ReduceField.KEY.toString())); + reduceFieldOIs.add(createStructFromFields(valueFields, Utilities.ReduceField.VALUE.toString())); + + // Finally create the outer struct to contain the key, value structs + return ObjectInspectorFactory.getStandardStructObjectInspector( + Utilities.reduceFieldNameList, + reduceFieldOIs); + } + + return oi; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index a40f0a9..1b9d7ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -154,7 +154,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem final ExecMapperContext mapContext = getExecContext(); final MapredContext mrContext = MapredContext.get(); - if (!conf.isBucketMapJoin()) { + if (!conf.isBucketMapJoin() && !conf.isDynamicPartitionHashJoin()) { /* * The issue with caching in case of bucket map join is that different tasks * process different buckets and if the container is reused to join a different bucket, http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index d7f1b42..0f02737 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -1354,4 +1354,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C return childOperators; } } + + public void removeParents() { + for (Operator<?> parent : new ArrayList<Operator<?>>(getParentOperators())) { + removeParent(parent); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java new file mode 100644 index 0000000..8f706fe --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; + +/** + * Key-values interface for the Reader used by ReduceRecordSource + */ +public interface KeyValuesAdapter { + /** + * Get the key for current record + * @return + * @throws IOException + */ + Object getCurrentKey() throws IOException; + + /** + * Get the values for the current record + * @return + * @throws IOException + */ + Iterable<Object> getCurrentValues() throws IOException; + + /** + * Move to the next record + * @return true if successful, false if there are no more records to process + * @throws IOException + */ + boolean next() throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java new file mode 100644 index 0000000..51cdeca --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import org.apache.tez.runtime.library.api.KeyValueReader; + +/** + * Provides a key/values (note the plural values) interface out of a KeyValueReader, + * needed by ReduceRecordSource when reading input from a key/value source. 
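The adapter interface above lets ReduceRecordSource drive either kind of Tez input through a single loop, via the two implementations added below. A hedged usage sketch (the dispatch mirrors ReduceRecordSource.init() in this patch; the consumer class itself is hypothetical):

package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValuesReader;

public class KeyValuesAdapterUsageSketch {
  static void drain(Reader reader) throws IOException {
    // Wrap whichever reader the edge type provides behind the common adapter.
    KeyValuesAdapter adapter = (reader instanceof KeyValueReader)
        ? new KeyValuesFromKeyValue((KeyValueReader) reader)      // unsorted input (custom simple edge)
        : new KeyValuesFromKeyValues((KeyValuesReader) reader);   // grouped input (simple edge)
    while (adapter.next()) {
      Object key = adapter.getCurrentKey();
      for (Object value : adapter.getCurrentValues()) {
        // 'key' and each 'value' would feed the reduce-side operator pipeline here
      }
    }
  }
}
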
+ */ +public class KeyValuesFromKeyValue implements KeyValuesAdapter { + protected KeyValueReader reader; + protected ValueIterator<Object> valueIterator = + new ValueIterator<Object>(); + + private static class ValueIterator<T> implements Iterator<T>, Iterable<T> { + + protected boolean hasNextValue = false; + protected T value = null; + + @Override + public boolean hasNext() { + return hasNextValue; + } + + @Override + public T next() { + if (!hasNextValue) { + throw new NoSuchElementException(); + } + hasNextValue = false; + return value; + } + + void reset(T value) { + this.value = value; + hasNextValue = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator<T> iterator() { + return this; + } + } + + public KeyValuesFromKeyValue(KeyValueReader reader) { + this.reader = reader; + } + + @Override + public Object getCurrentKey() throws IOException { + return reader.getCurrentKey(); + } + + @Override + public Iterable<Object> getCurrentValues() throws IOException { + Object obj = reader.getCurrentValue(); + valueIterator.reset(obj); + return valueIterator; + } + + @Override + public boolean next() throws IOException { + return reader.next(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java new file mode 100644 index 0000000..b027bce --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import org.apache.tez.runtime.library.api.KeyValuesReader; + +/** + * Provides a key/values interface out of a KeyValuesReader for use by ReduceRecordSource. 
+ */ +public class KeyValuesFromKeyValues implements KeyValuesAdapter { + protected KeyValuesReader reader; + + public KeyValuesFromKeyValues(KeyValuesReader reader) { + this.reader = reader; + } + + @Override + public Object getCurrentKey() throws IOException { + return reader.getCurrentKey(); + } + + @Override + public Iterable<Object> getCurrentValues() throws IOException { + return reader.getCurrentValues(); + } + + @Override + public boolean next() throws IOException { + return reader.next(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 545d7c6..d649672 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -50,6 +50,7 @@ import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.Reader; import org.apache.tez.runtime.library.api.KeyValuesReader; /** @@ -129,10 +130,11 @@ public class ReduceRecordProcessor extends RecordProcessor{ tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork); } - bigTablePosition = (byte) reduceWork.getTag(); ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps); } + bigTablePosition = (byte) reduceWork.getTag(); + ObjectInspector[] mainWorkOIs = null; ((TezContext) MapredContext.get()).setInputs(inputs); ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); @@ -227,10 +229,13 @@ public class ReduceRecordProcessor extends RecordProcessor{ reducer.setParentOperators(null); // clear out any parents as reducer is the root TableDesc keyTableDesc = redWork.getKeyDesc(); - KeyValuesReader reader = (KeyValuesReader) inputs.get(inputName).getReader(); + Reader reader = inputs.get(inputName).getReader(); sources[tag] = new ReduceRecordSource(); - sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc, + // Only the big table input source should be vectorized (if applicable) + // Note this behavior may have to change if we ever implement a vectorized merge join + boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode(); + sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, redWork.getVectorScratchColumnTypeMap()); ois[tag] = sources[tag].getObjectInspector(); http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 20f6dba..89f7572 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -57,6 +57,8 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; import 
org.apache.hadoop.util.StringUtils; +import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValuesReader; /** @@ -107,7 +109,7 @@ public class ReduceRecordSource implements RecordSource { /* this is only used in the error code path */ private List<VectorExpressionWriter> valueStringWriters; - private KeyValuesReader reader; + private KeyValuesAdapter reader; private boolean handleGroupKey; @@ -120,7 +122,7 @@ public class ReduceRecordSource implements RecordSource { private final GroupIterator groupIterator = new GroupIterator(); void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, - TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag, + TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag, Map<Integer, String> vectorScratchColumnTypeMap) throws Exception { @@ -129,7 +131,11 @@ public class ReduceRecordSource implements RecordSource { this.reducer = reducer; this.vectorized = vectorized; this.keyTableDesc = keyTableDesc; - this.reader = reader; + if (reader instanceof KeyValueReader) { + this.reader = new KeyValuesFromKeyValue((KeyValueReader) reader); + } else { + this.reader = new KeyValuesFromKeyValues((KeyValuesReader) reader); + } this.handleGroupKey = handleGroupKey; this.tag = tag; http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index e9bd44a..9bd811c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -100,7 +100,6 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator { @Override public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - // Use a final variable to properly parameterize the processVectorInspector closure. // Using a member variable in the closure will not do the right thing... 
final int parameterizePosBigTable = conf.getPosBigTable(); http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 4c8c4b1..87ebcf2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -551,6 +551,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem @Override protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + Collection<Future<?>> result = super.initializeOp(hconf); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 5a87bd6..e3acdfc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -93,9 +93,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { if (retval == null) { return retval; } else { - int pos = 0; // it doesn't matter which position we use in this case. - convertJoinSMBJoin(joinOp, context, pos, 0, false); - return null; + fallbackToReduceSideJoin(joinOp, context); } } @@ -103,27 +101,8 @@ public class ConvertJoinMapJoin implements NodeProcessor { // exact number of buckets. Else choose the largest number of estimated // reducers from the parent operators. int numBuckets = -1; - int estimatedBuckets = -1; if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { - for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) { - if (parentOp.getOpTraits().getNumBuckets() > 0) { - numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? - parentOp.getOpTraits().getNumBuckets() : numBuckets; - } - - if (parentOp instanceof ReduceSinkOperator) { - ReduceSinkOperator rs = (ReduceSinkOperator) parentOp; - estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? - rs.getConf().getNumReducers() : estimatedBuckets; - } - } - - if (numBuckets <= 0) { - numBuckets = estimatedBuckets; - if (numBuckets <= 0) { - numBuckets = 1; - } - } + numBuckets = estimateNumBuckets(joinOp, true); } else { numBuckets = 1; } @@ -136,7 +115,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { } else { // only case is full outer join with SMB enabled which is not possible. Convert to regular // join. - convertJoinSMBJoin(joinOp, context, 0, 0, false); + fallbackToReduceSideJoin(joinOp, context); return null; } } @@ -155,20 +134,18 @@ public class ConvertJoinMapJoin implements NodeProcessor { if (mapJoinConversionPos < 0) { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - int pos = 0; // it doesn't matter which position we use in this case. 
- convertJoinSMBJoin(joinOp, context, pos, 0, false); + fallbackToReduceSideJoin(joinOp, context); return null; } - MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true); // map join operator by default has no bucket cols and num of reduce sinks // reduced by 1 - mapJoinOp -.setOpTraits(new OpTraits(null, -1, null)); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) { - setAllChildrenTraitsToNull(childOp); + setAllChildrenTraits(childOp, mapJoinOp.getOpTraits()); } return null; @@ -180,7 +157,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { // we cannot convert to bucket map join, we cannot convert to // map join either based on the size. Check if we can convert to SMB join. if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { - convertJoinSMBJoin(joinOp, context, 0, 0, false); + fallbackToReduceSideJoin(joinOp, context); return null; } Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null; @@ -209,8 +186,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { // contains aliases from sub-query // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - int pos = 0; // it doesn't matter which position we use in this case. - convertJoinSMBJoin(joinOp, context, pos, 0, false); + fallbackToReduceSideJoin(joinOp, context); return null; } @@ -220,8 +196,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { } else { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - int pos = 0; // it doesn't matter which position we use in this case. - convertJoinSMBJoin(joinOp, context, pos, 0, false); + fallbackToReduceSideJoin(joinOp, context); } return null; } @@ -317,16 +292,16 @@ public class ConvertJoinMapJoin implements NodeProcessor { mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); } - private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) { + private void setAllChildrenTraits(Operator<? extends OperatorDesc> currentOp, OpTraits opTraits) { if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1, null)); + currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), opTraits.getNumBuckets(), opTraits.getSortCols())); for (Operator<? 
extends OperatorDesc> childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; } - setAllChildrenTraitsToNull(childOp); + setAllChildrenTraits(childOp, opTraits); } } @@ -338,7 +313,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { return false; } - MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition, true); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); @@ -633,7 +608,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { */ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, - int bigTablePosition) throws SemanticException { + int bigTablePosition, boolean removeReduceSink) throws SemanticException { // bail on mux operator because currently the mux operator masks the emit keys // of the constituent reduce sinks. for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { @@ -646,45 +621,49 @@ public class ConvertJoinMapJoin implements NodeProcessor { MapJoinOperator mapJoinOp = MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), - joinOp.getConf().getMapAliases(), bigTablePosition, true); + joinOp.getConf().getMapAliases(), bigTablePosition, true, removeReduceSink); mapJoinOp.getConf().setHybridHashJoin(HiveConf.getBoolVar(context.conf, - HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)); + HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)); Operator<? extends OperatorDesc> parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition); if (parentBigTableOp instanceof ReduceSinkOperator) { - for (Operator<?> p : parentBigTableOp.getParentOperators()) { - // we might have generated a dynamic partition operator chain. Since - // we're removing the reduce sink we need do remove that too. - Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>(); - Map<Operator<?>, AppMasterEventOperator> opEventPairs = new HashMap<>(); - for (Operator<?> c : p.getChildOperators()) { - AppMasterEventOperator event = findDynamicPartitionBroadcast(c); - if (event != null) { - dynamicPartitionOperators.add(c); - opEventPairs.put(c, event); + if (removeReduceSink) { + for (Operator<?> p : parentBigTableOp.getParentOperators()) { + // we might have generated a dynamic partition operator chain. Since + // we're removing the reduce sink we need do remove that too. + Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>(); + Map<Operator<?>, AppMasterEventOperator> opEventPairs = new HashMap<>(); + for (Operator<?> c : p.getChildOperators()) { + AppMasterEventOperator event = findDynamicPartitionBroadcast(c); + if (event != null) { + dynamicPartitionOperators.add(c); + opEventPairs.put(c, event); + } } - } - for (Operator<?> c : dynamicPartitionOperators) { - if (context.pruningOpsRemovedByPriorOpt.isEmpty() || - !context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) { - p.removeChild(c); - // at this point we've found the fork in the op pipeline that has the pruning as a child plan. - LOG.info("Disabling dynamic pruning for: " - + ((DynamicPruningEventDesc) opEventPairs.get(c).getConf()).getTableScan().getName() - + ". 
Need to be removed together with reduce sink"); + for (Operator<?> c : dynamicPartitionOperators) { + if (context.pruningOpsRemovedByPriorOpt.isEmpty() || + !context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) { + p.removeChild(c); + // at this point we've found the fork in the op pipeline that has the pruning as a child plan. + LOG.info("Disabling dynamic pruning for: " + + ((DynamicPruningEventDesc) opEventPairs.get(c).getConf()).getTableScan().getName() + + ". Need to be removed together with reduce sink"); + } + } + for (Operator<?> op : dynamicPartitionOperators) { + context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op)); } } - for (Operator<?> op : dynamicPartitionOperators) { - context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op)); + + mapJoinOp.getParentOperators().remove(bigTablePosition); + if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) { + mapJoinOp.getParentOperators().add(bigTablePosition, + parentBigTableOp.getParentOperators().get(0)); } + parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); } - mapJoinOp.getParentOperators().remove(bigTablePosition); - if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) { - mapJoinOp.getParentOperators().add(bigTablePosition, - parentBigTableOp.getParentOperators().get(0)); - } - parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); + for (Operator<? extends OperatorDesc>op : mapJoinOp.getParentOperators()) { if (!(op.getChildOperators().contains(mapJoinOp))) { op.getChildOperators().add(mapJoinOp); @@ -720,4 +699,100 @@ public class ConvertJoinMapJoin implements NodeProcessor { return null; } + + /** + * Estimate the number of buckets in the join, using the parent operators' OpTraits and/or + * parent operators' number of reducers + * @param joinOp + * @param useOpTraits Whether OpTraits should be used for the estimate. + * @return + */ + private static int estimateNumBuckets(JoinOperator joinOp, boolean useOpTraits) { + int numBuckets = -1; + int estimatedBuckets = -1; + + for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) { + if (parentOp.getOpTraits().getNumBuckets() > 0) { + numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? + parentOp.getOpTraits().getNumBuckets() : numBuckets; + } + + if (parentOp instanceof ReduceSinkOperator) { + ReduceSinkOperator rs = (ReduceSinkOperator) parentOp; + estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? + rs.getConf().getNumReducers() : estimatedBuckets; + } + } + + if (!useOpTraits) { + // Ignore the value we got from OpTraits. 
+ // The logic below will fall back to the estimate from numReducers + numBuckets = -1; + } + + if (numBuckets <= 0) { + numBuckets = estimatedBuckets; + if (numBuckets <= 0) { + numBuckets = 1; + } + } + + return numBuckets; + } + + private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context) + throws SemanticException { + // Attempt dynamic partitioned hash join + // Since we don't have big table index yet, must start with estimate of numReducers + int numReducers = estimateNumBuckets(joinOp, false); + LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers"); + int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers); + if (bigTablePos >= 0) { + // Now that we have the big table index, get real numReducers value based on big table RS + ReduceSinkOperator bigTableParentRS = + (ReduceSinkOperator) (joinOp.getParentOperators().get(bigTablePos)); + numReducers = bigTableParentRS.getConf().getNumReducers(); + LOG.debug("Real big table reducers = " + numReducers); + + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePos, false); + if (mapJoinOp != null) { + LOG.info("Selected dynamic partitioned hash join"); + mapJoinOp.getConf().setDynamicPartitionHashJoin(true); + // Set OpTraits for dynamically partitioned hash join: + // bucketColNames: Re-use previous joinOp's bucketColNames. Parent operators should be + // reduce sink, which should have bucket columns based on the join keys. + // numBuckets: set to number of reducers + // sortCols: This is an unsorted join - no sort cols + OpTraits opTraits = new OpTraits( + joinOp.getOpTraits().getBucketColNames(), + numReducers, + null); + mapJoinOp.setOpTraits(opTraits); + mapJoinOp.setStatistics(joinOp.getStatistics()); + // propagate this change till the next RS + for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) { + setAllChildrenTraits(childOp, mapJoinOp.getOpTraits()); + } + return true; + } + } + + return false; + } + + private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context) + throws SemanticException { + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) && + context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) { + if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) { + return; + } + } + + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. 
+ LOG.info("Fallback to common merge join operator"); + convertJoinSMBJoin(joinOp, context, pos, 0, false); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 4d84f0f..f8f2b7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; @@ -57,6 +59,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.GenMapRedWalker; +import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -89,6 +92,7 @@ public class MapJoinProcessor implements Transform { // (column type + column name). The column name is not really used anywhere, but it // needs to be passed. Use the string defined below for that. private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey"; + private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName()); public MapJoinProcessor() { } @@ -356,11 +360,18 @@ public class MapJoinProcessor implements Transform { public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + return convertJoinOpMapJoinOp(hconf, op, leftInputJoin, baseSrc, mapAliases, + mapJoinPos, noCheckOuterJoin, true); + } + + public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, + JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases, + int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren) + throws SemanticException { MapJoinDesc mapJoinDescriptor = getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, mapAliases, - mapJoinPos, noCheckOuterJoin); - + mapJoinPos, noCheckOuterJoin, adjustParentsChildren); // reduce sink row resolver used to generate map join op RowSchema outputRS = op.getSchema(); @@ -1025,7 +1036,7 @@ public class MapJoinProcessor implements Transform { public static MapJoinDesc getMapJoinDesc(HiveConf hconf, JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases, - int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren) throws SemanticException { JoinDesc desc = op.getConf(); JoinCondDesc[] condns = desc.getConds(); Byte[] tagOrder = desc.getTagOrder(); @@ -1072,6 +1083,26 @@ public class MapJoinProcessor implements Transform { // get the join keys from old parent ReduceSink operators Map<Byte, List<ExprNodeDesc>> keyExprMap = pair.getSecond(); + if (!adjustParentsChildren) { + // Since we did not remove reduce sink parents, keep the 
original value expressions + newValueExprs = valueExprs; + + // Join key exprs are represented in terms of the original table columns, + // we need to convert these to the generated column names we can see in the Join operator + Map<Byte, List<ExprNodeDesc>> newKeyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> mapEntry : keyExprMap.entrySet()) { + Byte pos = mapEntry.getKey(); + ReduceSinkOperator rsParent = oldReduceSinkParentOps.get(pos.byteValue()); + List<ExprNodeDesc> keyExprList = + ExprNodeDescUtils.resolveJoinKeysAsRSColumns(mapEntry.getValue(), rsParent); + if (keyExprList == null) { + throw new SemanticException("Error resolving join keys"); + } + newKeyExprMap.put(pos, keyExprList); + } + keyExprMap = newKeyExprMap; + } + // construct valueTableDescs and valueFilteredTableDescs List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); @@ -1163,4 +1194,11 @@ public class MapJoinProcessor implements Transform { return mapJoinDescriptor; } + + public static MapJoinDesc getMapJoinDesc(HiveConf hconf, + JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases, + int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + return getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, + mapAliases, mapJoinPos, noCheckOuterJoin, true); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index bca91dd..b546838 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -21,12 +21,15 @@ package org.apache.hadoop.hive.ql.optimizer; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -58,11 +61,13 @@ import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import com.google.common.collect.Sets; + import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED; public class ReduceSinkMapJoinProc implements NodeProcessor { - protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); + private final static Log LOG = LogFactory.getLog(ReduceSinkMapJoinProc.class.getName()); /* (non-Javadoc) * This processor addresses the RS-MJ case that occurs in tez on the small/hash @@ -79,7 +84,40 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { GenTezProcContext context = (GenTezProcContext) procContext; MapJoinOperator mapJoinOp = (MapJoinOperator)nd; - if (stack.size() < 2 || !(stack.get(stack.size() - 2) instanceof ReduceSinkOperator)) { + // remember the original parent list before we start modifying it. 
+ if (!context.mapJoinParentMap.containsKey(mapJoinOp)) { + List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators()); + context.mapJoinParentMap.put(mapJoinOp, parents); + } + + boolean isBigTable = stack.size() < 2 + || !(stack.get(stack.size() - 2) instanceof ReduceSinkOperator); + + ReduceSinkOperator parentRS = null; + if (!isBigTable) { + parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2); + + // For dynamic partitioned hash join, the big table will also be coming from a ReduceSinkOperator + // Check for this condition. + // TODO: use indexOf(), or parentRS.getTag()? + isBigTable = + (mapJoinOp.getParentOperators().indexOf(parentRS) == mapJoinOp.getConf().getPosBigTable()); + } + + if (mapJoinOp.getConf().isDynamicPartitionHashJoin() && + !context.mapJoinToUnprocessedSmallTableReduceSinks.containsKey(mapJoinOp)) { + // Initialize set of unprocessed small tables + Set<ReduceSinkOperator> rsSet = Sets.newIdentityHashSet(); + for (int pos = 0; pos < mapJoinOp.getParentOperators().size(); ++pos) { + if (pos == mapJoinOp.getConf().getPosBigTable()) { + continue; + } + rsSet.add((ReduceSinkOperator) mapJoinOp.getParentOperators().get(pos)); + } + context.mapJoinToUnprocessedSmallTableReduceSinks.put(mapJoinOp, rsSet); + } + + if (isBigTable) { context.currentMapJoinOperators.add(mapJoinOp); return null; } @@ -87,14 +125,29 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { context.preceedingWork = null; context.currentRootOperator = null; - ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2); + return processReduceSinkToHashJoin(parentRS, mapJoinOp, context); + } + + public static BaseWork getMapJoinParentWork(GenTezProcContext context, Operator<?> parentRS) { + BaseWork parentWork; + if (context.unionWorkMap.containsKey(parentRS)) { + parentWork = context.unionWorkMap.get(parentRS); + } else { + assert context.childToWorkMap.get(parentRS).size() == 1; + parentWork = context.childToWorkMap.get(parentRS).get(0); + } + return parentWork; + } + + public static Object processReduceSinkToHashJoin(ReduceSinkOperator parentRS, MapJoinOperator mapJoinOp, + GenTezProcContext context) throws SemanticException { // remove the tag for in-memory side of mapjoin parentRS.getConf().setSkipTag(true); parentRS.setSkipTag(true); - // remember the original parent list before we start modifying it. 
- if (!context.mapJoinParentMap.containsKey(mapJoinOp)) { - List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators()); - context.mapJoinParentMap.put(mapJoinOp, parents); + + // Mark this small table as being processed + if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) { + context.mapJoinToUnprocessedSmallTableReduceSinks.get(mapJoinOp).remove(parentRS); } List<BaseWork> mapJoinWork = null; @@ -109,13 +162,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { * */ mapJoinWork = context.mapJoinWorkMap.get(mapJoinOp); - BaseWork parentWork; - if (context.unionWorkMap.containsKey(parentRS)) { - parentWork = context.unionWorkMap.get(parentRS); - } else { - assert context.childToWorkMap.get(parentRS).size() == 1; - parentWork = context.childToWorkMap.get(parentRS).get(0); - } + BaseWork parentWork = getMapJoinParentWork(context, parentRS); // set the link between mapjoin and parent vertex int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS); @@ -161,6 +208,11 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { keyCount /= bucketCount; tableSize /= bucketCount; } + } else if (joinConf.isDynamicPartitionHashJoin()) { + // For dynamic partitioned hash join, assuming table is split evenly among the reduce tasks. + bucketCount = parentRS.getConf().getNumReducers(); + keyCount /= bucketCount; + tableSize /= bucketCount; } } LOG.info("Mapjoin " + mapJoinOp + ", pos: " + pos + " --> " + parentWork.getName() + " (" @@ -218,6 +270,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; } } + } else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) { + edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; } TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets); @@ -232,7 +286,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { } ReduceSinkOperator r = null; - if (parentRS.getConf().getOutputName() != null) { + if (context.connectedReduceSinks.contains(parentRS)) { LOG.debug("Cloning reduce sink for multi-child broadcast edge"); // we've already set this one up. Need to clone for the next work. 
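For the isDynamicPartitionHashJoin() branch above, the small-table estimates are simply divided by the big table's reducer count, since each reduce task builds a hash table for only its partition of the small table. A numeric sketch with made-up figures (not taken from the patch):

public class DynPartHashJoinSizingSketch {
  public static void main(String[] args) {
    long keyCount = 1000000L;        // assumed small-table key estimate
    long tableSize = 64L << 20;      // assumed small-table size estimate, 64 MB
    int bucketCount = 50;            // big-table ReduceSink numReducers

    // Same scaling as the dynamic partitioned hash join branch above.
    keyCount /= bucketCount;         // 20000 keys per reduce task
    tableSize /= bucketCount;        // ~1.3 MB of hash table per reduce task

    System.out.println(keyCount + " keys, " + tableSize + " bytes per task");
  }
}
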
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index f474eae..9334c73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse; import java.io.Serializable; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -107,6 +108,10 @@ public class GenTezProcContext implements NodeProcessorCtx{ // map that says which mapjoin belongs to which work item public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap; + // Mapping of reducesink to mapjoin operators + // Only used for dynamic partitioned hash joins (mapjoin operator in the reducer) + public final Map<Operator<?>, MapJoinOperator> smallTableParentToMapJoinMap; + // a map to keep track of which root generated which work public final Map<Operator<?>, BaseWork> rootToWorkMap; @@ -151,6 +156,11 @@ public class GenTezProcContext implements NodeProcessorCtx{ // remember the connections between ts and event public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap; + // When processing dynamic partitioned hash joins, some of the small tables may not get processed + // before the mapjoin's parents are removed during GenTezWork.process(). This is to keep + // track of which small tables haven't been processed yet. + public Map<MapJoinOperator, Set<ReduceSinkOperator>> mapJoinToUnprocessedSmallTableReduceSinks; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List<Task<MoveWork>> moveTask, List<Task<? 
extends Serializable>> rootTasks, @@ -167,6 +177,7 @@ public class GenTezProcContext implements NodeProcessorCtx{ this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>(); this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, TezEdgeProperty>>(); this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>(); + this.smallTableParentToMapJoinMap = new LinkedHashMap<Operator<?>, MapJoinOperator>(); this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>(); this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>(); this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>(); @@ -188,6 +199,7 @@ public class GenTezProcContext implements NodeProcessorCtx{ this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>(); this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>(); this.currentMergeJoinOperator = null; + this.mapJoinToUnprocessedSmallTableReduceSinks = new HashMap<MapJoinOperator, Set<ReduceSinkOperator>>(); rootTasks.add(currentTask); } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 93ad145..a9d1f8e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -130,12 +131,13 @@ public class GenTezUtils { tezWork.add(reduceWork); TezEdgeProperty edgeProp; + EdgeType edgeType = determineEdgeType(context.preceedingWork, reduceWork); if (reduceWork.isAutoReduceParallelism()) { edgeProp = - new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, + new TezEdgeProperty(context.conf, edgeType, true, reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + edgeProp = new TezEdgeProperty(edgeType); } tezWork.connect( @@ -470,4 +472,21 @@ public class GenTezUtils { curr.removeChild(child); } + + public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork followingWork) { + if (followingWork instanceof ReduceWork) { + // Ideally there should be a better way to determine that the followingWork contains + // a dynamic partitioned hash join, but in some cases (createReduceWork()) it looks like + // the work must be created/connected first, before the GenTezProcContext can be updated + // with the mapjoin/work relationship. 
+ ReduceWork reduceWork = (ReduceWork) followingWork; + if (reduceWork.getReducer() instanceof MapJoinOperator) { + MapJoinOperator joinOp = (MapJoinOperator) reduceWork.getReducer(); + if (joinOp.getConf().isDynamicPartitionHashJoin()) { + return EdgeType.CUSTOM_SIMPLE_EDGE; + } + } + } + return EdgeType.SIMPLE_EDGE; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 6b3e19d..c4e0413 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -165,8 +166,11 @@ public class GenTezWork implements NodeProcessor { mergeJoinWork.addMergedWork(work, null, context.leafOperatorToFollowingWork); Operator<? extends OperatorDesc> parentOp = getParentFromStack(context.currentMergeJoinOperator, stack); + // Set the big table position. Both the reduce work and merge join operator + // should be set with the same value. int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); work.setTag(pos); + context.currentMergeJoinOperator.getConf().setBigTablePosition(pos); tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); for (BaseWork parentWork : tezWork.getParents(work)) { TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); @@ -190,6 +194,50 @@ public class GenTezWork implements NodeProcessor { // remember which mapjoin operator links with which work if (!context.currentMapJoinOperators.isEmpty()) { for (MapJoinOperator mj: context.currentMapJoinOperators) { + // For dynamic partitioned hash join, ReduceSinkMapJoinProc rule may not get run for all + // of the ReduceSink parents, because the parents of the MapJoin operator get + // removed later on in this method. Keep track of the parent to mapjoin mapping + // so we can later run the same logic that is run in ReduceSinkMapJoinProc. + if (mj.getConf().isDynamicPartitionHashJoin()) { + // Since this is a dynamic partitioned hash join, the work for this join should be a ReduceWork + ReduceWork reduceWork = (ReduceWork) work; + int bigTablePosition = mj.getConf().getPosBigTable(); + reduceWork.setTag(bigTablePosition); + + // Use context.mapJoinParentMap to get the original RS parents, because + // the MapJoin's parents may have been replaced by dummy operator. 
+ List<Operator<?>> mapJoinOriginalParents = context.mapJoinParentMap.get(mj); + if (mapJoinOriginalParents == null) { + throw new SemanticException("Unexpected error - context.mapJoinParentMap did not have an entry for " + mj); + } + for (int pos = 0; pos < mapJoinOriginalParents.size(); ++pos) { + // This processing only needs to happen for the small tables + if (pos == bigTablePosition) { + continue; + } + Operator<?> parentOp = mapJoinOriginalParents.get(pos); + context.smallTableParentToMapJoinMap.put(parentOp, mj); + + ReduceSinkOperator parentRS = (ReduceSinkOperator) parentOp; + + // TableDesc needed for dynamic partitioned hash join + GenMapRedUtils.setKeyAndValueDesc(reduceWork, parentRS); + + // For small table RS parents that have already been processed, we need to + // add the tag to the RS work to the reduce work that contains this map join. + // This was not being done for normal mapjoins, where the small table typically + // has its ReduceSink parent removed. + if (!context.mapJoinToUnprocessedSmallTableReduceSinks.get(mj).contains(parentRS)) { + // This reduce sink has been processed already, so the work for the parentRS exists + BaseWork parentWork = ReduceSinkMapJoinProc.getMapJoinParentWork(context, parentRS); + int tag = parentRS.getConf().getTag(); + tag = (tag == -1 ? 0 : tag); + reduceWork.getTagToInput().put(tag, parentWork.getName()); + } + + } + } + LOG.debug("Processing map join: " + mj); // remember the mapping in case we scan another branch of the // mapjoin later @@ -369,15 +417,44 @@ public class GenTezWork implements NodeProcessor { // remember the output name of the reduce sink rs.getConf().setOutputName(rWork.getName()); + // For dynamic partitioned hash join, run the ReduceSinkMapJoinProc logic for any + // ReduceSink parents that we missed. + MapJoinOperator mj = context.smallTableParentToMapJoinMap.get(rs); + if (mj != null) { + // Only need to run the logic for tables we missed + if (context.mapJoinToUnprocessedSmallTableReduceSinks.get(mj).contains(rs)) { + // ReduceSinkMapJoinProc logic does not work unless the ReduceSink is connected as + // a parent of the MapJoin, but at this point we have already removed all of the + // parents from the MapJoin. + // Try temporarily adding the RS as a parent + ArrayList<Operator<?>> tempMJParents = new ArrayList<Operator<?>>(); + tempMJParents.add(rs); + mj.setParentOperators(tempMJParents); + // ReduceSink also needs MapJoin as child + List<Operator<?>> rsChildren = rs.getChildOperators(); + rsChildren.add(mj); + + // Since the MapJoin has had all of its other parents removed at this point, + // it would be bad here if processReduceSinkToHashJoin() tries to do anything + // with the RS parent based on its position in the list of parents. + ReduceSinkMapJoinProc.processReduceSinkToHashJoin(rs, mj, context); + + // Remove any parents from MapJoin again + mj.removeParents(); + // TODO: do we also need to remove the MapJoin from the list of RS's children? 
+ } + } + if (!context.connectedReduceSinks.contains(rs)) { // add dependency between the two work items TezEdgeProperty edgeProp; + EdgeType edgeType = utils.determineEdgeType(work, followingWork); if (rWork.isAutoReduceParallelism()) { edgeProp = - new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true, + new TezEdgeProperty(context.conf, edgeType, true, rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); } else { - edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + edgeProp = new TezEdgeProperty(edgeType); } tezWork.connect(work, followingWork, edgeProp); context.connectedReduceSinks.add(rs); http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index fa697ef..d574c5c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -47,7 +47,7 @@ public abstract class BaseWork extends AbstractOperatorDesc { // Their function is mainly as root ops to give the mapjoin the correct // schema info. List<HashTableDummyOperator> dummyOps; - int tag; + int tag = 0; private final List<String> sortColNames = new ArrayList<String>(); private MapredLocalWork mrLocalWork; http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java index f9c34cb..cce9bc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -45,4 +45,8 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { public int getBigTablePosition() { return mapJoinConversionPos; } + + public void setBigTablePosition(int pos) { + mapJoinConversionPos = pos; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java index fb3c4a3..e291a48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; @@ -278,6 +279,59 @@ public class ExprNodeDescUtils { throw new SemanticException("Met multiple parent operators"); } + public static List<ExprNodeDesc> resolveJoinKeysAsRSColumns(List<ExprNodeDesc> sourceList, + Operator<?> reduceSinkOp) { + ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(sourceList.size()); + for (ExprNodeDesc 
source : sourceList) { + ExprNodeDesc newExpr = resolveJoinKeysAsRSColumns(source, reduceSinkOp); + if (newExpr == null) { + return null; + } + result.add(newExpr); + } + return result; + } + + /** + * Join keys are expressions based on the select operator. Resolve the expressions so they + * are based on the ReduceSink operator + * SEL -> RS -> JOIN + * @param source + * @param reduceSinkOp + * @return + */ + public static ExprNodeDesc resolveJoinKeysAsRSColumns(ExprNodeDesc source, Operator<?> reduceSinkOp) { + // Assuming this is only being done for join keys. As a result we shouldn't have to recursively + // check any nested child expressions, because the result of the expression should exist as an + // output column of the ReduceSink operator + if (source == null) { + return null; + } + + // columnExprMap has the reverse of what we need - a mapping of the internal column names + // to the ExprNodeDesc from the previous operation. + // Find the key/value where the ExprNodeDesc value matches the column we are searching for. + // The key portion of the entry will be the internal column name for the join key expression. + for (Map.Entry<String, ExprNodeDesc> mapEntry : reduceSinkOp.getColumnExprMap().entrySet()) { + if (mapEntry.getValue().isSame(source)) { + String columnInternalName = mapEntry.getKey(); + if (source instanceof ExprNodeColumnDesc) { + // The join key is a table column. Create the ExprNodeDesc based on this column. + ColumnInfo columnInfo = reduceSinkOp.getSchema().getColumnInfo(columnInternalName); + return new ExprNodeColumnDesc(columnInfo); + } else { + // Join key expression is likely some expression involving functions/operators, so there + // is no actual table column for this. But the ReduceSink operator should still have an + // output column corresponding to this expression, using the columnInternalName. + // TODO: does tableAlias matter for this kind of expression? + return new ExprNodeColumnDesc(source.getTypeInfo(), columnInternalName, "", false); + } + } + } + + return null; // Couldn't find reference to expression + } + public static ExprNodeDesc[] extractComparePair(ExprNodeDesc expr1, ExprNodeDesc expr2) { expr1 = extractConstant(expr1); expr2 = extractConstant(expr2); @@ -483,4 +537,65 @@ public class ExprNodeDescUtils { return exprColLst; } + + public static List<ExprNodeDesc> flattenExprList(List<ExprNodeDesc> sourceList) { + ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(sourceList.size()); + for (ExprNodeDesc source : sourceList) { + result.add(flattenExpr(source)); + } + return result; + } + + /** + * A normal reduce operator's rowObjectInspector looks like a struct containing + * nested key/value structs that contain the column values: + * { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } } + * + * While the rowObjectInspector looks the same for vectorized queries during + * compilation time, within the tasks at query execution the rowObjectInspector + * has changed to a flatter structure without nested key/value structs: + * { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. } + * + * Trying to fetch 'key.reducesinkkey0' by name from the list of flattened + * ObjectInspectors does not work because the '.' gets interpreted as a field member, + * even though it is a flattened list of column values. + * This workaround converts the column name referenced in the ExprNodeDesc + * from a nested field name (key.reducesinkkey0) to key_reducesinkkey0, + * simply by replacing '.' with '_'. 
+ * @param source + * @return + */ + public static ExprNodeDesc flattenExpr(ExprNodeDesc source) { + if (source instanceof ExprNodeGenericFuncDesc) { + // all children expression should be resolved + ExprNodeGenericFuncDesc function = (ExprNodeGenericFuncDesc) source.clone(); + List<ExprNodeDesc> newChildren = flattenExprList(function.getChildren()); + for (ExprNodeDesc newChild : newChildren) { + if (newChild == null) { + // Could not resolve all of the function children, fail + return null; + } + } + function.setChildren(newChildren); + return function; + } + if (source instanceof ExprNodeColumnDesc) { + ExprNodeColumnDesc column = (ExprNodeColumnDesc) source; + // Create a new ColumnInfo, replacing STRUCT.COLUMN with STRUCT_COLUMN + String newColumn = column.getColumn().replace('.', '_'); + return new ExprNodeColumnDesc(source.getTypeInfo(), newColumn, column.getTabAlias(), false); + } + if (source instanceof ExprNodeFieldDesc) { + // field expression should be resolved + ExprNodeFieldDesc field = (ExprNodeFieldDesc) source.clone(); + ExprNodeDesc fieldDesc = flattenExpr(field.getDesc()); + if (fieldDesc == null) { + return null; + } + field.setDesc(fieldDesc); + return field; + } + // constant or null expr, just return + return source; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index cee9100..e27b89b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -29,6 +29,8 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hive.ql.plan.Explain.Level; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; + /** * Map Join operator Descriptor implementation. * @@ -71,6 +73,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { protected boolean genJoinKeys = true; private boolean isHybridHashJoin; + private boolean isDynamicPartitionHashJoin = false; // Extra parameters only for vectorization. 
private VectorMapJoinDesc vectorDesc; @@ -369,4 +372,12 @@ public class MapJoinDesc extends JoinDesc implements Serializable { public boolean getGenJoinKeys() { return genJoinKeys; } + + public boolean isDynamicPartitionHashJoin() { + return isDynamicPartitionHashJoin; + } + + public void setDynamicPartitionHashJoin(boolean isDistributedHashJoin) { + this.isDynamicPartitionHashJoin = isDistributedHashJoin; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index a78a92e..020d6de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -136,7 +136,7 @@ public class ReduceWork extends BaseWork { return null; } if (valueObjectInspector == null) { - valueObjectInspector = getObjectInspector(tagToValueDesc.get(0)); + valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag)); } return valueObjectInspector; } http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q new file mode 100644 index 0000000..e3325c4 --- /dev/null +++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q @@ -0,0 +1,101 @@ + +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.optimize.dynamic.partition.hashjoin=false; + +-- First try with regular mergejoin +explain +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +explain +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +explain +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; + +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; + +set hive.auto.convert.join=true; +set hive.optimize.dynamic.partition.hashjoin=true; +set hive.auto.convert.join.noconditionaltask.size=200000; +set hive.exec.reducers.bytes.per.reducer=200000; + +-- Try with dynamically partitioned hashjoin +explain +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +explain +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 
3000000 and b.cbigint is not null; + +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +explain +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; + +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q new file mode 100644 index 0000000..af4e2b8 --- /dev/null +++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q @@ -0,0 +1,83 @@ + +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.optimize.dynamic.partition.hashjoin=false; + +-- Multiple tables, and change the order of the big table (alltypesorc) +-- First try with regular mergejoin +explain +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +set hive.auto.convert.join=true; +set hive.optimize.dynamic.partition.hashjoin=true; +set hive.auto.convert.join.noconditionaltask.size=2000; +set hive.exec.reducers.bytes.per.reducer=200000; + +-- noconditionaltask.size needs to be low enough that entire filtered table results do not fit in one task's hash table +-- Try with dynamically partitioned hash join +explain +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +-- Try different order of tables +explain +select + a.* +from + src b, + alltypesorc a, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +select + a.* +from + src b, + alltypesorc a, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q new file mode 100644 index 0000000..65fee16 --- /dev/null +++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q @@ -0,0 +1,102 @@ + +set hive.explain.user=false; +set hive.auto.convert.join=false; +set 
hive.optimize.dynamic.partition.hashjoin=false; + +-- First try with regular mergejoin +explain +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +explain +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +explain +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; + +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; + +set hive.auto.convert.join=true; +set hive.optimize.dynamic.partition.hashjoin=true; +set hive.auto.convert.join.noconditionaltask.size=200000; +set hive.exec.reducers.bytes.per.reducer=200000; +set hive.vectorized.execution.enabled=true; + +-- Try with dynamically partitioned hashjoin +explain +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +select + * +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +order by a.cint; + +explain +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +select + count(*) +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null; + +explain +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; + +select + a.csmallint, count(*) c1 +from alltypesorc a join alltypesorc b on a.cint = b.cint +where + a.cint between 1000000 and 3000000 and b.cbigint is not null +group by a.csmallint +order by c1; http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q new file mode 100644 index 0000000..606f455 --- /dev/null +++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q @@ -0,0 +1,84 @@ + +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.optimize.dynamic.partition.hashjoin=false; + +-- Multiple tables, and change the order of the big table (alltypesorc) +-- First try with regular mergejoin +explain +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) 
+ and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +set hive.auto.convert.join=true; +set hive.optimize.dynamic.partition.hashjoin=true; +set hive.auto.convert.join.noconditionaltask.size=2000; +set hive.exec.reducers.bytes.per.reducer=200000; +set hive.vectorized.execution.enabled=true; + +-- noconditionaltask.size needs to be low enough that entire filtered table results do not fit in one task's hash table +-- Try with dynamically partitioned hash join +explain +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +select + a.* +from + alltypesorc a, + src b, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +-- Try different order of tables +explain +select + a.* +from + src b, + alltypesorc a, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint; + +select + a.* +from + src b, + alltypesorc a, + src c +where + a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0) + and (a.csmallint < 100) +order by a.csmallint, a.ctinyint, a.cint;
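
Editor's note: to summarize how the pieces in this patch fit together, below is a minimal illustrative sketch (not part of the commit) of the plan wiring for a ReduceWork that hosts a dynamic partitioned hash join. It only calls classes and methods that appear in this diff (MapJoinDesc.isDynamicPartitionHashJoin()/getPosBigTable(), BaseWork.setTag(), ReduceWork.getReducer(), GenTezUtils.determineEdgeType(), TezEdgeProperty); the wrapper class and helper method name are hypothetical and simplified.

    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.parse.GenTezUtils;
    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.ReduceWork;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;

    public class DynPartHashJoinWiringSketch {
      // Hypothetical helper: decide how a parent work should be connected to a ReduceWork.
      static TezEdgeProperty edgeInto(BaseWork parentWork, ReduceWork reduceWork) {
        if (reduceWork.getReducer() instanceof MapJoinOperator) {
          MapJoinOperator mj = (MapJoinOperator) reduceWork.getReducer();
          if (mj.getConf().isDynamicPartitionHashJoin()) {
            // The big-table position doubles as the reduce-side tag, so the fixed
            // ReduceWork.getValueObjectInspector() resolves the big table's value descriptor.
            reduceWork.setTag(mj.getConf().getPosBigTable());
          }
        }
        // determineEdgeType() returns CUSTOM_SIMPLE_EDGE (unsorted, partitioned) when the
        // following work hosts a dynamic partitioned hash join, and SIMPLE_EDGE otherwise.
        return new TezEdgeProperty(GenTezUtils.determineEdgeType(parentWork, reduceWork));
      }
    }

In the patch itself this logic is spread across GenTezUtils.determineEdgeType() and GenTezWork, with the tag/tagToInput bookkeeping for small-table ReduceSinks handled by ReduceSinkMapJoinProc.processReduceSinkToHashJoin().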