Author: sershe
Date: Sat Jan 17 01:26:50 2015
New Revision: 1652555

URL: http://svn.apache.org/r1652555
Log:
Add tests for cache, clean up allocator debug logging left over

Modified:
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
    
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
    
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
 Sat Jan 17 01:26:50 2015
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 
-final class BuddyAllocator {
+public final class BuddyAllocator implements Allocator {
   private static final Log LOG = LogFactory.getLog(BuddyAllocator.class);
 
   private final Arena[] arenas;
@@ -77,6 +77,7 @@ final class BuddyAllocator {
   }
 
   // TODO: would it make sense to return buffers asynchronously?
+  @Override
   public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
     assert size > 0 : "size is " + size;
     if (size > maxAllocation) {
@@ -133,12 +134,15 @@ final class BuddyAllocator {
     return fake;
   }
 
-  public void deallocate(LlapCacheableBuffer buffer) {
-    arenas[buffer.arenaIndex].deallocate(buffer);
+  @Override
+  public void deallocate(LlapMemoryBuffer buffer) {
+    LlapCacheableBuffer buf = (LlapCacheableBuffer)buffer;
+    arenas[buf.arenaIndex].deallocate(buf);
   }
 
   public String debugDump() {
-    StringBuilder result = new StringBuilder();
+    StringBuilder result = new StringBuilder(
+        "NOTE: with multiple threads the dump is not guaranteed to be 
consistent");
     for (Arena arena : arenas) {
       arena.debugDump(result);
     }
@@ -168,7 +172,6 @@ final class BuddyAllocator {
       for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += 
maxAllocation) {
         // TODO: will this cause bugs on large numbers due to some Java sign 
bit stupidity?
         headers[headerIndex] = makeHeader(allocLog2Diff, false);
-        LOG.info("TODO# 1 mucking with " + System.identityHashCode(data) + ":" 
+ offset);
         data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep));
         data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + 
headerStep));
         headerIndex += headerStep;
@@ -181,26 +184,36 @@ final class BuddyAllocator {
         result.append(" not allocated");
         return;
       }
+      // Try to get as consistent view as we can; make copy of the headers.
+      byte[] headers = new byte[this.headers.length];
+      System.arraycopy(this.headers, 0, headers, 0, headers.length);
       for (int i = 0; i < headers.length; ++i) {
         byte header = headers[i];
         if (header == 0) continue;
         int freeListIx = (header >> 1) - 1, offset = offsetFromHeaderIndex(i);
         boolean isFree = (header & 1) == 0;
-        result.append("\n  block " + i + " at " + offset + ": size " + (1 << 
(freeListIx + minAllocLog2))
-            + ", " + (isFree ? "free" : "allocated"));
+        result.append("\n  block " + i + " at " + offset + ": size "
+        + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : 
"allocated"));
       }
       int allocSize = minAllocation;
       for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
         result.append("\n  free list for size " + allocSize + ": ");
-        int nextItem = freeLists[i].listHead;
-        while (nextItem >= 0) {
-          result.append(nextItem + ", ");
-          nextItem = data.getInt(offsetFromHeaderIndex(nextItem));
+        FreeList freeList = freeLists[i];
+        freeList.lock.lock();
+        try {
+          int nextHeaderIx = freeList.listHead;
+          while (nextHeaderIx >= 0) {
+            result.append(nextHeaderIx + ", ");
+            nextHeaderIx = data.getInt(offsetFromHeaderIndex(nextHeaderIx));
+          }
+        } finally {
+          freeList.lock.unlock();
         }
       }
     }
 
-    private int allocateFast(int arenaIx, int freeListIx, LlapMemoryBuffer[] 
dest, int ix, int size) {
+    private int allocateFast(
+        int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int 
size) {
       if (data == null) return -1; // not allocated yet
       FreeList freeList = freeLists[freeListIx];
       if (!freeList.lock.tryLock()) return ix;
@@ -281,7 +294,6 @@ final class BuddyAllocator {
       if (headerIx == freeList.listHead) return;
       if (headerIx >= 0) {
         int newHeadOffset = offsetFromHeaderIndex(headerIx);
-        LOG.info("TODO# 3 mucking with " + System.identityHashCode(data) + ":" 
+ newHeadOffset);
         data.putInt(newHeadOffset, -1); // Remove backlink.
       }
       freeList.listHead = headerIx;
@@ -356,11 +368,9 @@ final class BuddyAllocator {
       if (freeList.listHead >= 0) {
         int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
         assert data.getInt(oldHeadOffset) == -1;
-        LOG.info("TODO# 4 mucking with " + System.identityHashCode(data) + ":" 
+ oldHeadOffset);
         data.putInt(oldHeadOffset, headerIx);
       }
       int offset = offsetFromHeaderIndex(headerIx);
-      LOG.info("TODO# 5 mucking with " + System.identityHashCode(data) + ":" + 
offset);
       data.putInt(offset, -1);
       data.putInt(offset + 4, freeList.listHead);
       freeList.listHead = headerIx;
@@ -375,11 +385,9 @@ final class BuddyAllocator {
       }
       if (bpHeaderIx != -1) {
         data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx);
-        LOG.info("TODO# 6 mucking with " + System.identityHashCode(data) + ":" 
+ offsetFromHeaderIndex(bpHeaderIx) + " + 4");
       }
       if (bnHeaderIx != -1) {
         data.putInt(offsetFromHeaderIndex(bnHeaderIx), bpHeaderIx);
-        LOG.info("TODO# 7 mucking with " + System.identityHashCode(data) + ":" 
+ offsetFromHeaderIndex(bnHeaderIx));
       }
     }
   }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
 Sat Jan 17 01:26:50 2015
@@ -25,6 +25,8 @@ import org.apache.hadoop.hive.llap.Debug
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public final class LlapCacheableBuffer extends LlapMemoryBuffer {
   private static final int EVICTED_REFCOUNT = -1;
   static final int IN_LIST = -2, NOT_IN_CACHE = -1;
@@ -47,6 +49,11 @@ public final class LlapCacheableBuffer e
   public LlapCacheableBuffer prev = null, next = null;
   public int indexInHeap = NOT_IN_CACHE;
 
+  @VisibleForTesting
+  int getRefCount() {
+    return refCount.get();
+  }
+
   @Override
   public int hashCode() {
     if (this.byteBuffer == null) return 0;

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
 Sat Jan 17 01:26:50 2015
@@ -30,8 +30,10 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class LowLevelCacheImpl implements LowLevelCache, EvictionListener {
-  private final BuddyAllocator allocator;
+  private final Allocator allocator;
 
   private AtomicInteger newEvictions = new AtomicInteger(0);
   private final Thread cleanupThread;
@@ -39,15 +41,23 @@ public class LowLevelCacheImpl implement
       new ConcurrentHashMap<String, FileCache>();
   private final LowLevelCachePolicyBase cachePolicy;
 
-  public LowLevelCacheImpl(Configuration conf) {
-    int minAllocation = HiveConf.getIntVar(conf, 
ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
-    long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
-        ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
-        : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
-    allocator = new BuddyAllocator(conf, cachePolicy);
-    cleanupThread = new CleanupThread();
-    cleanupThread.start();
+  public LowLevelCacheImpl(
+      Configuration conf, LowLevelCachePolicyBase cachePolicy, Allocator 
allocator) {
+    this(conf, cachePolicy, allocator, 600);
+  }
+
+  @VisibleForTesting
+  LowLevelCacheImpl(Configuration conf,
+      LowLevelCachePolicyBase cachePolicy, Allocator allocator, long 
cleanupInterval) {
+    this.cachePolicy = cachePolicy;
+    this.cachePolicy.setEvictionListener(this);
+    this.allocator = allocator;
+    if (cleanupInterval >= 0) {
+      cleanupThread = new CleanupThread(cleanupInterval);
+      cleanupThread.start();
+    } else {
+      cleanupThread = null;
+    }
   }
 
   @Override
@@ -242,10 +252,11 @@ public class LowLevelCacheImpl implement
   }
 
   private final class CleanupThread extends Thread {
-    private int APPROX_CLEANUP_INTERVAL_SEC = 600;
+    private final long approxCleanupIntervalSec;
 
-    public CleanupThread() {
+    public CleanupThread(long cleanupInterval) {
       super("Llap low level cache cleanup thread");
+      this.approxCleanupIntervalSec = cleanupInterval;
       setDaemon(true);
       setPriority(1);
     }
@@ -275,7 +286,7 @@ public class LowLevelCacheImpl implement
         }
       }
       // Duration is an estimate; if the size of the map changes, it can be 
very different.
-      long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 
1000000000L;
+      long endTime = System.nanoTime() + approxCleanupIntervalSec * 
1000000000L;
       int leftToCheck = 0; // approximate
       for (FileCache fc : cache.values()) {
         leftToCheck += fc.cache.size();

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
 Sat Jan 17 01:26:50 2015
@@ -22,13 +22,12 @@ import java.util.concurrent.atomic.Atomi
 
 public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy, 
MemoryManager {
   private final AtomicLong usedMemory;
-  private final long maxSize;
+  protected final long maxSize;
   private EvictionListener evictionListener;
 
-  public LowLevelCachePolicyBase(long maxSize, EvictionListener listener) {
+  public LowLevelCachePolicyBase(long maxSize) {
     this.maxSize = maxSize;
     this.usedMemory = new AtomicLong(0);
-    this.evictionListener = listener;
   }
 
   @Override
@@ -57,4 +56,8 @@ public abstract class LowLevelCachePolic
   }
 
   protected abstract long evictSomeBlocks(long memoryToReserve, 
EvictionListener listener);
+
+  public void setEvictionListener(EvictionListener evictionListener) {
+    this.evictionListener = evictionListener;
+  }
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
 Sat Jan 17 01:26:50 2015
@@ -24,14 +24,18 @@ import java.util.LinkedHashSet;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
 public class LowLevelFifoCachePolicy extends LowLevelCachePolicyBase {
   private final Lock lock = new ReentrantLock();
   private final LinkedHashSet<LlapCacheableBuffer> buffers;
 
-  public LowLevelFifoCachePolicy(
-      int expectedBufferSize, long maxCacheSize, EvictionListener listener) {
-    super(maxCacheSize, listener);
-    int expectedBuffers = (int)Math.ceil((maxCacheSize * 1.0) / 
expectedBufferSize);
+  public LowLevelFifoCachePolicy(Configuration conf) {
+    super(HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+    int expectedBufferSize = HiveConf.getIntVar(conf, 
ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    int expectedBuffers = (int)Math.ceil((maxSize * 1.0) / expectedBufferSize);
     buffers = new LinkedHashSet<LlapCacheableBuffer>((int)(expectedBuffers / 
0.75f));
   }
 

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
 Sat Jan 17 01:26:50 2015
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
@@ -62,11 +63,11 @@ public class LowLevelLrfuCachePolicy ext
   /** Number of elements. */
   private int heapSize = 0;
 
-  public LowLevelLrfuCachePolicy(Configuration conf,
-      long minBufferSize, long maxCacheSize, EvictionListener listener) {
-    super(maxCacheSize, listener);
+  public LowLevelLrfuCachePolicy(Configuration conf) {
+    super(HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+    int minBufferSize = HiveConf.getIntVar(conf, 
ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
-    int maxBuffers = (int)Math.ceil((maxCacheSize * 1.0) / minBufferSize);
+    int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
     int maxHeapSize = -1;
     if (lambda == 0) {
       maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
 Sat Jan 17 01:26:50 2015
@@ -26,8 +26,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.Allocator;
+import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicyBase;
+import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
+import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
 import org.apache.hadoop.hive.llap.cache.NoopCache;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.VectorReader;
@@ -56,7 +61,14 @@ public class LlapIoImpl implements LlapI
     boolean useLowLevelCache = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
     // High-level cache not supported yet.
     Cache<OrcCacheKey> cache = useLowLevelCache ? null : new 
NoopCache<OrcCacheKey>();
-    LowLevelCacheImpl orcCache = useLowLevelCache ? new 
LowLevelCacheImpl(conf) : null;
+    LowLevelCacheImpl orcCache = null;
+    if (useLowLevelCache) {
+      boolean useLrfu = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_USE_LRFU);
+      LowLevelCachePolicyBase cachePolicy =
+          useLrfu ? new LowLevelLrfuCachePolicy(conf) : new 
LowLevelFifoCachePolicy(conf);
+      Allocator allocator = new BuddyAllocator(conf, cachePolicy);
+      orcCache = new LowLevelCacheImpl(conf, cachePolicy, allocator);
+    }
     this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
     this.cvp = new OrcColumnVectorProducer(edp, conf);
   }

Modified: 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
 (original)
+++ 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
 Sat Jan 17 01:26:50 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.cache;
 
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
@@ -57,7 +58,7 @@ public class TestBuddyAllocator {
     int min = 3, max = 8, maxAlloc = 1 << max;
     Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc);
     BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
-    for (int i = min; i <= max; i <<= 1) {
+    for (int i = min; i <= max; ++i) {
       allocSameSize(a, 1 << (max - i), i);
     }
   }
@@ -76,19 +77,23 @@ public class TestBuddyAllocator {
     Configuration conf = createConf(1 << min, maxAlloc, maxAlloc * 8, maxAlloc 
* 24);
     final BuddyAllocator a = new BuddyAllocator(conf, new 
DummyMemoryManager());
     ExecutorService executor = Executors.newFixedThreadPool(3);
+    final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new 
CountDownLatch(1);
     FutureTask<Object> upTask = new FutureTask<Object>(new Runnable() {
       public void run() {
+        syncThreadStart(cdlIn, cdlOut);
         allocateUp(a, min, max, allocsPerSize, false);
         allocateUp(a, min, max, allocsPerSize, true);
       }
     }, null), downTask = new FutureTask<Object>(new Runnable() {
       public void run() {
+        syncThreadStart(cdlIn, cdlOut);
         allocateDown(a, min, max, allocsPerSize, false);
         allocateDown(a, min, max, allocsPerSize, true);
       }
     }, null), sameTask = new FutureTask<Object>(new Runnable() {
       public void run() {
-        for (int i = min; i <= max; i <<= 1) {
+        syncThreadStart(cdlIn, cdlOut);
+        for (int i = min; i <= max; ++i) {
           allocSameSize(a, (1 << (max - i)) * allocsPerSize, i);
         }
       }
@@ -97,6 +102,8 @@ public class TestBuddyAllocator {
     executor.execute(upTask);
     executor.execute(downTask);
     try {
+      cdlIn.await(); // Wait for all threads to be ready.
+      cdlOut.countDown(); // Release them at the same time.
       upTask.get();
       downTask.get();
       sameTask.get();
@@ -105,6 +112,15 @@ public class TestBuddyAllocator {
     }
   }
 
+  private void syncThreadStart(final CountDownLatch cdlIn, final 
CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int 
arenaCount) {
     int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * 
arenaSizeMult;
     Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * 
arenaCount);

Modified: 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
 Sat Jan 17 01:26:50 2015
@@ -17,13 +17,300 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.RuntimeErrorException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.junit.Assume;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class TestLowLevelCacheImpl {
   private static final Log LOG = 
LogFactory.getLog(TestLowLevelCacheImpl.class);
+
+  private static class DummyAllocator implements Allocator {
+    @Override
+    public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+      for (int i = 0; i < dest.length; ++i) {
+        LlapCacheableBuffer buf = new LlapCacheableBuffer();
+        buf.initialize(0, null, -1, size);
+        dest[i] = buf;
+      }
+      return true;
+    }
+
+    @Override
+    public void deallocate(LlapMemoryBuffer buffer) {
+    }
+  }
+
+  private static class DummyCachePolicy extends LowLevelCachePolicyBase {
+    public DummyCachePolicy(long maxSize) {
+      super(maxSize);
+    }
+
+    public void cache(LlapCacheableBuffer buffer) {
+    }
+
+    public void notifyLock(LlapCacheableBuffer buffer) {
+    }
+
+    public void notifyUnlock(LlapCacheableBuffer buffer) {
+    }
+
+    protected long evictSomeBlocks(long memoryToReserve, EvictionListener 
listener) {
+      return memoryToReserve;
+    }
+  }
+
+  @Test
+  public void testGetPut() {
+    Configuration conf = createConf();
+    LowLevelCacheImpl cache = new LowLevelCacheImpl(
+        conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no 
cleanup thread
+    String fn1 = "file1".intern(), fn2 = "file2".intern();
+    LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), 
fb(), fb(), fb() };
+    verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
+    assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1)));
+    assertNull(cache.putFileData(fn2, new long[] { 1, 2 }, fbs(fakes, 2, 3)));
+    assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 
2 }));
+    assertArrayEquals(fbs(fakes, 2, 3), cache.getFileData(fn2, new long[] { 1, 
2 }));
+    assertArrayEquals(fbs(fakes, 1, -1), cache.getFileData(fn1, new long[] { 
2, 3 }));
+    verifyRefcount(fakes, 2, 3, 2, 2, 1, 1);
+    LlapMemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
+    long[] mask = cache.putFileData(fn1, new long[] { 3, 1 }, bufsDiff);
+    assertEquals(1, mask.length);
+    assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache.
+    assertSame(fakes[0], bufsDiff[1]); // Should have been replaced
+    verifyRefcount(fakes, 3, 3, 2, 2, 1, 0);
+    assertArrayEquals(fbs(fakes, 0, 1, 4), cache.getFileData(fn1, new long[] { 
1, 2, 3 }));
+    verifyRefcount(fakes, 4, 4, 2, 2, 2, 0);
+  }
+
+  @Test
+  public void testStaleValueGet() {
+    Configuration conf = createConf();
+    LowLevelCacheImpl cache = new LowLevelCacheImpl(
+        conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no 
cleanup thread
+    String fn1 = "file1".intern(), fn2 = "file2".intern();
+    LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() };
+    assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1)));
+    assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 2)));
+    assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 
2 }));
+    assertArrayEquals(fbs(fakes, 2), cache.getFileData(fn2, new long[] { 1 }));
+    verifyRefcount(fakes, 2, 2, 2);
+    evict(cache, fakes[0]);
+    evict(cache, fakes[2]);
+    assertArrayEquals(fbs(fakes, -1, 1), cache.getFileData(fn1, new long[] { 
1, 2 }));
+    assertNull(cache.getFileData(fn2, new long[] { 1 }));
+    verifyRefcount(fakes, -1, 3, -1);
+  }
+
+  @Test
+  public void testStaleValueReplace() {
+    Configuration conf = createConf();
+    LowLevelCacheImpl cache = new LowLevelCacheImpl(
+        conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no 
cleanup thread
+    String fn1 = "file1".intern(), fn2 = "file2".intern();
+    LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] {
+        fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
+    assertNull(cache.putFileData(fn1, new long[] { 1, 2, 3 }, fbs(fakes, 0, 1, 
2)));
+    assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 3)));
+    evict(cache, fakes[0]);
+    evict(cache, fakes[3]);
+    long[] mask = cache.putFileData(fn1, new long[] { 1, 2, 3, 4 }, fbs(fakes, 
4, 5, 6, 7));
+    assertEquals(1, mask.length);
+    assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and 
is stale; 4 doesn't
+    assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 8)));
+    assertArrayEquals(fbs(fakes, 4, 2, 3, 7), cache.getFileData(fn1, new 
long[] { 1, 2, 3, 4 }));
+  }
+
+  @Test
+  public void testMTTWithCleanup() {
+    Configuration conf = createConf();
+    final LowLevelCacheImpl cache = new LowLevelCacheImpl(
+        conf, new DummyCachePolicy(10), new DummyAllocator(), 1);
+    final String fn1 = "file1".intern(), fn2 = "file2".intern();
+    final int offsetsToUse = 8;
+    final CountDownLatch cdlIn = new CountDownLatch(4), cdlOut = new 
CountDownLatch(1);
+    final AtomicInteger rdmsDone = new AtomicInteger(0);
+    Callable<Long> rdmCall = new Callable<Long>() {
+      public Long call() {
+        int gets = 0, puts = 0;
+        try {
+          Random rdm = new Random(1234 + Thread.currentThread().getId());
+          syncThreadStart(cdlIn, cdlOut);
+          for (int i = 0; i < 20000; ++i) {
+            boolean isGet = rdm.nextBoolean(), isFn1 = rdm.nextBoolean();
+            String fileName = isFn1 ? fn1 : fn2;
+            int fileIndex = isFn1 ? 1 : 2;
+            int count = rdm.nextInt(offsetsToUse);
+            long[] offsets = new long[count];
+            for (int j = 0; j < offsets.length; ++j) {
+              offsets[j] = rdm.nextInt(offsetsToUse);
+            }
+            if (isGet) {
+              LlapMemoryBuffer[] results = cache.getFileData(fileName, 
offsets);
+              if (results == null) continue;
+              for (int j = 0; j < offsets.length; ++j) {
+                if (results[j] == null) continue;
+                ++gets;
+                LlapCacheableBuffer result = (LlapCacheableBuffer)(results[j]);
+                assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), 
result.arenaIndex);
+                result.decRef();
+              }
+            } else {
+              LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count];
+              for (int j = 0; j < offsets.length; ++j) {
+                LlapCacheableBuffer buf = LowLevelCacheImpl.allocateFake();
+                buf.incRef();
+                buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
+                buffers[j] = buf;
+              }
+              long[] mask = cache.putFileData(fileName, offsets, buffers);
+              puts += buffers.length;
+              long maskVal = 0;
+              if (mask != null) {
+                assertEquals(1, mask.length);
+                maskVal = mask[0];
+              }
+              for (int j = 0; j < offsets.length; ++j) {
+                LlapCacheableBuffer buf = (LlapCacheableBuffer)(buffers[j]);
+                if ((maskVal & 1) == 1) {
+                  assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), 
buf.arenaIndex);
+                }
+                maskVal >>= 1;
+                buf.decRef();
+              }
+            }
+          }
+        } finally {
+          rdmsDone.incrementAndGet();
+        }
+        return  (((long)gets) << 32) | puts;
+      }
+
+      private int makeFakeArenaIndex(int fileIndex, long offset) {
+        return (int)((fileIndex << 16) + offset);
+      }
+    };
+
+    FutureTask<Integer> evictionTask = new FutureTask<Integer>(new 
Callable<Integer>() {
+      public Integer call() {
+        boolean isFirstFile = false;
+        long[] offsets = new long[offsetsToUse];
+        Random rdm = new Random(1234 + Thread.currentThread().getId());
+        for (int i = 0; i < offsetsToUse; ++i) {
+          offsets[i] = i;
+        }
+        int evictions = 0;
+        syncThreadStart(cdlIn, cdlOut);
+        while (rdmsDone.get() < 3) {
+          isFirstFile = !isFirstFile;
+          String fileName = isFirstFile ? fn1 : fn2;
+          LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets);
+          if (results == null) continue;
+          int startIndex = rdm.nextInt(results.length), index = startIndex;
+          LlapCacheableBuffer victim = null;
+          do {
+            if (results[index] != null) {
+              LlapCacheableBuffer result = (LlapCacheableBuffer)results[index];
+              result.decRef();
+              if (victim == null && result.invalidate()) {
+                ++evictions;
+                victim = result;
+              }
+            }
+            ++index;
+            if (index == results.length) index = 0;
+          } while (index != startIndex);
+          if (victim == null) continue;
+          cache.notifyEvicted(victim);
+        }
+        return evictions;
+      }
+    });
+
+    FutureTask<Long> rdmTask1 = new FutureTask<Long>(rdmCall),
+        rdmTask2 = new FutureTask<Long>(rdmCall), rdmTask3 = new 
FutureTask<Long>(rdmCall);
+    Executor threadPool = Executors.newFixedThreadPool(4);
+    threadPool.execute(rdmTask1);
+    threadPool.execute(rdmTask2);
+    threadPool.execute(rdmTask3);
+    threadPool.execute(evictionTask);
+    try {
+      cdlIn.await();
+      cdlOut.countDown();
+      long result1 = rdmTask1.get(), result2 = rdmTask2.get(), result3 = 
rdmTask3.get();
+      int evictions = evictionTask.get();
+      LOG.info("MTT test: task 1: " + descRdmTask(result1)  + ", task 2: " + 
descRdmTask(result2)
+          + ", task 3: " + descRdmTask(result3) + "; " + evictions + " 
evictions");
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+
+  private String descRdmTask(long result) {
+    return (result >>> 32) + " successful gets, " + (result & ((1L << 32) - 
1)) + " puts";
+  }
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final 
CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void evict(LowLevelCacheImpl cache, LlapMemoryBuffer fake) {
+    LlapCacheableBuffer victimBuffer = (LlapCacheableBuffer)fake;
+    int refCount = victimBuffer.getRefCount();
+    for (int i = 0; i < refCount; ++i) {
+      victimBuffer.decRef();
+    }
+    assertTrue(victimBuffer.invalidate());
+    cache.notifyEvicted(victimBuffer);
+  }
+
+  private void verifyRefcount(LlapMemoryBuffer[] fakes, int... refCounts) {
+    for (int i = 0; i < refCounts.length; ++i) {
+      assertEquals(refCounts[i], 
((LlapCacheableBuffer)fakes[i]).getRefCount());
+    }
+  }
+
+  private LlapMemoryBuffer[] fbs(LlapMemoryBuffer[] fakes, int... indexes) {
+    LlapMemoryBuffer[] rv = new LlapMemoryBuffer[indexes.length];
+    for (int i = 0; i < indexes.length; ++i) {
+      rv[i] = (indexes[i] == -1) ? null : fakes[indexes[i]];
+    }
+    return rv;
+  }
+
+  private LlapCacheableBuffer fb() {
+    LlapCacheableBuffer fake = LowLevelCacheImpl.allocateFake();
+    fake.incRef();
+    return fake;
+  }
+
+  private Configuration createConf() {
+    Configuration conf = new Configuration();
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, 3);
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, 8);
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, 8);
+    conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, 8);
+    return conf;
+  }
 }

Modified: 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1652555&r1=1652554&r2=1652555&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
 (original)
+++ 
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
 Sat Jan 17 01:26:50 2015
@@ -24,6 +24,7 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.junit.Assume;
 import org.junit.Test;
@@ -66,11 +67,12 @@ public class TestLowLevelLrfuCachePolicy
     int heapSize = 4;
     LOG.info("Testing lambda 0 (LFU)");
     Random rdm = new Random(1234);
-    HiveConf conf = new HiveConf();
+    Configuration conf = createConf(1, heapSize);
     ArrayList<LlapCacheableBuffer> inserted = new 
ArrayList<LlapCacheableBuffer>(heapSize);
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
     EvictionTracker et = new EvictionTracker();
-    LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, 
heapSize, et);
+    LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf);
+    lfu.setEvictionListener(et);
     for (int i = 0; i < heapSize; ++i) {
       LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lfu, et, buffer));
@@ -88,16 +90,24 @@ public class TestLowLevelLrfuCachePolicy
     verifyOrder(lfu, et, inserted);
   }
 
+  private Configuration createConf(int min, int heapSize) {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
+    conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, heapSize);
+    return conf;
+  }
+
   @Test
   public void testLruExtreme() {
     int heapSize = 4;
     LOG.info("Testing lambda 1 (LRU)");
     Random rdm = new Random(1234);
-    HiveConf conf = new HiveConf();
+    Configuration conf = createConf(1, heapSize);
     ArrayList<LlapCacheableBuffer> inserted = new 
ArrayList<LlapCacheableBuffer>(heapSize);
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
     EvictionTracker et = new EvictionTracker();
-    LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, 
heapSize, et);
+    LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf);
+    lru.setEvictionListener(et);
     for (int i = 0; i < heapSize; ++i) {
       LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lru, et, buffer));
@@ -120,7 +130,8 @@ public class TestLowLevelLrfuCachePolicy
     LOG.info("Testing deadlock resolution");
     ArrayList<LlapCacheableBuffer> inserted = new 
ArrayList<LlapCacheableBuffer>(heapSize);
     EvictionTracker et = new EvictionTracker();
-    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 
1, heapSize, et);
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(createConf(1, 
heapSize));
+    lrfu.setEvictionListener(et);
     for (int i = 0; i < heapSize; ++i) {
       LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lrfu, et, buffer));
@@ -171,10 +182,11 @@ public class TestLowLevelLrfuCachePolicy
   private void testHeapSize(int heapSize) {
     LOG.info("Testing heap size " + heapSize);
     Random rdm = new Random(1234);
-    HiveConf conf = new HiveConf();
+    Configuration conf = createConf(1, heapSize);
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very 
small heap, 14 elements
     EvictionTracker et = new EvictionTracker();
-    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, 
heapSize, et);
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf);
+    lrfu.setEvictionListener(et);
     // Insert the number of elements plus 2, to trigger 2 evictions.
     int toEvict = 2;
     ArrayList<LlapCacheableBuffer> inserted = new 
ArrayList<LlapCacheableBuffer>(heapSize);


Reply via email to