Author: sershe Date: Fri Nov 7 18:54:08 2014 New Revision: 1637432 URL: http://svn.apache.org/r1637432 Log: HIVE-8556 : introduce overflow control and sanity check to BytesBytesMapJoin (Sergey Shelukhin, reviewed by Mostafa Mokhtar and Prasanth J)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java Fri Nov 7 18:54:08 2014 @@ -33,6 +33,6 @@ public interface HashTableLoader { void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp); - void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) - throws HiveException; + void load(MapJoinTableContainer[] mapJoinTables, + MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original) +++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Nov 7 18:54:08 2014 @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; @@ -187,7 +188,9 @@ public class MapJoinOperator extends Abs } perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); loader.init(getExecContext(), hconf, this); - loader.load(mapJoinTables, mapJoinTableSerdes); + long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize() + * conf.getHashTableMemoryUsage()); + loader.load(mapJoinTables, mapJoinTableSerdes, memUsage); if (!conf.isBucketMapJoin()) { /* * The issue with caching in case of bucket map join is that different tasks Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Fri Nov 7 18:54:08 2014 @@ -55,22 +55,30 @@ public class MapJoinMemoryExhaustionHand this.console = console; this.maxMemoryUsage = maxMemoryUsage; this.memoryMXBean = ManagementFactory.getMemoryMXBean(); - long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax(); + this.maxHeapSize = getMaxHeapSize(memoryMXBean); + 
percentageNumberFormat = NumberFormat.getInstance(); + percentageNumberFormat.setMinimumFractionDigits(2); + LOG.info("JVM Max Heap Size: " + this.maxHeapSize); + } + + public static long getMaxHeapSize() { + return getMaxHeapSize(ManagementFactory.getMemoryMXBean()); + } + + private static long getMaxHeapSize(MemoryMXBean bean) { + long maxHeapSize = bean.getHeapMemoryUsage().getMax(); /* * According to the javadoc, getMax() can return -1. In this case * default to 200MB. This will probably never actually happen. */ if(maxHeapSize == -1) { - this.maxHeapSize = 200L * 1024L * 1024L; LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " + "defaulting maxHeapSize to 200MB"); - } else { - this.maxHeapSize = maxHeapSize; + return 200L * 1024L * 1024L; } - percentageNumberFormat = NumberFormat.getInstance(); - percentageNumberFormat.setMinimumFractionDigits(2); - LOG.info("JVM Max Heap Size: " + this.maxHeapSize); + return maxHeapSize; } + /** * Throws MapJoinMemoryExhaustionException when the JVM has consumed the * configured percentage of memory. 
The arguments are used simply for the error Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Fri Nov 7 18:54:08 2014 @@ -72,7 +72,7 @@ public class HashTableLoader implements @Override public void load( MapJoinTableContainer[] mapJoinTables, - MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { + MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException { String currentInputPath = context.getCurrentInputPath().toString(); LOG.info("******* Load from HashTable for input file: " + currentInputPath); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Fri Nov 7 18:54:08 2014 @@ -149,13 +149,27 @@ public final class BytesBytesMultiHashMa /** We have 39 bits to store list pointer from the first record; this is size limit */ final static long MAX_WB_SIZE = ((long)1) << 38; + /** 8 GB of refs is the max capacity if memory limit is not specified. If someone has 100s of + * GBs of memory (this might happen pretty soon) we'd need to string together arrays anyway. 
*/ + private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024; - public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) { + public BytesBytesMultiHashMap(int initialCapacity, + float loadFactor, int wbSize, long memUsage, int defaultCapacity) { if (loadFactor < 0 || loadFactor > 1) { throw new AssertionError("Load factor must be between (0, 1]."); } + assert initialCapacity > 0; initialCapacity = (Long.bitCount(initialCapacity) == 1) ? initialCapacity : nextHighestPowerOfTwo(initialCapacity); + // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check. + int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY + : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8); + if (maxCapacity < initialCapacity || initialCapacity <= 0) { + // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows + initialCapacity = (Long.bitCount(maxCapacity) == 1) + ? maxCapacity : nextLowestPowerOfTwo(maxCapacity); + } + validateCapacity(initialCapacity); startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity); this.loadFactor = loadFactor; @@ -164,6 +178,11 @@ public final class BytesBytesMultiHashMa resizeThreshold = (int)(initialCapacity * this.loadFactor); } + @VisibleForTesting + BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) { + this(initialCapacity, loadFactor, wbSize, -1, 100000); + } + /** The source of keys and values to put into hashtable; avoids byte copying. */ public static interface KvSource { /** Write key into output. 
*/ @@ -644,6 +663,10 @@ public final class BytesBytesMultiHashMa return Integer.highestOneBit(v) << 1; } + private static int nextLowestPowerOfTwo(int v) { + return Integer.highestOneBit(v); + } + @VisibleForTesting int getCapacity() { return refs.length; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Fri Nov 7 18:54:08 2014 @@ -60,17 +60,20 @@ public class MapJoinBytesTableContainer private final List<Object> EMPTY_LIST = new ArrayList<Object>(0); public MapJoinBytesTableContainer(Configuration hconf, - MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException { + MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage) throws SerDeException { this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), valCtx, keyCount); + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), + valCtx, keyCount, memUsage); } private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadFactor, - int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException { - threshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount); - hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize); + int wbSize, MapJoinObjectSerDeContext 
valCtx, long keyCount, long memUsage) + throws SerDeException { + int newThreshold = HashMapWrapper.calculateTableSize( + keyCountAdj, threshold, loadFactor, keyCount); + hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold); } private LazyBinaryStructObjectInspector createInternalOi( Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Nov 7 18:54:08 2014 @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; @@ -69,7 +70,7 @@ public class HashTableLoader implements @Override public void load( MapJoinTableContainer[] mapJoinTables, - MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { + MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException { TezContext tezContext = (TezContext) MapredContext.get(); Map<Integer, String> parentToInput = desc.getParentToInput(); @@ -106,7 +107,7 @@ public class HashTableLoader implements Long keyCountObj = parentKeyCounts.get(pos); long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue(); MapJoinTableContainer tableContainer = useOptimizedTables - ? 
new MapJoinBytesTableContainer(hconf, valCtx, keyCount) + ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage) : new HashMapWrapper(hconf, keyCount); while (kvReader.next()) {