Repository: hive
Updated Branches:
  refs/heads/branch-2 da84a1d39 -> d988d4aef


HIVE-19704 : LLAP IO retries on branch-2 should be stoppable (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d988d4ae
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d988d4ae
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d988d4ae

Branch: refs/heads/branch-2
Commit: d988d4aef6405b18652cf1b7304f616894c72a8e
Parents: da84a1d
Author: sergey <ser...@apache.org>
Authored: Tue May 29 13:28:42 2018 -0700
Committer: sergey <ser...@apache.org>
Committed: Tue May 29 13:28:42 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 14 ++++++--
 .../llap/cache/LowLevelCacheMemoryManager.java  | 23 ++++++++++---
 .../hadoop/hive/llap/cache/MemoryManager.java   |  4 ++-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 17 +++++----
 .../llap/io/encoded/SerDeEncodedDataReader.java | 36 +++++++++++++++-----
 .../hive/llap/io/metadata/OrcMetadataCache.java | 14 ++++----
 .../hive/llap/cache/TestBuddyAllocator.java     |  3 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java | 12 +++----
 .../hive/llap/cache/TestOrcMetadataCache.java   | 16 +++++----
 .../hive/ql/io/orc/encoded/EncodedReader.java   |  3 ++
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 28 ++++++++++++---
 .../ql/io/orc/encoded/StoppableAllocator.java   | 29 ++++++++++++++++
 12 files changed, 150 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 302918a..af9243a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -44,9 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
 public final class BuddyAllocator
-  implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
+  implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
   private final Arena[] arenas;
   private final AtomicInteger allocatedArenas = new AtomicInteger(0);
 
@@ -183,10 +185,16 @@ public final class BuddyAllocator
     metrics.incrAllocatedArena();
   }
 
-  // TODO: would it make sense to return buffers asynchronously?
+
   @Override
   public void allocateMultiple(MemoryBuffer[] dest, int size)
       throws AllocatorOutOfMemoryException {
+    allocateMultiple(dest, size, null);
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+      throws AllocatorOutOfMemoryException {
     assert size > 0 : "size is " + size;
     if (size > maxAllocation) {
       throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -197,7 +205,7 @@ public final class BuddyAllocator
     int allocLog2 = freeListIx + minAllocLog2;
     int allocationSize = 1 << allocLog2;
     // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
-    memoryManager.reserveMemory(dest.length << allocLog2);
+    memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
     int destAllocIx = 0;
     for (int i = 0; i < dest.length; ++i) {
       if (dest[i] != null) continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index e331f1b..e30acb0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -53,18 +54,26 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
     }
   }
 
+  public static class ReserveFailedException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    public ReserveFailedException(AtomicBoolean isStopped) {
+      super("Cannot reserve memory"
+          + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")
+          + ((isStopped != null && isStopped.get()) ? "; thread stopped" : ""));
+    }
+  }
 
   @Override
-  public void reserveMemory(final long memoryToReserve) {
-    boolean result = reserveMemory(memoryToReserve, true);
+  public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) {
+    boolean result = reserveMemory(memoryToReserve, true, isStopped);
     if (result) return;
     // Can only happen if there's no evictor, or if thread is interrupted.
-    throw new RuntimeException("Cannot reserve memory"
-        + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : ""));
+    throw new ReserveFailedException(isStopped);
   }
 
   @VisibleForTesting
-  public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) {
+  public boolean reserveMemory(final long memoryToReserve,
+      boolean waitForEviction, AtomicBoolean isStopped) {
     // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
     int badCallCount = 0;
     long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve;
@@ -100,6 +109,10 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
           result = false;
           break;
         }
+        if (isStopped != null && isStopped.get()) {
+          result = false;
+          break;
+        }
         continue;
       }
 

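The key change here is the extra exit from the eviction-wait loop. In isolation the pattern is as follows; a simplified sketch with stand-in helpers (tryReserve, evict), not the real method body:

    boolean reserve(long bytes, boolean waitForEviction, AtomicBoolean isStopped) {
      while (!tryReserve(bytes)) {      // CAS against the configured max
        if (evict(bytes) > 0) continue; // freed something; retry the reservation
        if (!waitForEviction) return false;
        if (Thread.currentThread().isInterrupted()) return false; // pre-existing exit
        if (isStopped != null && isStopped.get()) return false;   // exit added by this patch
      }
      return true;
    }
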
http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index 0f4d3c0..e1133cd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public interface MemoryManager extends LlapOomDebugDump {
   void releaseMemory(long memUsage);
   void updateMaxSize(long maxSize);
   /** TODO: temporary method until we get a better allocator. */
   long forceReservedMemory(int allocationSize, int count);
-  void reserveMemory(long memoryToReserve);
+  void reserveMemory(long memoryToReserve, AtomicBoolean isStopped);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 0fd8139..655ce83 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.orc.TypeDescription;
@@ -160,7 +161,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    * Contains only stripes that are read, and only columns included. null => read all RGs.
    */
   private boolean[][] stripeRgs;
-  private volatile boolean isStopped = false;
+  private AtomicBoolean isStopped = new AtomicBoolean(false);
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
@@ -226,7 +227,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   @Override
   public void stop() {
     LOG.debug("Encoded reader is being stopped");
-    isStopped = true;
+    isStopped.set(true);
   }
 
   @Override
@@ -330,6 +331,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       DataWrapperForOrc dw = new DataWrapperForOrc();
       stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
       stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
+      stripeReader.setStopped(isStopped);
     } catch (Throwable t) {
       handleReaderError(startTime, t);
       return null;
@@ -383,7 +385,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
                 orcReader.getSchema(), orcReader.getWriterVersion());
             counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
             if (hasFileId && metadataCache != null) {
-              OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(stripeMetadata);
+              OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(
+                  stripeMetadata, isStopped);
               isFoundInCache = newMetadata != stripeMetadata; // May be cached concurrently.
               stripeMetadata = newMetadata;
               if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
@@ -510,7 +513,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean processStop() {
-    if (!isStopped) return false;
+    if (!isStopped.get()) return false;
     LOG.info("Encoded data reader is stopping");
     tracePool.offer(trace);
     cleanupReaders();
@@ -620,7 +623,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // We assume this call doesn't touch HDFS because everything is already read; don't add time.
     metadata = new OrcFileMetadata(fileKey, orcReader);
     if (fileKey == null || metadataCache == null) return metadata;
-    return metadataCache.putFileMetadata(metadata);
+    return metadataCache.putFileMetadata(metadata, isStopped);
   }
 
   /**
@@ -649,7 +652,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
               orcReader.getWriterVersion());
           counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
           if (hasFileId && metadataCache != null) {
-            value = metadataCache.putStripeMetadata(value);
+            value = metadataCache.putStripeMetadata(value, isStopped);
             if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
               LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}",
                   stripeKey.stripeIx, DebugUtils.toString(globalInc));
@@ -862,7 +865,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         return lowLevelCache.putFileData(
             fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
       } else if (metadataCache != null) {
-        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
+        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped);
       }
       return null;
     }
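
Note the switch from a volatile boolean to an AtomicBoolean: a volatile field cannot be handed to another object, whereas the AtomicBoolean instance itself can be shared with the encoded reader and the metadata cache, which then observe stop() the moment it flips the flag. A sketch of the resulting flow, using the calls visible in this diff (surrounding code elided):

    AtomicBoolean isStopped = new AtomicBoolean(false);
    stripeReader.setStopped(isStopped);                  // reader keeps the reference
    metadata = metadataCache.putFileMetadata(metadata, isStopped); // so does the cache path
    // Later, from another thread, stop() does:
    isStopped.set(true); // immediately visible to every holder of the reference
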

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index a088e27..35d6178 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -29,11 +29,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -66,6 +68,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -159,7 +162,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private final Object fileKey;
   private final FileSystem fs;
 
-  private volatile boolean isStopped = false;
+  private AtomicBoolean isStopped = new AtomicBoolean(false);
   private final Deserializer sourceSerDe;
   private final InputFormat<?, ?> sourceInputFormat;
   private final Reporter reporter;
@@ -240,7 +243,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   @Override
   public void stop() {
     LlapIoImpl.LOG.debug("Encoded reader is being stopped");
-    isStopped = true;
+    isStopped.set(true);
   }
 
   @Override
@@ -338,14 +341,16 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private final Map<StreamName, OutputReceiver> streams = new HashMap<>();
     private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
     private final boolean doesSourceHaveIncludes;
+    private final AtomicBoolean isStopped;
 
     public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
-        boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
+        boolean[] writerIncludes, boolean doesSourceHaveIncludes, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
       assert writerIncludes != null; // Taken care of on higher level.
       this.writerIncludes = writerIncludes;
       this.doesSourceHaveIncludes = doesSourceHaveIncludes;
       this.columnIds = columnIds;
+      this.isStopped = isStopped;
       startStripe();
     }
 
@@ -433,7 +438,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         if (LlapIoImpl.LOG.isTraceEnabled()) {
           LlapIoImpl.LOG.trace("Creating cache receiver for " + name);
         }
-        CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name);
+        CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, isStopped);
         or = cor;
         List<CacheOutputReceiver> list = colStreams.get(name.getColumn());
         if (list == null) {
@@ -567,10 +572,16 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private List<MemoryBuffer> buffers = null;
     private int lastBufferPos = -1;
     private boolean suppressed = false;
+    private final AtomicBoolean isStopped;
+    private final StoppableAllocator allocator;
 
-    public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) {
+    public CacheOutputReceiver(
+        BufferUsageManager bufferManager, StreamName name, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
+      Allocator alloc = bufferManager.getAllocator();
+      this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
       this.name = name;
+      this.isStopped = isStopped;
     }
 
     public void clear() {
@@ -585,6 +596,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       lastBufferPos = -1;
     }
 
+    private void allocateMultiple(MemoryBuffer[] dest, int size) {
+      if (allocator != null) {
+        allocator.allocateMultiple(dest, size, isStopped);
+      } else {
+        bufferManager.getAllocator().allocateMultiple(dest, size);
+      }
+    }
+
+
     @Override
     public void output(ByteBuffer buffer) throws IOException {
       // TODO: avoid put() by working directly in OutStream?
@@ -608,7 +628,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       boolean isNewBuffer = (lastBufferPos == -1);
       if (isNewBuffer) {
         MemoryBuffer[] dest = new MemoryBuffer[1];
-        bufferManager.getAllocator().allocateMultiple(dest, size);
+        allocateMultiple(dest, size);
         LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
         bb = newBuffer.getByteBufferRaw();
         lastBufferPos = bb.position();
@@ -1384,7 +1404,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       // TODO: move this into ctor? EW would need to create CacheWriter then
       List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
       writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
-          writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath());
+          writer.isOnlyWritingIncludedColumns(), isStopped), daemonConf, split.getPath());
       if (writer instanceof VectorDeserializeOrcWriter) {
         VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
         asyncWriter.startAsync(new AsyncCacheDataCallback());
@@ -1640,7 +1660,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean processStop() {
-    if (!isStopped) return false;
+    if (!isStopped.get()) return false;
     LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
     cleanup(true);
     return true;
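
The instanceof check in CacheOutputReceiver above is how callers discover the new interface without changing the base Allocator API. Pulled out on its own, the pattern amounts to the following; a sketch in which the wrapper class (StopAwareAllocation) is hypothetical, while the interfaces are real:

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.hive.common.io.Allocator;
    import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
    import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;

    final class StopAwareAllocation {
      private final Allocator plain;
      private final StoppableAllocator stoppable; // null if the allocator is not stoppable

      StopAwareAllocation(Allocator alloc) {
        this.plain = alloc;
        this.stoppable = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
      }

      void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped) {
        if (stoppable != null) {
          stoppable.allocateMultiple(dest, size, isStopped); // can observe a stop request
        } else {
          plain.allocateMultiple(dest, size); // old path: may wait on eviction indefinitely
        }
      }
    }
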

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 601b622..6c81e5b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -49,17 +50,17 @@ public class OrcMetadataCache implements LlapOomDebugDump {
         ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
   }
 
-  public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
+  public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData, AtomicBoolean isStopped) {
     long memUsage = metaData.getMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
+    memoryManager.reserveMemory(memUsage, isStopped);
     OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData);
     // See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use.
     return touchOnPut(metaData, val, memUsage);
   }
 
-  public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData) {
+  public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData, AtomicBoolean isStopped) {
     long memUsage = metaData.getMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
+    memoryManager.reserveMemory(memUsage, isStopped);
     OrcStripeMetadata val = stripeMetadata.putIfAbsent(metaData.getKey(), metaData);
     // See OrcStripeMetadata; it is always unlocked, so we just "touch" it here to simulate use.
     return touchOnPut(metaData, val, memUsage);
@@ -78,7 +79,8 @@ public class OrcMetadataCache implements LlapOomDebugDump {
   }
 
 
-  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+  public void putIncompleteCbs(
+      Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) {
     if (estimateErrors == null) return;
     OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
     boolean isNew = false;
@@ -90,7 +92,7 @@ public class OrcMetadataCache implements LlapOomDebugDump {
         errorData.addError(range.getOffset(), range.getLength(), baseOffset);
       }
       long memUsage = errorData.estimateMemoryUsage();
-      memoryManager.reserveMemory(memUsage);
+      memoryManager.reserveMemory(memUsage, isStopped);
       OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
       if (old != null) {
         errorData = old;
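
The flag stays nullable end to end: callers with no stop handle (including the tests below) pass null and get the old blocking behavior. For example:

    cache.putFileMetadata(metadata, null);      // never observes a stop request
    cache.putFileMetadata(metadata, isStopped); // can abort while waiting for eviction
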

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index a6080e6..390b34b 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
@@ -58,7 +59,7 @@ public class TestBuddyAllocator {
 
   private static class DummyMemoryManager implements MemoryManager {
     @Override
-    public void reserveMemory(long memoryToReserve) {
+    public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 0cce624..210cbb0 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -86,7 +86,7 @@ public class TestLowLevelLrfuCachePolicy {
       listLock.unlock();
     }
     // Now try to evict with locked buffer still in the list.
-    mm.reserveMemory(1, false);
+    mm.reserveMemory(1, false, null);
     assertSame(buffer2, et.evicted.get(0));
     unlock(lrfu, buffer1);
   }
@@ -198,7 +198,7 @@ public class TestLowLevelLrfuCachePolicy {
     // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
     LlapDataBuffer locked = inserted.get(0);
     lock(lrfu, locked);
-    mm.reserveMemory(1, false);
+    mm.reserveMemory(1, false, null);
     LlapDataBuffer evicted = et.evicted.get(0);
     assertNotNull(evicted);
     assertTrue(evicted.isInvalid());
@@ -264,7 +264,7 @@ public class TestLowLevelLrfuCachePolicy {
   // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
   public boolean cache(LowLevelCacheMemoryManager mm,
       LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapDataBuffer buffer) {
-    if (mm != null && !mm.reserveMemory(1, false)) {
+    if (mm != null && !mm.reserveMemory(1, false, null)) {
       return false;
     }
     buffer.incRef();
@@ -353,7 +353,7 @@ public class TestLowLevelLrfuCachePolicy {
       lock(lrfu, buf);
     }
     assertEquals(heapSize, m.cacheUsed.get());
-    assertFalse(mm.reserveMemory(1, false));
+    assertFalse(mm.reserveMemory(1, false, null));
     if (!et.evicted.isEmpty()) {
       assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
     }
@@ -378,13 +378,13 @@ public class TestLowLevelLrfuCachePolicy {
     // Evict all blocks.
     et.evicted.clear();
     for (int i = 0; i < inserted.size(); ++i) {
-      assertTrue(mm.reserveMemory(1, false));
+      assertTrue(mm.reserveMemory(1, false, null));
       if (cacheUsed != null) {
         assertEquals(inserted.size(), cacheUsed.get());
       }
     }
     // The map should now be empty.
-    assertFalse(mm.reserveMemory(1, false));
+    assertFalse(mm.reserveMemory(1, false, null));
     if (cacheUsed != null) {
       assertEquals(inserted.size(), cacheUsed.get());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 3059382..1d5954e 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.llap.cache;
 
 import static org.junit.Assert.*;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
@@ -76,7 +78,7 @@ public class TestOrcMetadataCache {
     int allocs = 0;
 
     @Override
-    public void reserveMemory(long memoryToReserve) {
+    public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
       ++allocs;
     }
 
@@ -110,31 +112,31 @@ public class TestOrcMetadataCache {
     DummyCachePolicy cp = new DummyCachePolicy();
     OrcMetadataCache cache = new OrcMetadataCache(mm, cp, false);
     OrcFileMetadata ofm1 = OrcFileMetadata.createDummy(1), ofm2 = OrcFileMetadata.createDummy(2);
-    assertSame(ofm1, cache.putFileMetadata(ofm1));
+    assertSame(ofm1, cache.putFileMetadata(ofm1, null));
     assertEquals(1, mm.allocs);
     cp.verifyEquals(1);
-    assertSame(ofm2, cache.putFileMetadata(ofm2));
+    assertSame(ofm2, cache.putFileMetadata(ofm2, null));
     assertEquals(2, mm.allocs);
     cp.verifyEquals(2);
     assertSame(ofm1, cache.getFileMetadata(1));
     assertSame(ofm2, cache.getFileMetadata(2));
     cp.verifyEquals(4);
     OrcFileMetadata ofm3 = OrcFileMetadata.createDummy(1);
-    assertSame(ofm1, cache.putFileMetadata(ofm3));
+    assertSame(ofm1, cache.putFileMetadata(ofm3, null));
     assertEquals(2, mm.allocs);
     cp.verifyEquals(5);
     assertSame(ofm1, cache.getFileMetadata(1));
     cp.verifyEquals(6);
 
     OrcStripeMetadata osm1 = OrcStripeMetadata.createDummy(1), osm2 = OrcStripeMetadata.createDummy(2);
-    assertSame(osm1, cache.putStripeMetadata(osm1));
+    assertSame(osm1, cache.putStripeMetadata(osm1, null));
     assertEquals(3, mm.allocs);
-    assertSame(osm2, cache.putStripeMetadata(osm2));
+    assertSame(osm2, cache.putStripeMetadata(osm2, null));
     assertEquals(4, mm.allocs);
     assertSame(osm1, cache.getStripeMetadata(osm1.getKey()));
     assertSame(osm2, cache.getStripeMetadata(osm2.getKey()));
     OrcStripeMetadata osm3 = OrcStripeMetadata.createDummy(1);
-    assertSame(osm1, cache.putStripeMetadata(osm3));
+    assertSame(osm1, cache.putStripeMetadata(osm3, null));
     assertEquals(4, mm.allocs);
     assertSame(osm1, cache.getStripeMetadata(osm3.getKey()));
     cp.verifyEquals(12);

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index 7540e72..4324c86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
@@ -54,4 +55,6 @@ public interface EncodedReader {
    * to just checking the constant in the first place.
    */
   void setTracing(boolean isEnabled);
+
+  void setStopped(AtomicBoolean isStopped);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 3ef03ea..5e718c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -24,11 +24,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -126,6 +128,8 @@ class EncodedReaderImpl implements EncodedReader {
   private final DataCache cacheWrapper;
   private boolean isTracingEnabled;
   private final IoTrace trace;
+  private AtomicBoolean isStopped;
+  private StoppableAllocator allocator;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
@@ -136,6 +140,8 @@ class EncodedReaderImpl implements EncodedReader {
     this.bufferSize = bufferSize;
     this.rowIndexStride = strideRate;
     this.cacheWrapper = cacheWrapper;
+    Allocator alloc = cacheWrapper.getAllocator();
+    this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
     this.dataReader = dataReader;
     this.trace = trace;
     if (POOLS != null) return;
@@ -805,7 +811,7 @@ class EncodedReaderImpl implements EncodedReader {
       targetBuffers[ix] = chunk.getBuffer();
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+    allocateMultiple(targetBuffers, bufferSize);
 
     // 4. Now decompress (or copy) the data into cache buffers.
     for (ProcCacheChunk chunk : toDecompress) {
@@ -1067,8 +1073,7 @@ class EncodedReaderImpl implements EncodedReader {
       cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(
-        targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
+    allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
 
     // 4. Now copy the data into cache buffers.
     ix = 0;
@@ -1120,7 +1125,7 @@ class EncodedReaderImpl implements EncodedReader {
     // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
     singleAlloc[0] = null;
     trace.logPartialUncompressedData(partOffset, candidateEnd, true);
-    cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+    allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
     MemoryBuffer buffer = singleAlloc[0];
     cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
@@ -1130,11 +1135,19 @@ class EncodedReaderImpl implements EncodedReader {
     return tcc;
   }
 
+  private void allocateMultiple(MemoryBuffer[] dest, int size) {
+    if (allocator != null) {
+      allocator.allocateMultiple(dest, size, isStopped);
+    } else {
+      cacheWrapper.getAllocator().allocateMultiple(dest, size);
+    }
+  }
+
   private CacheChunk copyAndReplaceUncompressedToNonCached(
       BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
     singleAlloc[0] = null;
     trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false);
-    cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+    allocateMultiple(singleAlloc, bc.getLength());
     MemoryBuffer buffer = singleAlloc[0];
     cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
@@ -1706,4 +1719,9 @@ class EncodedReaderImpl implements EncodedReader {
       });
     }
   }
+
+  @Override
+  public void setStopped(AtomicBoolean isStopped) {
+    this.isStopped = isStopped;
+  }
 }
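
The flag reaches this class through the new setter rather than the constructor; the wiring mirrors the call site in OrcEncodedDataReader above:

    EncodedReader stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
    stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
    stripeReader.setStopped(isStopped);

If setStopped() is never called, isStopped stays null and allocations simply fall back to the unstoppable path.
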

http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
new file mode 100644
index 0000000..2172bd2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+public interface StoppableAllocator extends Allocator {
+  /** Stoppable allocate method specific to branch-2. */
+  void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+      throws AllocatorOutOfMemoryException;
+}
\ No newline at end of file
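
The new interface extends Allocator rather than changing it, presumably because the base Allocator API cannot be modified on branch-2; implementations opt in and callers probe with instanceof, as EncodedReaderImpl and CacheOutputReceiver do above. A minimal conforming implementation would delegate the old method to the new one, as BuddyAllocator does; a hypothetical sketch:

    public class SomeAllocator implements StoppableAllocator {
      @Override
      public void allocateMultiple(MemoryBuffer[] dest, int size)
          throws AllocatorOutOfMemoryException {
        allocateMultiple(dest, size, null); // the unstoppable call is just a null flag
      }

      @Override
      public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
          throws AllocatorOutOfMemoryException {
        // reserve memory here, polling isStopped while waiting for eviction
      }

      // remaining Allocator methods elided
    }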
