Author: sershe
Date: Wed Jan 14 02:47:08 2015
New Revision: 1651559

URL: http://svn.apache.org/r1651559
Log:
Finish reworking LRFU policy for low-level cache (not clear if it's a good pick due to concurrency); tests; some pipeline adjustments
Added:
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
Removed:
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 14 02:47:08 2015
@@ -1969,10 +1969,11 @@ public class HiveConf extends Configurat
         "Updates tez job execution progress in-place in the terminal."),
     LLAP_ENABLED("hive.llap.enabled", true, ""),
-    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.minalloc", 128 * 1024, ""),
-    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.minalloc", 16 * 1024 * 1024, ""),
-    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.minalloc", 128L * 1024 * 1024, ""),
-    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.minalloc", 1024L * 1024 * 1024, ""),
+    LLAP_LOW_LEVEL_CACHE("hive.llap.use.lowlevel.cache", true, ""),
+    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.alloc.min", 128 * 1024, ""),
+    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.alloc.max", 16 * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.arena.size", 128 * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.size", 1024L * 1024 * 1024, ""),
     LLAP_REQUEST_THREAD_COUNT("hive.llap.request.thread.count", 16, ""),
     LLAP_USE_LRFU("hive.llap.use.lrfu", true, ""),
     LLAP_LRFU_LAMBDA("hive.llap.lrfu.lambda", 0.01f, "")
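For reference, a minimal sketch of how the renamed settings are consumed (mirroring the LowLevelBuddyCache constructor later in this commit; this standalone class is illustrative, not part of the patch). Before the rename, all four cache settings shared the key "hive.llap.cache.orc.minalloc", so they could not be configured independently; note also that the arena-size default drops the long suffix so it fits the int accessor.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class CacheConfigExample {
      public static void main(String[] args) {
        Configuration conf = new HiveConf();
        // Each setting now has a distinct key, e.g. "hive.llap.cache.orc.alloc.min".
        int minAlloc = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);   // 128K
        int maxAlloc = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);   // 16M
        int arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE); // 128M
        long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);   // 1G
        System.out.println(minAlloc + "/" + maxAlloc + "/" + arenaSize + "/" + maxSize);
      }
    }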
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Wed Jan 14 02:47:08 2015
@@ -18,10 +18,17 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+
 public class EncodedColumn<BatchKey> {
   // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
   // generality, and ability to not copy data from underlying low-level cached buffers.
-  public static class ColumnBuffer {}
+  public static class ColumnBuffer {
+    // TODO: given how ORC will allocate, it might make sense to share array between all
+    // returned encodedColumn-s, and store index and length in the array.
+    public LlapMemoryBuffer[] cacheBuffers;
+    public int firstOffset, lastLength;
+  }
 
   public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
     this.batchKey = batchKey;
     this.columnIndex = columnIndex;

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Wed Jan 14 02:47:08 2015
@@ -42,4 +42,7 @@ public interface LowLevelCache {
    * Allocate dest.length new blocks of size into dest.
    */
   void allocateMultiple(LlapMemoryBuffer[] dest, int size);
+
+  void releaseBuffers(LlapMemoryBuffer[] cacheBuffers);
+
 }
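A hypothetical consumer-side sketch of the new bulk-release call (only the interface method is from the patch; the surrounding names are illustrative): once a ColumnBuffer has been decoded, every LlapMemoryBuffer it pinned is handed back in one call so the cache can drop the refcounts and make that memory evictable again. This is how returnData uses it in OrcEncodedDataProducer further down.

    import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
    import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;

    public class ReleaseExample {
      static void doneDecoding(LowLevelCache cache, ColumnBuffer data) {
        cache.releaseBuffers(data.cacheBuffers); // unpin all buffers backing this column
      }
    }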
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Wed Jan 14 02:47:08 2015
@@ -41,8 +41,9 @@ public final class LlapCacheableBuffer e
   // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
   public double priority;
   public long lastUpdate = -1;
-  public int indexInHeap = -1;
-  public boolean isLockedInHeap; // TODO#: this flag is invalid and not thread safe
+  public LlapCacheableBuffer prev = null, next = null;
+  public int indexInHeap = NOT_IN_CACHE;
+  public static final int IN_LIST = -2, NOT_IN_CACHE = -1;
 
   @Override
   public int hashCode() {
@@ -60,15 +61,16 @@ public final class LlapCacheableBuffer e
         && this.offset == other.offset && this.length == other.length;
   }
 
-  int lock() {
-    int oldRefCount = -1;
+  int incRef() {
+    int newRefCount = -1;
     while (true) {
-      oldRefCount = refCount.get();
+      int oldRefCount = refCount.get();
       if (oldRefCount == EVICTED_REFCOUNT) return -1;
       assert oldRefCount >= 0;
-      if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break;
+      newRefCount = oldRefCount + 1;
+      if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
    }
-    return oldRefCount;
+    return newRefCount;
  }
 
   public boolean isLocked() {
@@ -81,7 +83,7 @@ public final class LlapCacheableBuffer e
     return refCount.get() == EVICTED_REFCOUNT;
   }
 
-  int unlock() {
+  int decRef() {
     int newRefCount = refCount.decrementAndGet();
     if (newRefCount < 0) {
       throw new AssertionError("Unexpected refCount " + newRefCount);
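The incRef/decRef rename above also tightens the lock-free pinning protocol. A self-contained sketch of that protocol (this class is illustrative, and the invalidate body is an assumption consistent with how result.invalidate() is used by eviction later in this commit):

    import java.util.concurrent.atomic.AtomicInteger;

    class RefCountedBuffer {
      static final int EVICTED = -1; // mirrors EVICTED_REFCOUNT
      final AtomicInteger refCount = new AtomicInteger(0);

      // CAS loop: pin the buffer unless the evictor already claimed it.
      int incRef() {
        while (true) {
          int old = refCount.get();
          if (old == EVICTED) return -1;            // lost the race to eviction
          if (refCount.compareAndSet(old, old + 1)) return old + 1;
        }
      }

      // Eviction can only claim an unpinned buffer, in one atomic step (assumed).
      boolean invalidate() {
        return refCount.compareAndSet(0, EVICTED);
      }

      int decRef() {
        int rc = refCount.decrementAndGet();
        assert rc >= 0;
        return rc;                                  // 0 means "evictable again"
      }
    }

Returning the new count rather than the old one lets lockBuffer (below) detect the 0-to-1 transition with rc == 1 and notify the policy exactly once per pin cycle.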
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java Wed Jan 14 02:47:08 2015
@@ -33,8 +33,9 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
+// TODO: refactor the cache and allocator parts?
 public class LowLevelBuddyCache implements LowLevelCache, EvictionListener {
-  private final ArrayList<arena> arenas;
+  private final ArrayList<Arena> arenas;
   private AtomicInteger newEvictions = new AtomicInteger(0);
   private final Thread cleanupThread;
   private final ConcurrentHashMap<String, FileCache> cache =
@@ -44,21 +45,21 @@ public class LowLevelBuddyCache implemen
 
   // Config settings
   private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
-  private final int minAllocation, maxAllocation;
-  private final long maxSize, arenaSize;
-
+  private final int minAllocation, maxAllocation, arenaSize;
+  private final long maxSize;
+
   public LowLevelBuddyCache(Configuration conf) {
     minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
     maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
-    arenaSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+    arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
     maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    if (maxSize < arenaSize || arenaSize > maxAllocation || maxAllocation < minAllocation) {
+    if (maxSize < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
       throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
           + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
     }
     if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
         || (Long.bitCount(arenaSize) != 1) || (minAllocation == 1)) {
-      // TODO: technically, arena size is not required to be so; needs to be divisible by maxAlloc
+      // TODO: technically, arena size only needs to be divisible by maxAlloc
       throw new AssertionError("Allocation and arena sizes must be powers of two > 1: "
           + minAllocation + ", " + maxAllocation + ", " + arenaSize);
     }
@@ -70,11 +71,11 @@ public class LowLevelBuddyCache implemen
     maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
     arenaSizeLog2 = 31 - Long.numberOfLeadingZeros(arenaSize);
     maxArenas = (int)(maxSize / arenaSize);
-    arenas = new ArrayList<arena>(maxArenas);
+    arenas = new ArrayList<Arena>(maxArenas);
     for (int i = 0; i < maxArenas; ++i) {
-      arenas.add(new arena());
+      arenas.add(new Arena());
     }
-    arenas.get(0).init();
+    arenas.get(0).init(arenaSize, maxAllocation, arenaSizeLog2, minAllocLog2, maxAllocLog2);
     cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
         ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
         : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
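The log2 fields above drive the buddy allocator's size classes: free list i holds blocks of minAllocation << i bytes. A standalone illustration with the commit's defaults (the round-up step happens before the fragment shown in the next hunk, so its exact form here is an assumption consistent with the Math.max line there):

    public class SizeClassExample {
      public static void main(String[] args) {
        int minAllocation = 128 * 1024;
        int minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation); // 17
        int requested = 1_000_000; // caller wants ~1MB
        int sizeLog2 = 32 - Integer.numberOfLeadingZeros(requested - 1);     // round up: 20
        int freeListIndex = Math.max(sizeLog2 - minAllocLog2, 0);            // 3
        int allocationSize = 1 << (freeListIndex + minAllocLog2);            // 1MB
        System.out.println("list " + freeListIndex + ", size " + allocationSize);
      }
    }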
@@ -91,7 +92,7 @@ public class LowLevelBuddyCache implemen
     freeListIndex = Math.max(freeListIndex - minAllocLog2, 0);
     int allocationSize = 1 << (freeListIndex + minAllocLog2);
     int total = dest.length * allocationSize;
-    cachePolicy.reserveMemory(total);
+    cachePolicy.reserveMemory(total, true);
 
     int ix = 0;
     for (int i = 0; i < dest.length; ++i) {
@@ -99,27 +100,27 @@ public class LowLevelBuddyCache implemen
       dest[i] = new LlapCacheableBuffer(null, -1, -1); // TODO: pool of objects?
     }
     // TODO: instead of waiting, loop only ones we haven't tried w/tryLock?
-    for (arena block : arenas) {
+    for (Arena block : arenas) {
       int newIx = allocateFast(block, freeListIndex, dest, ix, allocationSize);
       if (newIx == -1) break;
       if (newIx == dest.length) return;
       ix = newIx;
     }
     // Then try to split bigger blocks.
-    for (arena block : arenas) {
+    for (Arena block : arenas) {
       int newIx = allocateWithSplit(block, freeListIndex, dest, ix, allocationSize);
       if (newIx == -1) break;
       if (newIx == dest.length) return;
       ix = newIx;
     }
     // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-    for (arena block : arenas) {
+    for (Arena block : arenas) {
       ix = allocateWithExpand(block, freeListIndex, dest, ix, allocationSize);
       if (ix == dest.length) return;
     }
   }
 
-  private int allocateFast(arena block,
+  private int allocateFast(Arena block,
       int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
     if (block.data == null) return -1; // not allocated yet
     FreeList freeList = block.freeLists[freeListIndex];
@@ -133,7 +134,7 @@ public class LowLevelBuddyCache implemen
   }
 
   private int allocateWithSplit(
-      arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+      Arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
     if (arena.data == null) return -1; // not allocated yet
     FreeList freeList = arena.freeLists[freeListIndex];
     int remaining = -1;
@@ -206,7 +207,7 @@ public class LowLevelBuddyCache implemen
     return lastSplitNextHeader << minAllocLog2;
   }
 
-  public int allocateFromFreeListUnderLock(arena block, FreeList freeList,
+  public int allocateFromFreeListUnderLock(Arena block, FreeList freeList,
       int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
     int current = freeList.listHead;
     while (current >= 0 && ix < dest.length) {
@@ -222,15 +223,15 @@ public class LowLevelBuddyCache implemen
   }
 
   private int allocateWithExpand(
-      arena block, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
-    if (block.data != null) return ix; // already allocated
-    synchronized (block) {
+      Arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+    if (arena.data != null) return ix; // already allocated
+    synchronized (arena) {
       // Never goes from non-null to null, so this is the only place we need sync.
-      if (block.data == null) {
-        block.init();
+      if (arena.data == null) {
+        arena.init(arenaSize, maxAllocation, arenaSizeLog2, minAllocLog2, maxAllocLog2);
       }
     }
-    return allocateWithSplit(block, freeListIndex, dest, ix, size);
+    return allocateWithSplit(arena, freeListIndex, dest, ix, size);
   }
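allocateWithExpand is the only place an arena's backing memory is created, and it uses the classic double-checked pattern because data never goes back to null once set. A minimal standalone model (whether the real field is volatile is not visible in this diff, so that detail is an assumption):

    import java.nio.ByteBuffer;

    class LazyArena {
      volatile ByteBuffer data; // assumed volatile for safe publication

      void ensureInit(int arenaSize) {
        if (data != null) return;        // fast path, no lock
        synchronized (this) {
          if (data == null) {            // re-check under the lock
            data = ByteBuffer.allocateDirect(arenaSize);
          }
        }
      }
    }

This way only the first allocation in an arena pays for the lock and the direct-buffer mapping; maxSize worth of arenas is mapped lazily rather than up front.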
 
@@ -262,8 +263,8 @@ public class LowLevelBuddyCache implemen
   }
 
   private boolean lockBuffer(LlapCacheableBuffer buffer) {
-    int rc = buffer.lock();
-    if (rc == 0) {
+    int rc = buffer.incRef();
+    if (rc == 1) {
       cachePolicy.notifyLock(buffer);
     }
     return rc >= 0;
@@ -282,7 +283,11 @@ public class LowLevelBuddyCache implemen
     assert buffer.isLocked();
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
-      if (oldVal == null) break; // Cached successfully.
+      if (oldVal == null) {
+        // Cached successfully, add to policy.
+        cachePolicy.cache(buffer);
+        break;
+      }
       if (DebugUtils.isTraceCachingEnabled()) {
         LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
             + fileName + "@" + offset + "; old " + oldVal + ", new " + buffer);
@@ -297,7 +302,7 @@ public class LowLevelBuddyCache implemen
         result[i >>> 6] |= (1 << (i & 63)); // indicate that we've replaced the value
         break;
       }
-      // We found some old value but couldn't lock it; remove it.
+      // We found some old value but couldn't incRef it; remove it.
       subCache.cache.remove(offset, oldVal);
     }
   }
@@ -349,15 +354,22 @@ public class LowLevelBuddyCache implemen
     releaseBufferInternal((LlapCacheableBuffer)buffer);
   }
 
+  @Override
+  public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
+    for (int i = 0; i < cacheBuffers.length; ++i) {
+      releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+    }
+  }
+
   public void releaseBufferInternal(LlapCacheableBuffer buffer) {
-    if (buffer.unlock() == 0) {
+    if (buffer.decRef() == 0) {
       cachePolicy.notifyUnlock(buffer);
       unblockEviction();
     }
   }
 
   public static LlapCacheableBuffer allocateFake() {
-    return new LlapCacheableBuffer(null, -1, -1);
+    return new LlapCacheableBuffer(null, -1, 1);
   }
 
   public void unblockEviction() {
@@ -446,9 +458,9 @@ public class LowLevelBuddyCache implemen
     }
   }
 
-  private class arena {
-    void init() {
-      data = ByteBuffer.allocateDirect(maxAllocation);
+  private static class Arena {
+    void init(int arenaSize, int maxAlloc, int arenaSizeLog2, int minAllocLog2, int maxAllocLog2) {
+      data = ByteBuffer.allocateDirect(arenaSize);
       int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
       headers = new byte[maxMinAllocs];
       int allocLog2Diff = maxAllocLog2 - minAllocLog2;
@@ -459,7 +471,7 @@ public class LowLevelBuddyCache implemen
       int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
          headerIndex = 0, headerIncrement = 1 << allocLog2Diff;
       freeLists[maxAllocLog2 - 1].listHead = 0;
-      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAlloc) {
        // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
        headers[headerIndex] = (byte)(allocLog2Diff << 1); // Maximum allocation size
        data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerIncrement));
@@ -482,7 +494,6 @@ public class LowLevelBuddyCache implemen
       // However, we are trying to increase fragmentation now, since we cater to single-size.
     }
 
-  // TODO##: separate the classes?
   private static class FileCache {
     private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
     // TODO: given the specific data, perhaps the nested thing should not be CHM
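The put loop above is a cache-or-get race resolved purely with ConcurrentHashMap primitives. A standalone model of the same protocol, reusing the RefCountedBuffer sketch from earlier (illustrative, not the Hive types):

    import java.util.concurrent.ConcurrentHashMap;

    class PutRaceExample {
      final ConcurrentHashMap<Long, RefCountedBuffer> cache = new ConcurrentHashMap<>();

      RefCountedBuffer cacheOrGet(long offset, RefCountedBuffer buffer) {
        while (true) { // overwhelmingly executes once, as the comment above notes
          RefCountedBuffer oldVal = cache.putIfAbsent(offset, buffer);
          if (oldVal == null) return buffer;       // won the race; notify the policy
          if (oldVal.incRef() >= 0) return oldVal; // live copy already cached; reuse it
          cache.remove(offset, oldVal);            // stale evicted entry; retry the put
        }
      }
    }

The two-argument remove(offset, oldVal) is what makes this safe: it only deletes the exact stale entry that was observed, never a newer value published by a concurrent writer.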
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -22,5 +22,5 @@ public interface LowLevelCachePolicy {
   void cache(LlapCacheableBuffer buffer);
   void notifyLock(LlapCacheableBuffer buffer);
   void notifyUnlock(LlapCacheableBuffer buffer);
-  void reserveMemory(long total);
+  boolean reserveMemory(long memoryToReserve, boolean oneEviction);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Wed Jan 14 02:47:08 2015
@@ -32,7 +32,7 @@ public abstract class LowLevelCachePolic
   }
 
   @Override
-  public void reserveMemory(long memoryToReserve) {
+  public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
     // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
     while (memoryToReserve > 0) {
       long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
@@ -42,16 +42,18 @@ public abstract class LowLevelCachePolic
       }
       // TODO: for one-block case, we could move notification for the last block out of the loop.
       long evicted = evictSomeBlocks(memoryToReserve, evictionListener);
+      if (!waitForEviction && evicted == 0) return false;
       // Adjust the memory - we have to account for what we have just evicted.
       while (true) {
         long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
-        if (usedMemory.compareAndSet(usedMem, usedMem + reserveWithEviction)) {
+        if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) {
           memoryToReserve -= reserveWithEviction;
           break;
         }
         usedMem = usedMemory.get();
       }
     }
+    return true;
   }
 
   protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener);
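The control flow of the new reserveMemory contract, in a simplified standalone form (the accounting here is deliberately reduced to one atomic counter; the real method above folds the evicted bytes into a second CAS loop):

    import java.util.concurrent.atomic.AtomicLong;

    class ReserveExample {
      final long maxSize = 1024;
      final AtomicLong usedMemory = new AtomicLong(0);

      boolean reserveMemory(long toReserve, boolean waitForEviction) {
        while (true) {
          long used = usedMemory.get();
          if (used + toReserve <= maxSize) {
            if (usedMemory.compareAndSet(used, used + toReserve)) return true;
            continue; // contention; re-read before resorting to eviction
          }
          long evicted = evictSomeBlocks(toReserve); // policy hook, stubbed here
          if (evicted == 0 && !waitForEviction) return false; // non-blocking caller
          usedMemory.addAndGet(-evicted); // account for what eviction freed
        }
      }

      long evictSomeBlocks(long target) { return 0; }
    }

The boolean return and the waitForEviction flag are what let the LRFU tests added below probe a full, fully-locked cache without spinning forever.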
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -27,14 +27,11 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
- * Implementation of the "simple" algorithm from "On the Existence of a Spectrum of Policies
+ * Implementation of the algorithm from "On the Existence of a Spectrum of Policies
  * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
- * TODO: fix this, no longer true; with ORC as is, 4k buffers per gig of cache
- * We expect the number of buffers to be relatively small (1000s), so we just use one heap.
- **/
+ * Additionally, buffer locking has to be handled (locked buffer cannot be evicted).
+ */
 public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
   private final double lambda;
   private final double f(long x) {
@@ -50,124 +47,162 @@ public class LowLevelLrfuCachePolicy ext
   private final AtomicLong timer = new AtomicLong(0);
   /**
-   * The heap. Currently synchronized on itself; there is a number of papers out there
-   * with various lock-free/efficient priority queues which we can use if needed.
+   * The heap and list. Currently synchronized on the object, which is not good. If this becomes
+   * a problem (which it probably will), we can partition the cache policy, or use some better
+   * structure. Heap should not be locked while holding the lock on list.
+   * As of now, eviction in most cases will only need the list; locking doesn't do anything;
+   * unlocking actually places item in evictable cache - unlocking is done after processing,
+   * so this most expensive part (and only access to heap in most cases) will not affect it.
+   * Perhaps we should use ConcurrentDoubleLinkedList (in public domain).
+   * ONLY LIST REMOVAL is allowed under list lock.
    */
   private final LlapCacheableBuffer[] heap;
+  private final ReentrantLock listLock = new ReentrantLock();
+  private LlapCacheableBuffer listHead, listTail;
   /** Number of elements. */
   private int heapSize = 0;
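For context, the weighting from the cited paper: F(x) = 0.5^(λx), and each buffer carries a combined recency/frequency value (CRF) that can be updated incrementally on access. The touchPriority/expirePriority bodies are outside this hunk, so the formulas below are the paper's standard definitions that they presumably implement:

    public class LrfuMathExample {
      static final double LAMBDA = 0.01; // hive.llap.lrfu.lambda default

      static double f(long x) { return Math.pow(0.5, LAMBDA * x); }
      // On access: new contribution plus decayed history.
      static double touch(long t, long tLast, double p) { return f(0) + f(t - tLast) * p; }
      // On re-heapify without access: pure decay.
      static double expire(long t, long tLast, double p) { return f(t - tLast) * p; }

      public static void main(String[] args) {
        double p = f(0);                        // first access at t = 0, priority 1.0
        p = touch(10, 0, p);                    // second access at t = 10: ~1.93
        System.out.println(p);
        System.out.println(expire(500, 10, p)); // untouched since: decays toward 0
      }
    }

λ = 0 makes F constant, so the CRF degenerates into a touch count (pure LFU); λ = 1 halves history every tick, so the last access dominates (effectively LRU). The default 0.01 sits between the two.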
 
   public LowLevelLrfuCachePolicy(Configuration conf,
       long minBufferSize, long maxCacheSize, EvictionListener listener) {
     super(maxCacheSize, listener);
-    heap = new LlapCacheableBuffer[(int)Math.ceil((maxCacheSize * 1.0) / minBufferSize)];
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+    int maxBuffers = (int)Math.ceil((maxCacheSize * 1.0) / minBufferSize);
+    int maxHeapSize = -1;
+    if (lambda == 0) {
+      maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
+    } else {
+      int lrfuThreshold = (int)((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda);
+      maxHeapSize = Math.min(lrfuThreshold, maxBuffers);
+    }
+    heap = new LlapCacheableBuffer[maxHeapSize];
+    listHead = listTail = null;
   }
 
   @Override
   public void cache(LlapCacheableBuffer buffer) {
-    buffer.lastUpdate = timer.incrementAndGet();
-    buffer.priority = F0;
+    // LRFU cache policy doesn't store locked blocks. When we cache, the block is locked, so
+    // we simply do nothing here. The fact that it was never updated will allow us to add it
+    // properly on the first notifyUnlock.
     assert buffer.isLocked();
-    buffer.isLockedInHeap = true;
-    synchronized (heap) {
-      // Ensured by reserveMemory.
-      assert heapSize < heap.length : heap.length + " >= " + heapSize;
-      buffer.indexInHeap = heapSize;
-      heapifyUpUnderLock(buffer, buffer.lastUpdate);
-      if (DebugUtils.isTraceEnabled()) {
-        LlapIoImpl.LOG.info(buffer + " inserted at " + buffer.lastUpdate);
-      }
-      ++heapSize;
-    }
   }
 
   @Override
   public void notifyLock(LlapCacheableBuffer buffer) {
-    long time = timer.get();
-    synchronized (heap) {
-      buffer.isLockedInHeap = true;
-      heapifyDownUnderLock(buffer, time);
-    }
+    // We do not proactively remove locked items from the heap, and opportunistically try to
+    // remove from the list (since eviction is mostly from the list). If eviction stumbles upon
+    // a locked item in either, it will remove it from cache; when we unlock, we are going to
+    // put it back or update it, depending on whether this has happened. This should cause
+    // most of the expensive cache update work to happen in unlock, not blocking processing.
+    if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
+    if (!listLock.tryLock()) return;
+    removeFromListAndUnlock(buffer);
   }
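The lrfuThreshold bound in the constructor deserves a numeric illustration: it is the age beyond which even a maximal CRF decays below F0, the priority of a brand-new buffer, so ordering entries older than that in the heap buys nothing and they can live in the cheap linked list instead. Reproducing the constructor's exact expression:

    public class LrfuThresholdExample {
      public static void main(String[] args) {
        double lambda = 0.01; // default
        int threshold = (int) ((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda);
        System.out.println(threshold); // 717: the heap never needs more entries than this
      }
    }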
 
   @Override
   public void notifyUnlock(LlapCacheableBuffer buffer) {
     long time = timer.incrementAndGet();
+    if (DebugUtils.isTraceCachingEnabled()) {
+      LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
+    }
     synchronized (heap) {
-      if (DebugUtils.isTraceCachingEnabled()) {
-        LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
-      }
-      buffer.priority = touchPriority(time, buffer.lastUpdate, buffer.priority);
+      // First, update buffer priority - we have just been using it.
+      buffer.priority = (buffer.lastUpdate == -1) ? F0
+          : touchPriority(time, buffer.lastUpdate, buffer.priority);
       buffer.lastUpdate = time;
-      buffer.isLockedInHeap = false;
-      // Buffer's priority just decreased from boosted lock priority, so move up.
-      heapifyUpUnderLock(buffer, time);
+      // Then, if the buffer was in the list, remove it.
+      if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
+        listLock.lock();
+        removeFromListAndUnlock(buffer);
+      }
+      // The only concurrent change that can happen when we hold the heap lock is list removal;
+      // we have just ensured the item is not in the list, so we have a definite state now.
+      if (buffer.indexInHeap >= 0) {
+        // The buffer has lived in the heap all along. Restore heap property.
+        heapifyDownUnderLock(buffer, time);
+      } else if (heapSize == heap.length) {
+        // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
+        LlapCacheableBuffer demoted = heap[0];
+        synchronized (listLock) {
+          demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
+          demoted.prev = null;
+          if (listHead != null) {
+            demoted.next = listHead;
+            listHead.prev = demoted;
+            listHead = demoted;
+          } else {
+            listHead = listTail = demoted;
+            demoted.next = null;
+          }
+        }
+        // Now insert the buffer in its place and restore heap property.
+        buffer.indexInHeap = 0;
+        heapifyDownUnderLock(buffer, time);
+      } else {
+        // Heap is not full, add the buffer to the heap and restore heap property up.
+        assert heapSize < heap.length : heap.length + " < " + heapSize;
+        buffer.indexInHeap = heapSize;
+        heapifyUpUnderLock(buffer, time);
+        ++heapSize;
+      }
     }
   }
 
-  private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
-    if (heapSize == 0) return null;
-    LlapCacheableBuffer result = heap[0];
-    if (!result.invalidate()) {
-      // We boost the priority of locked buffers to a very large value;
-      // this means entire heap is locked. TODO: need to work around that for small pools?
-      if (DebugUtils.isTraceCachingEnabled()) {
-        LlapIoImpl.LOG.info("Failed to invalidate head " + result.toString() + "; size = " + heapSize);
+  @Override
+  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+    long evicted = 0;
+    // In normal case, we evict the items from the list.
+    LlapCacheableBuffer nextCandidate, firstCandidate;
+    listLock.lock();
+    try {
+      nextCandidate = firstCandidate = listTail;
+      while (evicted < memoryToReserve && nextCandidate != null) {
+        if (!nextCandidate.invalidate()) {
+          // Locked buffer was in the list - just drop it; will be re-added on unlock.
+          LlapCacheableBuffer lockedBuffer = nextCandidate;
+          nextCandidate = nextCandidate.prev;
+          removeFromListUnderLock(lockedBuffer);
+          continue;
+        }
+        // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
+        // TODO#: double check this is valid!
+        nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+        evicted += nextCandidate.length;
+        nextCandidate = nextCandidate.prev;
       }
-      return null;
-    }
-    if (DebugUtils.isTraceCachingEnabled()) {
-      LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
-    }
-    result.indexInHeap = -1;
-    --heapSize;
-    LlapCacheableBuffer newRoot = heap[heapSize];
-    newRoot.indexInHeap = 0;
-    if (newRoot.lastUpdate != time && !newRoot.isLockedInHeap) {
-      newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
-      newRoot.lastUpdate = time;
+      if (firstCandidate != nextCandidate) {
+        if (nextCandidate == null) {
+          listHead = listTail = null; // We have evicted the entire list.
+        } else {
+          // Splice the section that we have evicted out of the list.
+          removeFromListUnderLock(nextCandidate.next, firstCandidate);
+        }
+      }
+    } finally {
+      listLock.unlock();
     }
-    heapifyDownUnderLock(newRoot, time);
-    return result;
-  }
-
-  private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
-    // Relative positions of the blocks don't change over time; priorities we expire can only
-    // decrease; we only have one block that could have broken heap rule and we always move it
-    // down; therefore, we can update priorities of other blocks as we go for part of the heap -
-    // we correct any discrepancy w/the parent after expiring priority, and any block we expire
-    // the priority for already has lower priority than that of its children.
-    // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
-    int ix = buffer.indexInHeap;
-    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
-    while (true) {
-      int leftIx = (ix << 1) + 1, rightIx = leftIx + 1;
-      if (leftIx >= heapSize) break; // Buffer is at the leaf node.
-      LlapCacheableBuffer left = heap[leftIx], right = null;
-      if (rightIx < heapSize) {
-        right = heap[rightIx];
-      }
-      double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
-      if (priority <= leftPri && priority <= rightPri) break;
-      if (leftPri <= rightPri) { // prefer left, cause right might be missing
-        heap[ix] = left;
-        left.indexInHeap = ix;
-        ix = leftIx;
-      } else {
-        heap[ix] = right;
-        right.indexInHeap = ix;
-        ix = rightIx;
+    while (firstCandidate != nextCandidate) {
+      listener.notifyEvicted(firstCandidate);
+      firstCandidate = firstCandidate.prev;
+    }
+    if (evicted >= memoryToReserve) return evicted;
+    // This should not happen unless we are evicting a lot at once, or buffers are large (so
+    // there's a small number of buffers and they all live in the heap).
+    long time = timer.get();
+    while (evicted < memoryToReserve) {
+      LlapCacheableBuffer buffer = null;
+      synchronized (heap) {
+        buffer = evictFromHeapUnderLock(time);
       }
+      if (buffer == null) return evicted;
+      evicted += buffer.length;
+      listener.notifyEvicted(buffer);
     }
-    buffer.indexInHeap = ix;
-    heap[ix] = buffer;
+    return evicted;
   }
 
   private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
     // See heapifyDown comment.
     int ix = buffer.indexInHeap;
-    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+    double priority = buffer.priority;
     while (true) {
       if (ix == 0) break; // Buffer is at the top of the heap.
       int parentIx = (ix - 1) >>> 1;
@@ -182,19 +217,140 @@ public class LowLevelLrfuCachePolicy ext
     heap[ix] = buffer;
   }
 
+  // Note: almost never called (unless buffers are very large or we evict a lot).
+  private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
+    while (true) {
+      if (heapSize == 0) return null;
+      LlapCacheableBuffer result = heap[0];
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
+      }
+      result.indexInHeap = -1;
+      --heapSize;
+      boolean canEvict = result.invalidate();
+      if (heapSize > 0) {
+        LlapCacheableBuffer newRoot = heap[heapSize];
+        newRoot.indexInHeap = 0;
+        if (newRoot.lastUpdate != time) {
+          newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
+          newRoot.lastUpdate = time;
+        }
+        heapifyDownUnderLock(newRoot, time);
+      }
+      if (canEvict) return result;
+      // Otherwise we just removed a locked item from heap; unlock will re-add it, we continue.
+    }
+  }
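The heap itself is the standard array-backed binary heap, so both the new evictFromHeapUnderLock above and the sift methods below navigate purely by index arithmetic. For reference:

    public class HeapIndexExample {
      public static void main(String[] args) {
        int ix = 5;
        int leftIx = (ix << 1) + 1;    // left child: 2*ix + 1 = 11
        int rightIx = leftIx + 1;      // right child: 12
        int parentIx = (ix - 1) >>> 1; // parent: 2
        System.out.println(leftIx + " " + rightIx + " " + parentIx);
      }
    }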
+
+  private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
+    // Relative positions of the blocks don't change over time; priorities we expire can only
+    // decrease; we only have one block that could have broken heap rule and we always move it
+    // down; therefore, we can update priorities of other blocks as we go for part of the heap -
+    // we correct any discrepancy w/the parent after expiring priority, and any block we expire
+    // the priority for already has lower priority than that of its children.
+    // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
+    int ix = buffer.indexInHeap;
+    double priority = buffer.priority;
+    while (true) {
+      int newIx = moveMinChildUp(ix, time, priority);
+      if (newIx == -1) break;
+      ix = newIx;
+    }
+    buffer.indexInHeap = ix;
+    heap[ix] = buffer;
+  }
+
+  /**
+   * Moves the minimum child of targetPos block up to targetPos; optionally compares priorities
+   * and terminates if targetPos element has lesser value than either of its children.
+   * @return the index of the child that was moved up; -1 if nothing was moved due to absence
+   *         of the children, or a failed priority check.
+   */
+  private int moveMinChildUp(int targetPos, long time, double comparePri) {
+    int leftIx = (targetPos << 1) + 1, rightIx = leftIx + 1;
+    if (leftIx >= heapSize) return -1; // Buffer is at the leaf node.
+    LlapCacheableBuffer left = heap[leftIx], right = null;
+    if (rightIx < heapSize) {
+      right = heap[rightIx];
+    }
+    double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
+    if (comparePri >= 0 && comparePri <= leftPri && comparePri <= rightPri) {
+      return -1;
+    }
+    if (leftPri <= rightPri) { // prefer left, cause right might be missing
+      heap[targetPos] = left;
+      left.indexInHeap = targetPos;
+      return leftIx;
+    } else {
+      heap[targetPos] = right;
+      right.indexInHeap = targetPos;
+      return rightIx;
+    }
+  }
+
   private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
-    if (buf == null || buf.isLockedInHeap) return Double.MAX_VALUE;
-    if (buf.lastUpdate != time) {
+    if (buf == null) return Double.MAX_VALUE;
+    if (buf.lastUpdate != time && time >= 0) {
       buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
       buf.lastUpdate = time;
     }
     return buf.priority;
   }
 
+  private void removeFromListAndUnlock(LlapCacheableBuffer buffer) {
+    try {
+      if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
+      removeFromListUnderLock(buffer);
+      buffer.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+    } finally {
+      listLock.unlock();
+    }
+  }
+
+  private void removeFromListUnderLock(LlapCacheableBuffer buffer) {
+    if (buffer == listTail) {
+      listTail = buffer.prev;
+    } else {
+      buffer.next.prev = buffer.prev;
+    }
+    if (buffer == listHead) {
+      listHead = buffer.next;
+    } else {
+      buffer.prev.next = buffer.next;
+    }
+  }
+
+  private void removeFromListUnderLock(LlapCacheableBuffer from, LlapCacheableBuffer to) {
+    if (to == listTail) {
+      listTail = from.prev;
+    } else {
+      to.next.prev = from.prev;
+    }
+    if (from == listHead) {
+      listHead = to.next;
+    } else {
+      from.prev.next = to.next;
+    }
+  }
+
   public String debugDumpHeap() {
-    if (heapSize == 0) return "<empty>";
+    StringBuilder result = new StringBuilder("List: ");
+    if (listHead == null) {
+      result.append("<empty>");
+    } else {
+      LlapCacheableBuffer listItem = listHead;
+      while (listItem != null) {
+        result.append(listItem.toStringForCache()).append(" -> ");
+        listItem = listItem.next;
+      }
+    }
+    result.append("\nHeap:");
+    if (heapSize == 0) {
+      result.append(" <empty>\n");
+      return result.toString();
+    }
+    result.append("\n");
     int levels = 32 - Integer.numberOfLeadingZeros(heapSize);
-    StringBuilder result = new StringBuilder();
     int ix = 0;
     int spacesCount = heap[0].toStringForCache().length() + 3;
     String full = StringUtils.repeat(" ", spacesCount),
@@ -230,23 +386,4 @@ public class LowLevelLrfuCachePolicy ext
     }
     return result.toString();
   }
-
-  @VisibleForTesting
-  public LlapCacheableBuffer evictOneMoreBlock() {
-    synchronized (heap) {
-      return evictFromHeapUnderLock(timer.get());
-    }
-  }
-
-  @Override
-  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
-    long evicted = 0;
-    while (evicted < memoryToReserve) {
-      LlapCacheableBuffer buffer = evictOneMoreBlock();
-      if (buffer == null) return evicted;
-      evicted += buffer.length;
-      listener.notifyEvicted(buffer);
-    }
-    return evicted;
-  }
 }
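The two-argument removeFromListUnderLock deserves a standalone model: it unlinks an entire inclusive [from..to] run in O(1), which is how a batch of just-evicted buffers is spliced out in evictSomeBlocks instead of being removed one by one:

    public class SpliceExample {
      static class Node { Node prev, next; final int id; Node(int id) { this.id = id; } }
      Node head, tail;

      void spliceOut(Node from, Node to) { // same four branches as the Hive method
        if (to == tail) tail = from.prev; else to.next.prev = from.prev;
        if (from == head) head = to.next; else from.prev.next = to.next;
      }

      public static void main(String[] args) {
        SpliceExample list = new SpliceExample();
        Node a = new Node(1), b = new Node(2), c = new Node(3), d = new Node(4);
        list.head = a; list.tail = d;
        a.next = b; b.prev = a; b.next = c; c.prev = b; c.next = d; d.prev = c;
        list.spliceOut(b, c);
        System.out.println(list.head.id + " <-> " + list.head.next.id); // 1 <-> 4
      }
    }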
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java Wed Jan 14 02:47:08 2015
@@ -33,7 +33,6 @@ public interface VectorReader {
   public static class ColumnVectorBatch {
     public ColumnVector[] cols;
     public int size;
-    public List<ColumnBuffer> lockedBuffers;
   }
 
   public ColumnVectorBatch next() throws InterruptedException, IOException;
   public void close() throws IOException;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Wed Jan 14 02:47:08 2015
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
 import org.apache.hadoop.hive.llap.cache.NoopCache;
@@ -52,8 +53,11 @@ public class LlapIoImpl implements LlapI
 
   private LlapIoImpl(Configuration conf) throws IOException {
     this.conf = conf;
-    Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); // High-level cache not supported yet.
-    this.edp = new OrcEncodedDataProducer(new LowLevelBuddyCache(conf), cache, conf);
+    boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
+    // High-level cache not supported yet.
+    Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
+    LowLevelBuddyCache orcCache = useLowLevelCache ? new LowLevelBuddyCache(conf) : null;
+    this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
     this.cvp = new OrcColumnVectorProducer(edp, conf);
   }
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Wed Jan 14 02:47:08 2015
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 import org.apache.hadoop.hive.llap.io.api.VectorReader.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -105,6 +106,10 @@ public abstract class ColumnVectorProduc
       }
       if (0 == colsRemaining) {
         ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
+        // Batch has been decoded; unlock the buffers in cache
+        for (ColumnBuffer cb : targetBatch.columnDatas) {
+          upstreamFeedback.returnData(cb);
+        }
       }
     }
 
@@ -133,10 +138,7 @@ public abstract class ColumnVectorProduc
 
     @Override
     public void returnData(ColumnVectorBatch data) {
-      // TODO#: this should happen earlier, when data is decoded buffers are not needed
-      for (ColumnBuffer lockedBuffer : data.lockedBuffers) {
-        upstreamFeedback.returnData(lockedBuffer);
-      }
+      // TODO: column vectors could be added to object pool here
    }
 
     private void dicardPendingData(boolean isStopped) {
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Wed Jan 14 02:47:08 2015
@@ -48,10 +48,11 @@ import org.apache.hadoop.mapred.InputSpl
 
 public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
   private FileSystem cachedFs = null;
-  private final LowLevelCache lowLevelCache;
   private Configuration conf;
   private OrcMetadataCache metadataCache;
+  // TODO: it makes zero sense to have both at the same time and duplicate data. Add "cache mode".
   private final Cache<OrcCacheKey> cache;
+  private final LowLevelCache lowLevelCache;
 
   private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>,
       Consumer<EncodedColumn<OrcBatchKey>> {
@@ -112,11 +113,11 @@ public class OrcEncodedDataProducer impl
       }
       determineWhatToRead(stripes);
       if (isStopped) return;
-      List<Integer>[] stripeColumnsToRead = produceDataFromCache();
+      List<Integer>[] stripeColsToRead = produceDataFromCache();
       // readState now contains some 1s for column x rgs that were fetched from cache.
       // TODO: I/O threadpool would be here; for now, linear and inefficient
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
-        List<Integer> colsToRead = stripeColumnsToRead[stripeIxMod];
+        List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
         long[][] colRgs = readState[stripeIxMod];
         if (colsToRead == null) {
           colsToRead = columnIds;
@@ -139,8 +140,10 @@ public class OrcEncodedDataProducer impl
           orcReader = createOrcReader(split);
         }
         RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
-        // We pass in the already-filtered RGs, as well as sarg. ORC can apply additional filtering.
-        stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, lowLevelCache);
+        // In case if we have high-level cache, we will intercept the data and add it there;
+        // otherwise just pass the data directly to the consumer.
+        Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
+        stripeReader.readEncodedColumns(colRgs, rgCount, consumer, lowLevelCache);
         stripeReader.close();
       }
 
@@ -152,13 +155,11 @@ public class OrcEncodedDataProducer impl
 
     @Override
     public void returnData(ColumnBuffer data) {
-      // TODO#: return the data to cache (unlock)
+      lowLevelCache.releaseBuffers(data.cacheBuffers);
     }
 
     private void determineWhatToRead(List<StripeInformation> stripes) {
-      // The unit of caching for ORC is (stripe x column) (see OrcBatchKey). Note that we do not use
-      // SARG anywhere, because file-level filtering on sarg is already performed during split
-      // generation, and stripe-level filtering to get row groups is not very helpful right now.
+      // The unit of caching for ORC is (stripe x column) (see OrcBatchKey).
       long offset = split.getStart(), maxOffset = offset + split.getLength();
       stripeIxFrom = stripeIxTo = -1;
       int stripeIx = 0;
@@ -208,6 +209,7 @@ public class OrcEncodedDataProducer impl
           readState[i][j] = new long[bitmaskSize];
         }
       }
+      // TODO: HERE, we need to apply sargs and mark RGs that are filtered as 1s
       rgsPerStripe = new int[stripeRgCounts.size()];
       for (int i = 0; i < rgsPerStripe.length; ++i) {
         rgsPerStripe[i] = stripeRgCounts.get(i);
       }
     }
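The consumer swap in readStripes is a small interception pattern: with a high-level cache configured, the producer inserts itself between the reader and the downstream consumer. A generic sketch (the interface name and methods here are illustrative, not the Hive Consumer type):

    interface DataConsumer<T> { void consumeData(T data); }

    class InterceptExample<T> implements DataConsumer<T> {
      private final DataConsumer<T> downstream;
      InterceptExample(DataConsumer<T> downstream) { this.downstream = downstream; }

      @Override public void consumeData(T data) {
        // a cacheOrGet(data) step would go here, possibly swapping in the cached copy
        downstream.consumeData(data);
      }

      static <T> DataConsumer<T> pick(
          boolean hasHighLevelCache, InterceptExample<T> self, DataConsumer<T> downstream) {
        return hasHighLevelCache ? self : downstream; // mirrors (cache == null) ? consumer : this
      }
    }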
 
     // TODO: split by stripe? we do everything by stripe, and it might be faster
-    // TODO: return type provisional depending on ORC API
     private List<Integer>[] produceDataFromCache() {
-      // Assumes none of the columns are fetched, because we always do this before reading.
+      if (cache == null) return null;
       OrcCacheKey key = new OrcCacheKey(internedFilePath, -1, -1, -1);
-      @SuppressWarnings("unchecked") // Grr, no generics arrays, "J" in "Java" stands for "joke".
+      @SuppressWarnings("unchecked") // No generics arrays - "J" in "Java" stands for "joke".
       List<Integer>[] stripeColsNotInCache = new List[readState.length];
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
         key.stripeIx = stripeIxFrom + stripeIxMod;
@@ -230,6 +231,8 @@ public class OrcEncodedDataProducer impl
           long[] doneMask = cols[colIxMod];
           boolean areAllRgsInCache = true;
           for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+            int maskIndex = rgIx >>> 6, maskBit = 1 << (rgIx & 63);
+            if ((doneMask[maskIndex] & maskBit) != 0) continue; // RG eliminated by SARG
             key.rgIx = rgIx;
             ColumnBuffer cached = cache.get(key);
             if (cached == null) {
@@ -240,7 +243,7 @@ public class OrcEncodedDataProducer impl
             EncodedColumn<OrcBatchKey> col = new EncodedColumn<OrcBatchKey>(
                 key.copyToPureBatchKey(), key.colIx, cached);
             consumer.consumeData(col);
-            doneMask[rgIx >>> 6] |= 1 << (rgIx & 63);
+            doneMask[maskIndex] = doneMask[maskIndex] | maskBit;
           }
           boolean hasFetchList = stripeColsNotInCache[stripeIxMod] != null;
           if (areAllRgsInCache) {
@@ -273,10 +276,11 @@ public class OrcEncodedDataProducer impl
     @Override
     public void consumeData(EncodedColumn<OrcBatchKey> data) {
       // Store object in cache; create new key object - cannot be reused.
+      assert cache != null;
       OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
       ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
       if (data.columnData != cached) {
-        // TODO: deallocate columnData
+        lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
         data.columnData = cached;
       }
       consumer.consumeData(data);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Wed Jan 14 02:47:08 2015
@@ -98,7 +98,7 @@ public class LLAPRecordReaderImpl extend
   }
 
   @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+  public void readEncodedColumns(long[][] colRgs, int rgCount,
       Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
   }
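The doneMask arithmetic above packs one bit per row group into a long[]. One subtlety: maskBit is declared as int, and Java masks int shift distances to five bits, so "1 << (rgIx & 63)" wraps for row groups 32..63 within a word; the long-typed form below is the arithmetic the surrounding long[] appears to intend:

    public class BitmaskExample {
      public static void main(String[] args) {
        long[] doneMask = new long[2];
        int rgIx = 40;
        int maskIndex = rgIx >>> 6;          // word 0 holds bits 0..63
        long maskBit = 1L << (rgIx & 63);    // bit 40; the long literal is essential
        doneMask[maskIndex] |= maskBit;
        System.out.println((doneMask[maskIndex] & maskBit) != 0); // true
      }
    }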
Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1651559&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLowLevelLrfuCachePolicy {
+  private static final Log LOG = LogFactory.getLog(TestLowLevelLrfuCachePolicy.class);
+
+  @Test
+  public void testHeapSize2() {
+    testHeapSize(2);
+  }
+
+  @Test
+  public void testHeapSize7() {
+    testHeapSize(7);
+  }
+
+  @Test
+  public void testHeapSize8() {
+    testHeapSize(8);
+  }
+
+  @Test
+  public void testHeapSize30() {
+    testHeapSize(30);
+  }
+
+  private class EvictionTracker implements EvictionListener {
+    public List<LlapCacheableBuffer> evicted = new ArrayList<LlapCacheableBuffer>();
+
+    @Override
+    public void notifyEvicted(LlapCacheableBuffer buffer) {
+      evicted.add(buffer);
+    }
+  }
+
+  @Test
+  public void testLfuExtreme() {
+    int heapSize = 4;
+    LOG.info("Testing lambda 0 (LFU)");
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+    for (int i = 0; i < heapSize; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lfu, et, buffer));
+      inserted.add(buffer);
+    }
+    Collections.shuffle(inserted, rdm);
+    // LFU extreme, order of accesses should be ignored, only frequency matters.
+    // We touch first elements later, but do it less times, so they will be evicted first.
+    for (int i = inserted.size() - 1; i >= 0; --i) {
+      for (int j = 0; j < i + 1; ++j) {
+        lfu.notifyLock(inserted.get(i));
+        lfu.notifyUnlock(inserted.get(i));
+      }
+    }
+    verifyOrder(lfu, et, inserted);
+  }
+
+  @Test
+  public void testLruExtreme() {
+    int heapSize = 4;
+    LOG.info("Testing lambda 1 (LRU)");
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+    for (int i = 0; i < heapSize; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lru, et, buffer));
+      inserted.add(buffer);
+    }
+    Collections.shuffle(inserted, rdm);
+    // LRU extreme, frequency of accesses should be ignored, only order matters.
+    for (int i = 0; i < inserted.size(); ++i) {
+      for (int j = 0; j < (inserted.size() - i); ++j) {
+        lru.notifyLock(inserted.get(i));
+        lru.notifyUnlock(inserted.get(i));
+      }
+    }
+    verifyOrder(lru, et, inserted);
+  }
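A numeric check of the two extremes these tests pin down, using the incremental CRF update sketched earlier (illustrative arithmetic, not the test code): at λ = 0 the CRF is a pure access count, so the LFU test's more-touched buffers always win; at λ = 1 history halves every tick, so one fresh touch outweighs any number of stale ones:

    public class LrfuExtremesExample {
      static double touch(double lambda, long dt, double p) {
        return 1 + Math.pow(0.5, lambda * dt) * p;
      }

      public static void main(String[] args) {
        // lambda = 0: three touches beat two, regardless of spacing.
        double threeStale = touch(0, 5, touch(0, 5, touch(0, 5, 0))); // 3.0
        double twoFresh = touch(0, 1, touch(0, 1, 0));                // 2.0
        System.out.println(threeStale > twoFresh);                    // true -> LFU

        // lambda = 1: ten back-to-back touches, then 10 idle ticks...
        double manyStale = 0;
        for (int i = 0; i < 10; ++i) manyStale = touch(1, 1, manyStale); // ~2.0
        manyStale *= Math.pow(0.5, 10); // decays to ~0.002
        double oneFresh = 1.0;          // a single touch just now: F(0)
        System.out.println(oneFresh > manyStale); // true -> LRU
      }
    }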
+
+  @Test
+  public void testDeadlockResolution() {
+    int heapSize = 4;
+    LOG.info("Testing deadlock resolution");
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 1, heapSize, et);
+    for (int i = 0; i < heapSize; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lrfu, et, buffer));
+      inserted.add(buffer);
+    }
+    // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
+    LlapCacheableBuffer locked = inserted.get(0);
+    lock(lrfu, locked);
+    lrfu.reserveMemory(1, false);
+    LlapCacheableBuffer evicted = et.evicted.get(0);
+    assertNotNull(evicted);
+    assertTrue(evicted.isInvalid());
+    assertNotSame(locked, evicted);
+    unlock(lrfu, locked);
+  }
+
+  private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelBuddyCache.allocateFake();
+  // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
+  public boolean cache(
+      LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapCacheableBuffer buffer) {
+    if (!lrfu.reserveMemory(1, false)) {
+      return false;
+    }
+    buffer.incRef();
+    lrfu.cache(buffer);
+    buffer.decRef();
+    lrfu.notifyUnlock(buffer);
+    return true;
+  }
+
+  private LlapCacheableBuffer getOneEvictedBuffer(EvictionTracker et) {
+    assertTrue(et.evicted.size() == 0 || et.evicted.size() == 1); // test-specific
+    LlapCacheableBuffer result = et.evicted.isEmpty() ? null : et.evicted.get(0);
+    et.evicted.clear();
+    return result;
+  }
+
+  private static void lock(LowLevelLrfuCachePolicy lrfu, LlapCacheableBuffer locked) {
+    locked.incRef();
+    lrfu.notifyLock(locked);
+  }
+
+  private static void unlock(LowLevelLrfuCachePolicy lrfu, LlapCacheableBuffer locked) {
+    locked.decRef();
+    lrfu.notifyUnlock(locked);
+  }
+
+  private void testHeapSize(int heapSize) {
+    LOG.info("Testing heap size " + heapSize);
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.05f); // very small heap? TODO#
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+    // Insert the number of elements plus 2, to trigger 2 evictions.
+    int toEvict = 2;
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    LlapCacheableBuffer[] evicted = new LlapCacheableBuffer[toEvict];
+    Assume.assumeTrue(toEvict <= heapSize);
+    for (int i = 0; i < heapSize + toEvict; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lrfu, et, buffer));
+      LlapCacheableBuffer evictedBuf = getOneEvictedBuffer(et);
+      if (i < toEvict) {
+        evicted[i] = buffer;
+      } else {
+        if (i >= heapSize) {
+          assertSame(evicted[i - heapSize], evictedBuf);
+          assertTrue(evictedBuf.isInvalid());
+        } else {
+          assertNull(evictedBuf);
+        }
+        inserted.add(buffer);
+      }
+    }
+    LOG.info("Inserted " + dumpInserted(inserted));
+    // We will touch all blocks in random order.
+    Collections.shuffle(inserted, rdm);
+    LOG.info("Touch order " + dumpInserted(inserted));
+    // Lock entire heap; heap is still full; we should not be able to evict or insert.
+    for (LlapCacheableBuffer buf : inserted) {
+      lock(lrfu, buf);
+    }
+    assertFalse(lrfu.reserveMemory(1, false));
+    if (!et.evicted.isEmpty()) {
+      assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
+    }
+    for (LlapCacheableBuffer buf : inserted) {
+      unlock(lrfu, buf);
+    }
+    // To make (almost) sure we get definite order, touch blocks in order large number of times.
+    for (LlapCacheableBuffer buf : inserted) {
+      // TODO: this seems to indicate that priorities change too little...
+      //       perhaps we need to adjust the policy.
+      for (int j = 0; j < 10; ++j) {
+        lrfu.notifyLock(buf);
+        lrfu.notifyUnlock(buf);
+      }
+    }
+    verifyOrder(lrfu, et, inserted);
+  }
+
+  private void verifyOrder(LowLevelLrfuCachePolicy lrfu,
+      EvictionTracker et, ArrayList<LlapCacheableBuffer> inserted) {
+    LlapCacheableBuffer block;
+    // Evict all blocks.
+    et.evicted.clear();
+    for (int i = 0; i < inserted.size(); ++i) {
+      assertTrue(lrfu.reserveMemory(1, false));
+    }
+    // The map should now be empty.
+    assertFalse(lrfu.reserveMemory(1, false));
+    for (int i = 0; i < inserted.size(); ++i) {
+      block = et.evicted.get(i);
+      assertTrue(block.isInvalid());
+      assertSame(inserted.get(i), block);
+    }
+  }
+
+  private String dumpInserted(ArrayList<LlapCacheableBuffer> inserted) {
+    String debugStr = "";
+    for (int i = 0; i < inserted.size(); ++i) {
+      if (i != 0) debugStr += ", ";
+      debugStr += inserted.get(i);
+    }
+    return debugStr;
+  }
+}

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Wed Jan 14 02:47:08 2015
@@ -96,6 +96,6 @@ public interface RecordReader {
    * @param consumer Consumer to pass the results too.
    * @param allocator Allocator to allocate memory.
    */
-  void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+  void readEncodedColumns(long[][] colRgs, int rgCount,
       Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Jan 14 02:47:08 2015
@@ -3302,7 +3302,7 @@ public class RecordReaderImpl implements
   }
 
   @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+  public void readEncodedColumns(long[][] colRgs, int rgCount,
       Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
     // TODO: HERE read encoded data
   }