HIVE-16546: LLAP: Fail map join tasks if hash table memory exceeds threshold (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ffff404 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ffff404 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ffff404 Branch: refs/heads/master Commit: 0ffff404088a428da752a60a0847f51845e618ff Parents: c2637e6 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Tue May 2 10:53:19 2017 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Tue May 2 10:53:19 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/MemoryEstimate.java | 29 ++++++++++ .../org/apache/hadoop/hive/conf/HiveConf.java | 4 ++ .../llap/IncrementalObjectSizeEstimator.java | 4 +- .../UDAFTemplates/VectorUDAFAvg.txt | 2 +- .../UDAFTemplates/VectorUDAFMinMax.txt | 2 +- .../UDAFTemplates/VectorUDAFMinMaxDecimal.txt | 2 +- .../VectorUDAFMinMaxIntervalDayTime.txt | 2 +- .../UDAFTemplates/VectorUDAFMinMaxString.txt | 4 +- .../UDAFTemplates/VectorUDAFMinMaxTimestamp.txt | 2 +- .../UDAFTemplates/VectorUDAFSum.txt | 2 +- .../UDAFTemplates/VectorUDAFVar.txt | 2 +- .../UDAFTemplates/VectorUDAFVarDecimal.txt | 4 +- .../mapjoin/MapJoinMemoryExhaustionError.java | 28 ++++++++++ .../MapJoinMemoryExhaustionException.java | 29 ---------- .../mapjoin/MapJoinMemoryExhaustionHandler.java | 6 +- .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 5 +- .../persistence/BytesBytesMultiHashMap.java | 17 +++++- .../ql/exec/persistence/HashMapWrapper.java | 10 +++- .../persistence/HybridHashTableContainer.java | 5 ++ .../persistence/MapJoinBytesTableContainer.java | 58 +++++++++++++++++++- .../exec/persistence/MapJoinTableContainer.java | 3 +- .../hive/ql/exec/tez/HashTableLoader.java | 42 ++++++++++++-- .../hadoop/hive/ql/exec/tez/TezProcessor.java | 11 +++- .../vector/VectorAggregationBufferBatch.java | 4 +- .../ql/exec/vector/VectorGroupByOperator.java | 2 +- .../aggregates/VectorAggregateExpression.java | 2 
+- .../aggregates/VectorUDAFAvgDecimal.java | 2 +- .../aggregates/VectorUDAFAvgTimestamp.java | 2 +- .../aggregates/VectorUDAFBloomFilter.java | 4 +- .../aggregates/VectorUDAFBloomFilterMerge.java | 2 +- .../expressions/aggregates/VectorUDAFCount.java | 2 +- .../aggregates/VectorUDAFCountMerge.java | 2 +- .../aggregates/VectorUDAFCountStar.java | 2 +- .../aggregates/VectorUDAFStdPopTimestamp.java | 2 +- .../aggregates/VectorUDAFStdSampTimestamp.java | 2 +- .../aggregates/VectorUDAFSumDecimal.java | 2 +- .../aggregates/VectorUDAFVarPopTimestamp.java | 2 +- .../aggregates/VectorUDAFVarSampTimestamp.java | 2 +- .../fast/VectorMapJoinFastBytesHashMap.java | 5 ++ .../VectorMapJoinFastBytesHashMultiSet.java | 5 ++ .../fast/VectorMapJoinFastBytesHashSet.java | 5 ++ .../fast/VectorMapJoinFastBytesHashTable.java | 6 ++ .../fast/VectorMapJoinFastHashTable.java | 7 +++ .../fast/VectorMapJoinFastHashTableLoader.java | 47 +++++++++++++++- .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 11 +++- .../fast/VectorMapJoinFastLongHashMap.java | 9 ++- .../fast/VectorMapJoinFastLongHashMultiSet.java | 5 ++ .../fast/VectorMapJoinFastLongHashSet.java | 5 ++ .../fast/VectorMapJoinFastLongHashTable.java | 15 +++++ .../fast/VectorMapJoinFastMultiKeyHashMap.java | 5 ++ .../VectorMapJoinFastMultiKeyHashMultiSet.java | 4 ++ .../fast/VectorMapJoinFastMultiKeyHashSet.java | 5 +- .../fast/VectorMapJoinFastStringHashMap.java | 9 +++ .../VectorMapJoinFastStringHashMultiSet.java | 8 +++ .../fast/VectorMapJoinFastStringHashSet.java | 8 +++ .../fast/VectorMapJoinFastTableContainer.java | 16 +++++- .../fast/VectorMapJoinFastValueStore.java | 8 ++- .../hashtable/VectorMapJoinHashTable.java | 3 +- .../VectorMapJoinOptimizedHashSet.java | 5 ++ .../VectorMapJoinOptimizedHashTable.java | 9 +++ .../VectorMapJoinOptimizedStringHashSet.java | 8 +++ .../hive/ql/optimizer/ConvertJoinMapJoin.java | 3 +- .../hive/ql/optimizer/MapJoinProcessor.java | 8 +-- .../calcite/translator/HiveOpConverter.java | 2 +- 
.../physical/GenMRSkewJoinProcessor.java | 3 +- .../physical/GenSparkSkewJoinProcessor.java | 3 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +- .../apache/hadoop/hive/ql/plan/JoinDesc.java | 17 +++++- .../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 12 ++-- .../ql/udf/generic/GenericUDAFComputeStats.java | 22 ++++---- .../TestMapJoinMemoryExhaustionHandler.java | 4 +- .../apache/hadoop/hive/serde2/WriteBuffers.java | 25 ++++++++- .../hadoop/hive/ql/util/JavaDataModel.java | 26 ++++----- 73 files changed, 509 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java new file mode 100644 index 0000000..36ae56f --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common; + +/** + * Interface that can be used to provide size estimates based on data structures held in memory for an object instance. + */ +public interface MemoryEstimate { + /** + * Returns estimated memory size based {@link org.apache.hadoop.hive.ql.util.JavaDataModel} + * + * @return estimated memory size in bytes + */ + long getEstimatedMemorySize(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ea8485d..84398c6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3161,6 +3161,10 @@ public class HiveConf extends Configuration { "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" + "conversion decision will be made). This is only an upper bound. Lower bound is determined by number of\n" + "executors and configured max concurrency."), + LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL("hive.llap.mapjoin.memory.monitor.check.interval", 100000L, + "Check memory usage of mapjoin hash tables after every interval of this many rows. If map join hash table\n" + + "memory usage exceeds (hive.auto.convert.join.noconditionaltask.size * hive.hash.table.inflation.factor)\n" + + "when running in LLAP, tasks will get killed and not retried. Set the value to 0 to disable this feature."), LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4, "Maximum number of threads to be used for AM reporter. 
If this is lower than number of\n" + "executors in llap daemon, it would be set to number of executors at runtime.", http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java index ff6e7ce..6cf8dbb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java @@ -116,7 +116,7 @@ public class IncrementalObjectSizeEstimator { addToProcessing(byType, stack, fieldObj, fieldClass); } } - estimator.directSize = JavaDataModel.alignUp( + estimator.directSize = (int) JavaDataModel.alignUp( estimator.directSize, memoryModel.memoryAlign()); } } @@ -454,7 +454,7 @@ public class IncrementalObjectSizeEstimator { if (len != 0) { int elementSize = getPrimitiveSize(e.field.getType().getComponentType()); arraySize += elementSize * len; - arraySize = JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign()); + arraySize = (int) JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign()); } referencedSize += arraySize; break; http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt index 4393c3b..46cbb5b 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt @@ -471,7 +471,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() 
{ + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt index 7468c2f..2261e1b 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt @@ -442,7 +442,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt index 57b7ea5..58d2d22 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt @@ -458,7 +458,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt 
b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt index 749e97e..515692e 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt @@ -441,7 +441,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt index 9dfc147..c210e4c 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt @@ -81,7 +81,7 @@ public class <ClassName> extends VectorAggregateExpression { @Override public int getVariableSize() { JavaDataModel model = JavaDataModel.get(); - return model.lengthForByteArrayOfSize(bytes.length); + return (int) model.lengthForByteArrayOfSize(bytes.length); } @Override @@ -388,7 +388,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt index 
32ecb34..074aefd 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt @@ -443,7 +443,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt index bd0f14d..a89ae0a 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt @@ -433,7 +433,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object(), http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt index dc9d4b1..1e3516b 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt @@ -513,7 +513,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + 
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt ---------------------------------------------------------------------- diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt index 01062a9..b3ec7e9 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt @@ -467,7 +467,7 @@ public class <ClassName> extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + @@ -488,4 +488,4 @@ public class <ClassName> extends VectorAggregateExpression { public void setInputExpression(VectorExpression inputExpression) { this.inputExpression = inputExpression; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java new file mode 100644 index 0000000..4ad4f98 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mapjoin; + +/** + * When this Error is thrown, better not retry. + */ +public class MapJoinMemoryExhaustionError extends Error { + private static final long serialVersionUID = 3678353959830506881L; + public MapJoinMemoryExhaustionError(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java deleted file mode 100644 index dbe00b6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.mapjoin; - -import org.apache.hadoop.hive.ql.metadata.HiveException; - - - -public class MapJoinMemoryExhaustionException extends HiveException { - private static final long serialVersionUID = 3678353959830506881L; - public MapJoinMemoryExhaustionException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java index 7fc3226..d5e81e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java @@ -86,17 +86,17 @@ public class MapJoinMemoryExhaustionHandler { * * @param tableContainerSize currently table container size * @param numRows number of rows processed - * @throws MapJoinMemoryExhaustionException + * @throws MapJoinMemoryExhaustionError */ public void checkMemoryStatus(long tableContainerSize, long numRows) - throws MapJoinMemoryExhaustionException { + throws MapJoinMemoryExhaustionError { long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); double percentage = (double) usedMemory / (double) maxHeapSize; String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable 
size:\t" + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage); console.printInfo(msg); if(percentage > maxMemoryUsage) { - throw new MapJoinMemoryExhaustionException(msg); + throw new MapJoinMemoryExhaustionError(msg); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 595d1bd..c5d4f9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -69,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -385,7 +384,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab + Utilities.showTime(elapsed) + " sec."); } catch (Throwable throwable) { if 
(throwable instanceof OutOfMemoryError - || (throwable instanceof MapJoinMemoryExhaustionException)) { + || (throwable instanceof MapJoinMemoryExhaustionError)) { l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); return 3; } else { http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 04e24bd..360b639 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -46,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting; * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked * and there's very little in common left save for quadratic probing (and that with some changes). 
*/ -public final class BytesBytesMultiHashMap { +public final class BytesBytesMultiHashMap implements MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class); /* @@ -521,7 +523,18 @@ public final class BytesBytesMultiHashMap { * @return number of bytes */ public long memorySize() { - return writeBuffers.size() + refs.length * 8 + 100; + return getEstimatedMemorySize(); + } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += writeBuffers.getEstimatedMemorySize(); + size += jdm.lengthForLongArrayOfSize(refs.length); + // 11 primitive1 fields, 2 refs above with alignment + size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign()); + return size; } public void seal() { http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index a3bccc6..adf1a90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -53,7 +53,7 @@ import org.apache.hadoop.io.Writable; public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable { private static final long serialVersionUID = 1L; protected static final Logger LOG = LoggerFactory.getLogger(HashMapWrapper.class); - + private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L; // default threshold for using main memory based HashMap private static final int THRESHOLD = 1000000; private static final float LOADFACTOR = 0.75f; @@ -140,6 +140,14 @@ public class HashMapWrapper extends AbstractMapJoinTableContainer implements Ser return new 
GetAdaptor(keyTypeFromLoader); } + @Override + public long getEstimatedMemorySize() { + // TODO: Key and Values are Object[] which can be eagerly deserialized or lazily deserialized. To accurately + // estimate the entry size, every possible Objects in Key, Value should implement MemoryEstimate interface which + // is very intrusive. So assuming default entry size here. + return size() * DEFAULT_HASHMAP_ENTRY_SIZE; + } + private class GetAdaptor implements ReusableGetAdaptor { private Object[] currentKey; http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 04e89e8..6523f00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -118,6 +118,11 @@ public class HybridHashTableContainer private final String spillLocalDirs; + @Override + public long getEstimatedMemorySize() { + return memoryUsed; + } + /** * This class encapsulates the triplet together since they are closely related to each other * The triplet: hashmap (either in memory or on disk), small table container, big table container http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index c86e5f5..014d17a 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -72,6 +74,11 @@ public class MapJoinBytesTableContainer implements MapJoinTableContainer, MapJoinTableContainerDirectAccess { private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class); + // TODO: For object inspector fields, assigning 16KB for now. To better estimate the memory size every + // object inspectors have to implement MemoryEstimate interface which is a lot of change with little benefit compared + // to writing an instrumentation agent for object size estimation + public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L; + private final BytesBytesMultiHashMap hashMap; /** The OI used to deserialize values. We never deserialize keys. 
*/ private LazyBinaryStructObjectInspector internalValueOi; @@ -147,7 +154,7 @@ public class MapJoinBytesTableContainer this.notNullMarkers = notNullMarkers; } - public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate { void setKeyValue(Writable key, Writable val) throws SerDeException; /** Get hash value from the key. */ int getHashFromKey() throws SerDeException; @@ -216,6 +223,22 @@ public class MapJoinBytesTableContainer public int getHashFromKey() throws SerDeException { throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += keySerDe == null ? 0 : jdm.object(); + size += valSerDe == null ? 0 : jdm.object(); + size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object(); + size += valObjs == null ?
0 : jdm.array() + valObjs.length * jdm.object(); + size += jdm.primitive1(); + return size; + } } static class LazyBinaryKvWriter implements KeyValueHelper { @@ -319,6 +342,15 @@ public class MapJoinBytesTableContainer aliasFilter &= filterGetter.getShort(); return aliasFilter; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += (4 * jdm.object()); + size += jdm.primitive1(); + return size; + } } /* @@ -361,6 +393,15 @@ public class MapJoinBytesTableContainer int keyLength = key.getLength(); return HashCodeUtil.murmurHash(keyBytes, 0, keyLength); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += jdm.object() + (key == null ? 0 : key.getCapacity()); + size += jdm.object() + (val == null ? 0 : val.getCapacity()); + return size; + } } @Override @@ -768,4 +809,19 @@ public class MapJoinBytesTableContainer public int size() { return hashMap.size(); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += hashMap.getEstimatedMemorySize(); + size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize(); + size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize(); + size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length); + size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length); + size += notNullMarkers == null ?
0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length); + size += jdm.arrayList(); // empty list + size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + return size; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index 6d71fef..5ca5ff6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; -public interface MapJoinTableContainer { +public interface MapJoinTableContainer extends MemoryEstimate { /** * Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures * for each key.
"Old" HashMapWrapper will still create/retrieve new objects for java HashMap; http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7b13e90..7011d23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -146,7 +147,20 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable } nwayConf.setNumberOfPartitions(numPartitions); } - + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + + "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); + } else { + LOG.info("Memory monitoring for hash table loader enabled.
noconditionalTaskSize: {} inflationFactor: {} ", + noCondTaskSize, inflationFactor); + } for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -205,12 +219,29 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable tableContainer = new HashMapWrapper(hconf, keyCount); } - LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName()); + LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName()); tableContainer.setSerde(keyCtx, valCtx); + final long threshold = (long) (inflationFactor * noCondTaskSize); + // Guard against a poorly configured noconditional task size: let the hash table grow up to two-thirds + // of the memory available to the container/executor. Loop-invariant, so computed once per table. + final long effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * desc.getMaxMemoryAvailable()); while (kvReader.next()) { - tableContainer.putRow( - (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue()); + tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + final long estMemUsage = tableContainer.getEstimatedMemorySize(); + if (estMemUsage > effectiveThreshold) { + String msg = "Hash table loading exceeded memory limits." + + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + + " inflationFactor: " + inflationFactor + " threshold: " + threshold + + " effectiveThreshold: " + effectiveThreshold; + LOG.error(msg); + throw new MapJoinMemoryExhaustionError(msg); + } + LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " + + "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + } } tableContainer.seal(); LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ".
Small table position: " + pos); http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 486d43a..4242262 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +43,8 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; +import com.google.common.base.Throwables; + /** * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -189,8 +193,12 @@ public class TezProcessor extends AbstractLogicalIOProcessor { } catch (Throwable t) { originalThrowable = t; } finally { - if (originalThrowable != null && originalThrowable instanceof Error) { - LOG.error(StringUtils.stringifyException(originalThrowable)); + if (originalThrowable != null && (originalThrowable instanceof Error || + Throwables.getRootCause(originalThrowable) instanceof Error)) { + // Pass the Throwable itself as the last argument so SLF4J logs its stack trace; a String argument with no {} placeholder would be silently dropped. + LOG.error("Cannot recover from this FATAL error", originalThrowable); + getContext().reportFailure(TaskFailureType.FATAL, originalThrowable, + "Cannot recover from this error"); throw new RuntimeException(originalThrowable); } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java index 630046d..84128e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java @@ -57,7 +57,7 @@ public class VectorAggregationBufferBatch { /** * Memory consumed by a set of aggregation buffers */ - private int aggregatorsFixedSize; + private long aggregatorsFixedSize; /** * Array of indexes for aggregators that have variable size @@ -76,7 +76,7 @@ public class VectorAggregationBufferBatch { * Returns the fixed size consumed by the aggregation buffers * @return */ - public int getAggregatorsFixedSize() { + public long getAggregatorsFixedSize() { return aggregatorsFixedSize; } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 5b4c7c3..30916a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -286,7 +286,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements /** * Total per hashtable entry fixed memory (does not depend on key/agg values). */ - private int fixedHashEntrySize; + private long fixedHashEntrySize; /** * Average per hashtable entry variable size memory (depends on key/agg value). http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java index 0866f63..7ab4473 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java @@ -52,7 +52,7 @@ public abstract class VectorAggregateExpression implements Serializable { public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException; public abstract ObjectInspector getOutputObjectInspector(); - public abstract int getAggregationBufferFixedSize(); + public abstract long getAggregationBufferFixedSize(); public boolean hasVariableSize() { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java index 74e25ae..4aac9d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java @@ -492,7 +492,7 @@ public class VectorUDAFAvgDecimal extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java index 483d9dc..365dcf6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java @@ -464,7 +464,7 @@ public class VectorUDAFAvgTimestamp extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java index 2139eae..52b05ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java @@ -383,7 +383,7 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { if (bitSetSize < 0) { // Not pretty, but we need a way to get the size try { @@ -396,7 +396,7 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression { // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int) JavaDataModel model = JavaDataModel.get(); - int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize), + long bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize), model.memoryAlign()); return JavaDataModel.alignUp( model.object() + bloomFilterSize + model.primitive1() + model.primitive1(), http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java index d2446d5..b986eb4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java @@ -339,7 +339,7 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { if (aggBufferSize < 0) { // Not pretty, but we need a way to get the size try { http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java index 494febc..cadb6dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java @@ -259,7 +259,7 @@ public class VectorUDAFCount extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java index dec88cb..c489f8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java @@ -385,7 +385,7 @@ public class VectorUDAFCountMerge extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java index 337ba0a..3b66030 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java @@ -142,7 +142,7 @@ public class VectorUDAFCountStar extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java index 8cd3506..5388d37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java @@ -508,7 +508,7 @@ public class VectorUDAFStdPopTimestamp extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java index 61d6977..1769dc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java @@ -508,7 +508,7 @@ public class VectorUDAFStdSampTimestamp extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java index b10f66f..a37e3f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java @@ -431,7 +431,7 @@ public class VectorUDAFSumDecimal extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object(), http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java index 2709b07..61cdeaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java @@ -508,7 +508,7 @@ public class VectorUDAFVarPopTimestamp extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java index 03dce1e..c375461 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java @@ -508,7 +508,7 @@ public class VectorUDAFVarSampTimestamp extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java index 6242daf..b5eab8b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java @@ -107,4 +107,9 @@ public abstract class VectorMapJoinFastBytesHashMap // Share the same write buffers with our value store.
keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers()); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java index 1a41961..e779762 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java @@ -97,4 +97,9 @@ public abstract class VectorMapJoinFastBytesHashMultiSet keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java index 331867c..d493319 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java @@ -84,4 +84,9 @@ public abstract class VectorMapJoinFastBytesHashSet keyStore = new
VectorMapJoinFastKeyStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java index b93e977..10bc902 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable; @@ -218,4 +219,9 @@ public abstract class VectorMapJoinFastBytesHashTable super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); allocateBucketArray(); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + JavaDataModel.get().lengthForLongArrayOfSize(slotTriples.length); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index b6db3bc..1f182ee 100644 ---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; @@ -88,4 +89,10 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab public int size() { return keysAssigned; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + return JavaDataModel.alignUp(10L * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java index 49ecdd1..b015e43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import
org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.exec.tez.TezContext; @@ -68,6 +68,21 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive. Map<Integer, String> parentToInput = desc.getParentToInput(); Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts(); + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + + "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); + } else { + LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ", + noCondTaskSize, inflationFactor); + } + for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -93,15 +108,36 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive. VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer = new VectorMapJoinFastTableContainer(desc, hconf, keyCount); + LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName()); + vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+ final long threshold = (long) (inflationFactor * noCondTaskSize); + // Guard against a poorly configured noconditional task size: let the hash table grow up to two-thirds + // of the memory available to the container/executor. Loop-invariant, so computed once per table. + final long effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * desc.getMaxMemoryAvailable()); while (kvReader.next()) { vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(), (BytesWritable)kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize(); + if (estMemUsage > effectiveThreshold) { + String msg = "VectorMapJoin Hash table loading exceeded memory limits." + + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + + " inflationFactor: " + inflationFactor + " threshold: " + threshold + + " effectiveThreshold: " + effectiveThreshold; + LOG.error(msg); + throw new MapJoinMemoryExhaustionError(msg); + } + LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " + + "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + } } vectorMapJoinFastTableContainer.seal(); - mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer; - + mapJoinTables[pos] = vectorMapJoinFastTableContainer; + LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() + + ".
Small table position: " + pos); } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java index be51693..3e9ff84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.serde2.WriteBuffers; // Optimized for sequential key lookup. -public class VectorMapJoinFastKeyStore { +public class VectorMapJoinFastKeyStore implements MemoryEstimate { private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName()); @@ -165,4 +166,12 @@ public class VectorMapJoinFastKeyStore { this.writeBuffers = writeBuffers; unsafeReadPos = new WriteBuffers.Position(); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize(); + size += unsafeReadPos == null ?
0 : unsafeReadPos.getEstimatedMemorySize(); + return size; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java index 6fe98f9..d4847b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; import java.io.IOException; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -37,7 +38,7 @@ import com.google.common.annotations.VisibleForTesting; */ public class VectorMapJoinFastLongHashMap extends VectorMapJoinFastLongHashTable - implements VectorMapJoinLongHashMap { + implements VectorMapJoinLongHashMap, MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class); @@ -112,4 +113,9 @@ public class VectorMapJoinFastLongHashMap initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); valueStore = new VectorMapJoinFastValueStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java ---------------------------------------------------------------------- diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java index 9140aee..566cfa2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java @@ -100,4 +100,9 @@ public class VectorMapJoinFastLongHashMultiSet super(minMaxEnabled, isOuterJoin, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java index d3efb11..fb7ae62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java @@ -96,4 +96,9 @@ public class VectorMapJoinFastLongHashSet super(minMaxEnabled, isOuterJoin, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index 8bfa07c..54e667c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -280,4 +281,18 @@ public abstract class VectorMapJoinFastLongHashTable min = Long.MAX_VALUE; max = Long.MIN_VALUE; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = super.getEstimatedMemorySize(); + size += slotPairs == null ? 0 : jdm.lengthForLongArrayOfSize(slotPairs.length); + size += (2 * jdm.primitive2()); + size += (2 * jdm.primitive1()); + size += jdm.object(); + // adding 16KB constant memory for keyBinarySortableDeserializeRead as the rabit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + size += (16 * 1024L); + return size; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java index add4788..eb08aa9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java @@ -53,4 +53,9 @@ public class VectorMapJoinFastMultiKeyHashMap int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java index faefdbb..56964bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java @@ -52,4 +52,8 @@ public class VectorMapJoinFastMultiKeyHashMultiSet super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java index 5328910..46bafe0 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java @@ -52,5 +52,8 @@ public class VectorMapJoinFastMultiKeyHashSet super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } - + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java index f13034f..d04590a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java @@ -43,4 +43,13 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + size += (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java index 53ad7b4..b24bfdf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java @@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + long size = (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java index 723c729..75fae25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java @@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 
16KB constant memory for stringCommon as the rabit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + long size = (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 05f1cf1..2fe4b93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -26,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import 
org.apache.tez.runtime.library.api.KeyValueReader; /** * HashTableLoader for Tez constructs the hashtable from records read from @@ -46,7 +45,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader; */ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer { - private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName()); private final MapJoinDesc desc; private final Configuration hconf; @@ -219,6 +218,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai } @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += vectorMapJoinFastHashTable.getEstimatedMemorySize(); + size += (4 * jdm.primitive1()); + size += (2 * jdm.object()); + size += (jdm.primitive2()); + return size; + } + + @Override public void setSerde(MapJoinObjectSerDeContext keyCtx, MapJoinObjectSerDeContext valCtx) throws SerDeException { // Do nothing in this case. 
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java index f9c5b34..3cd06e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; @@ -30,7 +31,7 @@ import com.google.common.base.Preconditions; // Supports random access. -public class VectorMapJoinFastValueStore { +public class VectorMapJoinFastValueStore implements MemoryEstimate { private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName()); @@ -113,6 +114,11 @@ public class VectorMapJoinFastValueStore { return writeBuffers; } + @Override + public long getEstimatedMemorySize() { + return writeBuffers == null ? 
0 : writeBuffers.getEstimatedMemorySize(); + } + public static class HashMapResult extends VectorMapJoinHashMapResult { private VectorMapJoinFastValueStore valueStore; http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java index c7e585c..9cc9ad4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable; import java.io.IOException; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.BytesWritable; @@ -28,7 +29,7 @@ import org.apache.hadoop.io.BytesWritable; * Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or * hash set). */ -public interface VectorMapJoinHashTable { +public interface VectorMapJoinHashTable extends MemoryEstimate { /*