Author: sershe Date: Sat Jan 17 01:26:50 2015 New Revision: 1652555 URL: http://svn.apache.org/r1652555 Log: Add tests for cache, clean up allocator debug logging left over
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Sat Jan 17 01:26:50 2015 @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; -final class BuddyAllocator { +public final class BuddyAllocator implements Allocator { private static final Log LOG = LogFactory.getLog(BuddyAllocator.class); private final Arena[] arenas; @@ -77,6 +77,7 @@ final class BuddyAllocator { } // TODO: would it make sense to return buffers asynchronously? + @Override public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) { assert size > 0 : "size is " + size; if (size > maxAllocation) { @@ -133,12 +134,15 @@ final class BuddyAllocator { return fake; } - public void deallocate(LlapCacheableBuffer buffer) { - arenas[buffer.arenaIndex].deallocate(buffer); + @Override + public void deallocate(LlapMemoryBuffer buffer) { + LlapCacheableBuffer buf = (LlapCacheableBuffer)buffer; + arenas[buf.arenaIndex].deallocate(buf); } public String debugDump() { - StringBuilder result = new StringBuilder(); + StringBuilder result = new StringBuilder( + "NOTE: with multiple threads the dump is not guaranteed to be consistent"); for (Arena arena : arenas) { arena.debugDump(result); } @@ -168,7 +172,6 @@ final class BuddyAllocator { for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) { // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity? headers[headerIndex] = makeHeader(allocLog2Diff, false); - LOG.info("TODO# 1 mucking with " + System.identityHashCode(data) + ":" + offset); data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep)); data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerStep)); headerIndex += headerStep; @@ -181,26 +184,36 @@ final class BuddyAllocator { result.append(" not allocated"); return; } + // Try to get as consistent view as we can; make copy of the headers. + byte[] headers = new byte[this.headers.length]; + System.arraycopy(this.headers, 0, headers, 0, headers.length); for (int i = 0; i < headers.length; ++i) { byte header = headers[i]; if (header == 0) continue; int freeListIx = (header >> 1) - 1, offset = offsetFromHeaderIndex(i); boolean isFree = (header & 1) == 0; - result.append("\n block " + i + " at " + offset + ": size " + (1 << (freeListIx + minAllocLog2)) - + ", " + (isFree ? "free" : "allocated")); + result.append("\n block " + i + " at " + offset + ": size " + + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated")); } int allocSize = minAllocation; for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) { result.append("\n free list for size " + allocSize + ": "); - int nextItem = freeLists[i].listHead; - while (nextItem >= 0) { - result.append(nextItem + ", "); - nextItem = data.getInt(offsetFromHeaderIndex(nextItem)); + FreeList freeList = freeLists[i]; + freeList.lock.lock(); + try { + int nextHeaderIx = freeList.listHead; + while (nextHeaderIx >= 0) { + result.append(nextHeaderIx + ", "); + nextHeaderIx = data.getInt(offsetFromHeaderIndex(nextHeaderIx)); + } + } finally { + freeList.lock.unlock(); } } } - private int allocateFast(int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) { + private int allocateFast( + int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) { if (data == null) return -1; // not allocated yet FreeList freeList = freeLists[freeListIx]; if (!freeList.lock.tryLock()) return ix; @@ -281,7 +294,6 @@ final class BuddyAllocator { if (headerIx == freeList.listHead) return; if (headerIx >= 0) { int newHeadOffset = offsetFromHeaderIndex(headerIx); - LOG.info("TODO# 3 mucking with " + System.identityHashCode(data) + ":" + newHeadOffset); data.putInt(newHeadOffset, -1); // Remove backlink. } freeList.listHead = headerIx; @@ -356,11 +368,9 @@ final class BuddyAllocator { if (freeList.listHead >= 0) { int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead); assert data.getInt(oldHeadOffset) == -1; - LOG.info("TODO# 4 mucking with " + System.identityHashCode(data) + ":" + oldHeadOffset); data.putInt(oldHeadOffset, headerIx); } int offset = offsetFromHeaderIndex(headerIx); - LOG.info("TODO# 5 mucking with " + System.identityHashCode(data) + ":" + offset); data.putInt(offset, -1); data.putInt(offset + 4, freeList.listHead); freeList.listHead = headerIx; @@ -375,11 +385,9 @@ final class BuddyAllocator { } if (bpHeaderIx != -1) { data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx); - LOG.info("TODO# 6 mucking with " + System.identityHashCode(data) + ":" + offsetFromHeaderIndex(bpHeaderIx) + " + 4"); } if (bnHeaderIx != -1) { data.putInt(offsetFromHeaderIndex(bnHeaderIx), bpHeaderIx); - LOG.info("TODO# 7 mucking with " + System.identityHashCode(data) + ":" + offsetFromHeaderIndex(bnHeaderIx)); } } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Sat Jan 17 01:26:50 2015 @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.llap.Debug import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import com.google.common.annotations.VisibleForTesting; + public final class LlapCacheableBuffer extends LlapMemoryBuffer { private static final int EVICTED_REFCOUNT = -1; static final int IN_LIST = -2, NOT_IN_CACHE = -1; @@ -47,6 +49,11 @@ public final class LlapCacheableBuffer e public LlapCacheableBuffer prev = null, next = null; public int indexInHeap = NOT_IN_CACHE; + @VisibleForTesting + int getRefCount() { + return refCount.get(); + } + @Override public int hashCode() { if (this.byteBuffer == null) return 0; Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Sat Jan 17 01:26:50 2015 @@ -30,8 +30,10 @@ import org.apache.hadoop.hive.llap.io.ap import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import com.google.common.annotations.VisibleForTesting; + public class LowLevelCacheImpl implements LowLevelCache, EvictionListener { - private final BuddyAllocator allocator; + private final Allocator allocator; private AtomicInteger newEvictions = new AtomicInteger(0); private final Thread cleanupThread; @@ -39,15 +41,23 @@ public class LowLevelCacheImpl implement new ConcurrentHashMap<String, FileCache>(); private final LowLevelCachePolicyBase cachePolicy; - public LowLevelCacheImpl(Configuration conf) { - int minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); - long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE); - cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU) - ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this) - : new LowLevelFifoCachePolicy(minAllocation, maxSize, this); - allocator = new BuddyAllocator(conf, cachePolicy); - cleanupThread = new CleanupThread(); - cleanupThread.start(); + public LowLevelCacheImpl( + Configuration conf, LowLevelCachePolicyBase cachePolicy, Allocator allocator) { + this(conf, cachePolicy, allocator, 600); + } + + @VisibleForTesting + LowLevelCacheImpl(Configuration conf, + LowLevelCachePolicyBase cachePolicy, Allocator allocator, long cleanupInterval) { + this.cachePolicy = cachePolicy; + this.cachePolicy.setEvictionListener(this); + this.allocator = allocator; + if (cleanupInterval >= 0) { + cleanupThread = new CleanupThread(cleanupInterval); + cleanupThread.start(); + } else { + cleanupThread = null; + } } @Override @@ -242,10 +252,11 @@ public class LowLevelCacheImpl implement } private final class CleanupThread extends Thread { - private int APPROX_CLEANUP_INTERVAL_SEC = 600; + private final long approxCleanupIntervalSec; - public CleanupThread() { + public CleanupThread(long cleanupInterval) { super("Llap low level cache cleanup thread"); + this.approxCleanupIntervalSec = cleanupInterval; setDaemon(true); setPriority(1); } @@ -275,7 +286,7 @@ public class LowLevelCacheImpl implement } } // Duration is an estimate; if the size of the map changes, it can be very different. - long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L; + long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L; int leftToCheck = 0; // approximate for (FileCache fc : cache.values()) { leftToCheck += fc.cache.size(); Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Sat Jan 17 01:26:50 2015 @@ -22,13 +22,12 @@ import java.util.concurrent.atomic.Atomi public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy, MemoryManager { private final AtomicLong usedMemory; - private final long maxSize; + protected final long maxSize; private EvictionListener evictionListener; - public LowLevelCachePolicyBase(long maxSize, EvictionListener listener) { + public LowLevelCachePolicyBase(long maxSize) { this.maxSize = maxSize; this.usedMemory = new AtomicLong(0); - this.evictionListener = listener; } @Override @@ -57,4 +56,8 @@ public abstract class LowLevelCachePolic } protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener); + + public void setEvictionListener(EvictionListener evictionListener) { + this.evictionListener = evictionListener; + } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Sat Jan 17 01:26:50 2015 @@ -24,14 +24,18 @@ import java.util.LinkedHashSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + public class LowLevelFifoCachePolicy extends LowLevelCachePolicyBase { private final Lock lock = new ReentrantLock(); private final LinkedHashSet<LlapCacheableBuffer> buffers; - public LowLevelFifoCachePolicy( - int expectedBufferSize, long maxCacheSize, EvictionListener listener) { - super(maxCacheSize, listener); - int expectedBuffers = (int)Math.ceil((maxCacheSize * 1.0) / expectedBufferSize); + public LowLevelFifoCachePolicy(Configuration conf) { + super(HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE)); + int expectedBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); + int expectedBuffers = (int)Math.ceil((maxSize * 1.0) / expectedBufferSize); buffers = new LinkedHashSet<LlapCacheableBuffer>((int)(expectedBuffers / 0.75f)); } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Sat Jan 17 01:26:50 2015 @@ -24,6 +24,7 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; @@ -62,11 +63,11 @@ public class LowLevelLrfuCachePolicy ext /** Number of elements. */ private int heapSize = 0; - public LowLevelLrfuCachePolicy(Configuration conf, - long minBufferSize, long maxCacheSize, EvictionListener listener) { - super(maxCacheSize, listener); + public LowLevelLrfuCachePolicy(Configuration conf) { + super(HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE)); + int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA); - int maxBuffers = (int)Math.ceil((maxCacheSize * 1.0) / minBufferSize); + int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize); int maxHeapSize = -1; if (lambda == 0) { maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Sat Jan 17 01:26:50 2015 @@ -26,8 +26,13 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.cache.Allocator; +import org.apache.hadoop.hive.llap.cache.BuddyAllocator; import org.apache.hadoop.hive.llap.cache.Cache; import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl; +import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicyBase; +import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy; +import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy; import org.apache.hadoop.hive.llap.cache.NoopCache; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.api.VectorReader; @@ -56,7 +61,14 @@ public class LlapIoImpl implements LlapI boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE); // High-level cache not supported yet. Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>(); - LowLevelCacheImpl orcCache = useLowLevelCache ? new LowLevelCacheImpl(conf) : null; + LowLevelCacheImpl orcCache = null; + if (useLowLevelCache) { + boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); + LowLevelCachePolicyBase cachePolicy = + useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf); + Allocator allocator = new BuddyAllocator(conf, cachePolicy); + orcCache = new LowLevelCacheImpl(conf, cachePolicy, allocator); + } this.edp = new OrcEncodedDataProducer(orcCache, cache, conf); this.cvp = new OrcColumnVectorProducer(edp, conf); } Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java Sat Jan 17 01:26:50 2015 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.cache; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; @@ -57,7 +58,7 @@ public class TestBuddyAllocator { int min = 3, max = 8, maxAlloc = 1 << max; Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc); BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager()); - for (int i = min; i <= max; i <<= 1) { + for (int i = min; i <= max; ++i) { allocSameSize(a, 1 << (max - i), i); } } @@ -76,19 +77,23 @@ public class TestBuddyAllocator { Configuration conf = createConf(1 << min, maxAlloc, maxAlloc * 8, maxAlloc * 24); final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager()); ExecutorService executor = Executors.newFixedThreadPool(3); + final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1); FutureTask<Object> upTask = new FutureTask<Object>(new Runnable() { public void run() { + syncThreadStart(cdlIn, cdlOut); allocateUp(a, min, max, allocsPerSize, false); allocateUp(a, min, max, allocsPerSize, true); } }, null), downTask = new FutureTask<Object>(new Runnable() { public void run() { + syncThreadStart(cdlIn, cdlOut); allocateDown(a, min, max, allocsPerSize, false); allocateDown(a, min, max, allocsPerSize, true); } }, null), sameTask = new FutureTask<Object>(new Runnable() { public void run() { - for (int i = min; i <= max; i <<= 1) { + syncThreadStart(cdlIn, cdlOut); + for (int i = min; i <= max; ++i) { allocSameSize(a, (1 << (max - i)) * allocsPerSize, i); } } @@ -97,6 +102,8 @@ public class TestBuddyAllocator { executor.execute(upTask); executor.execute(downTask); try { + cdlIn.await(); // Wait for all threads to be ready. + cdlOut.countDown(); // Release them at the same time. upTask.get(); downTask.get(); sameTask.get(); @@ -105,6 +112,15 @@ public class TestBuddyAllocator { } } + private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + cdlIn.countDown(); + try { + cdlOut.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int arenaCount) { int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult; Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * arenaCount); Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Sat Jan 17 01:26:50 2015 @@ -17,13 +17,300 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.management.RuntimeErrorException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.junit.Assume; import org.junit.Test; import static org.junit.Assert.*; public class TestLowLevelCacheImpl { private static final Log LOG = LogFactory.getLog(TestLowLevelCacheImpl.class); + + private static class DummyAllocator implements Allocator { + @Override + public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) { + for (int i = 0; i < dest.length; ++i) { + LlapCacheableBuffer buf = new LlapCacheableBuffer(); + buf.initialize(0, null, -1, size); + dest[i] = buf; + } + return true; + } + + @Override + public void deallocate(LlapMemoryBuffer buffer) { + } + } + + private static class DummyCachePolicy extends LowLevelCachePolicyBase { + public DummyCachePolicy(long maxSize) { + super(maxSize); + } + + public void cache(LlapCacheableBuffer buffer) { + } + + public void notifyLock(LlapCacheableBuffer buffer) { + } + + public void notifyUnlock(LlapCacheableBuffer buffer) { + } + + protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) { + return memoryToReserve; + } + } + + @Test + public void testGetPut() { + Configuration conf = createConf(); + LowLevelCacheImpl cache = new LowLevelCacheImpl( + conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread + String fn1 = "file1".intern(), fn2 = "file2".intern(); + LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() }; + verifyRefcount(fakes, 1, 1, 1, 1, 1, 1); + assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1))); + assertNull(cache.putFileData(fn2, new long[] { 1, 2 }, fbs(fakes, 2, 3))); + assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 })); + assertArrayEquals(fbs(fakes, 2, 3), cache.getFileData(fn2, new long[] { 1, 2 })); + assertArrayEquals(fbs(fakes, 1, -1), cache.getFileData(fn1, new long[] { 2, 3 })); + verifyRefcount(fakes, 2, 3, 2, 2, 1, 1); + LlapMemoryBuffer[] bufsDiff = fbs(fakes, 4, 5); + long[] mask = cache.putFileData(fn1, new long[] { 3, 1 }, bufsDiff); + assertEquals(1, mask.length); + assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache. + assertSame(fakes[0], bufsDiff[1]); // Should have been replaced + verifyRefcount(fakes, 3, 3, 2, 2, 1, 0); + assertArrayEquals(fbs(fakes, 0, 1, 4), cache.getFileData(fn1, new long[] { 1, 2, 3 })); + verifyRefcount(fakes, 4, 4, 2, 2, 2, 0); + } + + @Test + public void testStaleValueGet() { + Configuration conf = createConf(); + LowLevelCacheImpl cache = new LowLevelCacheImpl( + conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread + String fn1 = "file1".intern(), fn2 = "file2".intern(); + LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() }; + assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1))); + assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 2))); + assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 })); + assertArrayEquals(fbs(fakes, 2), cache.getFileData(fn2, new long[] { 1 })); + verifyRefcount(fakes, 2, 2, 2); + evict(cache, fakes[0]); + evict(cache, fakes[2]); + assertArrayEquals(fbs(fakes, -1, 1), cache.getFileData(fn1, new long[] { 1, 2 })); + assertNull(cache.getFileData(fn2, new long[] { 1 })); + verifyRefcount(fakes, -1, 3, -1); + } + + @Test + public void testStaleValueReplace() { + Configuration conf = createConf(); + LowLevelCacheImpl cache = new LowLevelCacheImpl( + conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread + String fn1 = "file1".intern(), fn2 = "file2".intern(); + LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { + fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() }; + assertNull(cache.putFileData(fn1, new long[] { 1, 2, 3 }, fbs(fakes, 0, 1, 2))); + assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 3))); + evict(cache, fakes[0]); + evict(cache, fakes[3]); + long[] mask = cache.putFileData(fn1, new long[] { 1, 2, 3, 4 }, fbs(fakes, 4, 5, 6, 7)); + assertEquals(1, mask.length); + assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't + assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 8))); + assertArrayEquals(fbs(fakes, 4, 2, 3, 7), cache.getFileData(fn1, new long[] { 1, 2, 3, 4 })); + } + + @Test + public void testMTTWithCleanup() { + Configuration conf = createConf(); + final LowLevelCacheImpl cache = new LowLevelCacheImpl( + conf, new DummyCachePolicy(10), new DummyAllocator(), 1); + final String fn1 = "file1".intern(), fn2 = "file2".intern(); + final int offsetsToUse = 8; + final CountDownLatch cdlIn = new CountDownLatch(4), cdlOut = new CountDownLatch(1); + final AtomicInteger rdmsDone = new AtomicInteger(0); + Callable<Long> rdmCall = new Callable<Long>() { + public Long call() { + int gets = 0, puts = 0; + try { + Random rdm = new Random(1234 + Thread.currentThread().getId()); + syncThreadStart(cdlIn, cdlOut); + for (int i = 0; i < 20000; ++i) { + boolean isGet = rdm.nextBoolean(), isFn1 = rdm.nextBoolean(); + String fileName = isFn1 ? fn1 : fn2; + int fileIndex = isFn1 ? 1 : 2; + int count = rdm.nextInt(offsetsToUse); + long[] offsets = new long[count]; + for (int j = 0; j < offsets.length; ++j) { + offsets[j] = rdm.nextInt(offsetsToUse); + } + if (isGet) { + LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets); + if (results == null) continue; + for (int j = 0; j < offsets.length; ++j) { + if (results[j] == null) continue; + ++gets; + LlapCacheableBuffer result = (LlapCacheableBuffer)(results[j]); + assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex); + result.decRef(); + } + } else { + LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count]; + for (int j = 0; j < offsets.length; ++j) { + LlapCacheableBuffer buf = LowLevelCacheImpl.allocateFake(); + buf.incRef(); + buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]); + buffers[j] = buf; + } + long[] mask = cache.putFileData(fileName, offsets, buffers); + puts += buffers.length; + long maskVal = 0; + if (mask != null) { + assertEquals(1, mask.length); + maskVal = mask[0]; + } + for (int j = 0; j < offsets.length; ++j) { + LlapCacheableBuffer buf = (LlapCacheableBuffer)(buffers[j]); + if ((maskVal & 1) == 1) { + assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex); + } + maskVal >>= 1; + buf.decRef(); + } + } + } + } finally { + rdmsDone.incrementAndGet(); + } + return (((long)gets) << 32) | puts; + } + + private int makeFakeArenaIndex(int fileIndex, long offset) { + return (int)((fileIndex << 16) + offset); + } + }; + + FutureTask<Integer> evictionTask = new FutureTask<Integer>(new Callable<Integer>() { + public Integer call() { + boolean isFirstFile = false; + long[] offsets = new long[offsetsToUse]; + Random rdm = new Random(1234 + Thread.currentThread().getId()); + for (int i = 0; i < offsetsToUse; ++i) { + offsets[i] = i; + } + int evictions = 0; + syncThreadStart(cdlIn, cdlOut); + while (rdmsDone.get() < 3) { + isFirstFile = !isFirstFile; + String fileName = isFirstFile ? fn1 : fn2; + LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets); + if (results == null) continue; + int startIndex = rdm.nextInt(results.length), index = startIndex; + LlapCacheableBuffer victim = null; + do { + if (results[index] != null) { + LlapCacheableBuffer result = (LlapCacheableBuffer)results[index]; + result.decRef(); + if (victim == null && result.invalidate()) { + ++evictions; + victim = result; + } + } + ++index; + if (index == results.length) index = 0; + } while (index != startIndex); + if (victim == null) continue; + cache.notifyEvicted(victim); + } + return evictions; + } + }); + + FutureTask<Long> rdmTask1 = new FutureTask<Long>(rdmCall), + rdmTask2 = new FutureTask<Long>(rdmCall), rdmTask3 = new FutureTask<Long>(rdmCall); + Executor threadPool = Executors.newFixedThreadPool(4); + threadPool.execute(rdmTask1); + threadPool.execute(rdmTask2); + threadPool.execute(rdmTask3); + threadPool.execute(evictionTask); + try { + cdlIn.await(); + cdlOut.countDown(); + long result1 = rdmTask1.get(), result2 = rdmTask2.get(), result3 = rdmTask3.get(); + int evictions = evictionTask.get(); + LOG.info("MTT test: task 1: " + descRdmTask(result1) + ", task 2: " + descRdmTask(result2) + + ", task 3: " + descRdmTask(result3) + "; " + evictions + " evictions"); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + private String descRdmTask(long result) { + return (result >>> 32) + " successful gets, " + (result & ((1L << 32) - 1)) + " puts"; + } + + private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + cdlIn.countDown(); + try { + cdlOut.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void evict(LowLevelCacheImpl cache, LlapMemoryBuffer fake) { + LlapCacheableBuffer victimBuffer = (LlapCacheableBuffer)fake; + int refCount = victimBuffer.getRefCount(); + for (int i = 0; i < refCount; ++i) { + victimBuffer.decRef(); + } + assertTrue(victimBuffer.invalidate()); + cache.notifyEvicted(victimBuffer); + } + + private void verifyRefcount(LlapMemoryBuffer[] fakes, int... refCounts) { + for (int i = 0; i < refCounts.length; ++i) { + assertEquals(refCounts[i], ((LlapCacheableBuffer)fakes[i]).getRefCount()); + } + } + + private LlapMemoryBuffer[] fbs(LlapMemoryBuffer[] fakes, int... indexes) { + LlapMemoryBuffer[] rv = new LlapMemoryBuffer[indexes.length]; + for (int i = 0; i < indexes.length; ++i) { + rv[i] = (indexes[i] == -1) ? null : fakes[indexes[i]]; + } + return rv; + } + + private LlapCacheableBuffer fb() { + LlapCacheableBuffer fake = LowLevelCacheImpl.allocateFake(); + fake.incRef(); + return fake; + } + + private Configuration createConf() { + Configuration conf = new Configuration(); + conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, 3); + conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, 8); + conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, 8); + conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, 8); + return conf; + } } Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1652555&r1=1652554&r2=1652555&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Sat Jan 17 01:26:50 2015 @@ -24,6 +24,7 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.junit.Assume; import org.junit.Test; @@ -66,11 +67,12 @@ public class TestLowLevelLrfuCachePolicy int heapSize = 4; LOG.info("Testing lambda 0 (LFU)"); Random rdm = new Random(1234); - HiveConf conf = new HiveConf(); + Configuration conf = createConf(1, heapSize); ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f); EvictionTracker et = new EvictionTracker(); - LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et); + LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf); + lfu.setEvictionListener(et); for (int i = 0; i < heapSize; ++i) { LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake(); assertTrue(cache(lfu, et, buffer)); @@ -88,16 +90,24 @@ public class TestLowLevelLrfuCachePolicy verifyOrder(lfu, et, inserted); } + private Configuration createConf(int min, int heapSize) { + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min); + conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, heapSize); + return conf; + } + @Test public void testLruExtreme() { int heapSize = 4; LOG.info("Testing lambda 1 (LRU)"); Random rdm = new Random(1234); - HiveConf conf = new HiveConf(); + Configuration conf = createConf(1, heapSize); ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f); EvictionTracker et = new EvictionTracker(); - LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et); + LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf); + lru.setEvictionListener(et); for (int i = 0; i < heapSize; ++i) { LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake(); assertTrue(cache(lru, et, buffer)); @@ -120,7 +130,8 @@ public class TestLowLevelLrfuCachePolicy LOG.info("Testing deadlock resolution"); ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize); EvictionTracker et = new EvictionTracker(); - LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 1, heapSize, et); + LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(createConf(1, heapSize)); + lrfu.setEvictionListener(et); for (int i = 0; i < heapSize; ++i) { LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake(); assertTrue(cache(lrfu, et, buffer)); @@ -171,10 +182,11 @@ public class TestLowLevelLrfuCachePolicy private void testHeapSize(int heapSize) { LOG.info("Testing heap size " + heapSize); Random rdm = new Random(1234); - HiveConf conf = new HiveConf(); + Configuration conf = createConf(1, heapSize); conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements EvictionTracker et = new EvictionTracker(); - LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et); + LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf); + lrfu.setEvictionListener(et); // Insert the number of elements plus 2, to trigger 2 evictions. int toEvict = 2; ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);