Repository: hive Updated Branches: refs/heads/branch-2 da84a1d39 -> d988d4aef
HIVE-19704 : LLAP IO retries on branch-2 should be stoppable (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d988d4ae Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d988d4ae Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d988d4ae Branch: refs/heads/branch-2 Commit: d988d4aef6405b18652cf1b7304f616894c72a8e Parents: da84a1d Author: sergey <ser...@apache.org> Authored: Tue May 29 13:28:42 2018 -0700 Committer: sergey <ser...@apache.org> Committed: Tue May 29 13:28:42 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/cache/BuddyAllocator.java | 14 ++++++-- .../llap/cache/LowLevelCacheMemoryManager.java | 23 ++++++++++--- .../hadoop/hive/llap/cache/MemoryManager.java | 4 ++- .../llap/io/encoded/OrcEncodedDataReader.java | 17 +++++---- .../llap/io/encoded/SerDeEncodedDataReader.java | 36 +++++++++++++++----- .../hive/llap/io/metadata/OrcMetadataCache.java | 14 ++++---- .../hive/llap/cache/TestBuddyAllocator.java | 3 +- .../llap/cache/TestLowLevelLrfuCachePolicy.java | 12 +++---- .../hive/llap/cache/TestOrcMetadataCache.java | 16 +++++---- .../hive/ql/io/orc/encoded/EncodedReader.java | 3 ++ .../ql/io/orc/encoded/EncodedReaderImpl.java | 28 ++++++++++++--- .../ql/io/orc/encoded/StoppableAllocator.java | 29 ++++++++++++++++ 12 files changed, 150 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 302918a..af9243a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -44,9 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; public final class BuddyAllocator - implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapOomDebugDump { + implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapOomDebugDump { private final Arena[] arenas; private final AtomicInteger allocatedArenas = new AtomicInteger(0); @@ -183,10 +185,16 @@ public final class BuddyAllocator metrics.incrAllocatedArena(); } - // TODO: would it make sense to return buffers asynchronously? + @Override public void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException { + allocateMultiple(dest, size, null); + } + + @Override + public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped) + throws AllocatorOutOfMemoryException { assert size > 0 : "size is " + size; if (size > maxAllocation) { throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation); @@ -197,7 +205,7 @@ public final class BuddyAllocator int allocLog2 = freeListIx + minAllocLog2; int allocationSize = 1 << allocLog2; // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave? - memoryManager.reserveMemory(dest.length << allocLog2); + memoryManager.reserveMemory(dest.length << allocLog2, isStopped); int destAllocIx = 0; for (int i = 0; i < dest.length; ++i) { if (dest[i] != null) continue; http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java index e331f1b..e30acb0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; @@ -53,18 +54,26 @@ public class LowLevelCacheMemoryManager implements MemoryManager { } } + public static class ReserveFailedException extends RuntimeException { + private static final long serialVersionUID = 1L; + public ReserveFailedException(AtomicBoolean isStopped) { + super("Cannot reserve memory" + + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "") + + ((isStopped != null && isStopped.get()) ? "; thread stopped" : "")); + } + } @Override - public void reserveMemory(final long memoryToReserve) { - boolean result = reserveMemory(memoryToReserve, true); + public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) { + boolean result = reserveMemory(memoryToReserve, true, isStopped); if (result) return; // Can only happen if there's no evictor, or if thread is interrupted. - throw new RuntimeException("Cannot reserve memory" - + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")); + throw new ReserveFailedException(isStopped); } @VisibleForTesting - public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) { + public boolean reserveMemory(final long memoryToReserve, + boolean waitForEviction, AtomicBoolean isStopped) { // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point? int badCallCount = 0; long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve; @@ -100,6 +109,10 @@ public class LowLevelCacheMemoryManager implements MemoryManager { result = false; break; } + if (isStopped != null && isStopped.get()) { + result = false; + break; + } continue; } http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java index 0f4d3c0..e1133cd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.llap.cache; +import java.util.concurrent.atomic.AtomicBoolean; + public interface MemoryManager extends LlapOomDebugDump { void releaseMemory(long memUsage); void updateMaxSize(long maxSize); /** TODO: temporary method until we get a better allocator. */ long forceReservedMemory(int allocationSize, int count); - void reserveMemory(long memoryToReserve); + void reserveMemory(long memoryToReserve, AtomicBoolean isStopped); } http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 0fd8139..655ce83 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.orc.TypeDescription; @@ -160,7 +161,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> * Contains only stripes that are read, and only columns included. null => read all RGs. */ private boolean[][] stripeRgs; - private volatile boolean isStopped = false; + private AtomicBoolean isStopped = new AtomicBoolean(false); @SuppressWarnings("unused") private volatile boolean isPaused = false; @@ -226,7 +227,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override public void stop() { LOG.debug("Encoded reader is being stopped"); - isStopped = true; + isStopped.set(true); } @Override @@ -330,6 +331,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> DataWrapperForOrc dw = new DataWrapperForOrc(); stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace); stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); + stripeReader.setStopped(isStopped); } catch (Throwable t) { handleReaderError(startTime, t); return null; @@ -383,7 +385,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> orcReader.getSchema(), orcReader.getWriterVersion()); counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs); if (hasFileId && metadataCache != null) { - OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(stripeMetadata); + OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata( + stripeMetadata, isStopped); isFoundInCache = newMetadata != stripeMetadata; // May be cached concurrently. stripeMetadata = newMetadata; if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { @@ -510,7 +513,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } private boolean processStop() { - if (!isStopped) return false; + if (!isStopped.get()) return false; LOG.info("Encoded data reader is stopping"); tracePool.offer(trace); cleanupReaders(); @@ -620,7 +623,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // We assume this call doesn't touch HDFS because everything is already read; don't add time. metadata = new OrcFileMetadata(fileKey, orcReader); if (fileKey == null || metadataCache == null) return metadata; - return metadataCache.putFileMetadata(metadata); + return metadataCache.putFileMetadata(metadata, isStopped); } /** @@ -649,7 +652,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> orcReader.getWriterVersion()); counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); if (hasFileId && metadataCache != null) { - value = metadataCache.putStripeMetadata(value); + value = metadataCache.putStripeMetadata(value, isStopped); if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}", stripeKey.stripeIx, DebugUtils.toString(globalInc)); @@ -862,7 +865,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return lowLevelCache.putFileData( fileKey, ranges, data, baseOffset, Priority.NORMAL, counters); } else if (metadataCache != null) { - metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); + metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped); } return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index a088e27..35d6178 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -29,11 +29,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; +import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; @@ -66,6 +68,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; +import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; @@ -159,7 +162,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private final Object fileKey; private final FileSystem fs; - private volatile boolean isStopped = false; + private AtomicBoolean isStopped = new AtomicBoolean(false); private final Deserializer sourceSerDe; private final InputFormat<?, ?> sourceInputFormat; private final Reporter reporter; @@ -240,7 +243,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> @Override public void stop() { LlapIoImpl.LOG.debug("Encoded reader is being stopped"); - isStopped = true; + isStopped.set(true); } @Override @@ -338,14 +341,16 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private final Map<StreamName, OutputReceiver> streams = new HashMap<>(); private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>(); private final boolean doesSourceHaveIncludes; + private final AtomicBoolean isStopped; public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds, - boolean[] writerIncludes, boolean doesSourceHaveIncludes) { + boolean[] writerIncludes, boolean doesSourceHaveIncludes, AtomicBoolean isStopped) { this.bufferManager = bufferManager; assert writerIncludes != null; // Taken care of on higher level. this.writerIncludes = writerIncludes; this.doesSourceHaveIncludes = doesSourceHaveIncludes; this.columnIds = columnIds; + this.isStopped = isStopped; startStripe(); } @@ -433,7 +438,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> if (LlapIoImpl.LOG.isTraceEnabled()) { LlapIoImpl.LOG.trace("Creating cache receiver for " + name); } - CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name); + CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, isStopped); or = cor; List<CacheOutputReceiver> list = colStreams.get(name.getColumn()); if (list == null) { @@ -567,10 +572,16 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private List<MemoryBuffer> buffers = null; private int lastBufferPos = -1; private boolean suppressed = false; + private final AtomicBoolean isStopped; + private final StoppableAllocator allocator; - public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) { + public CacheOutputReceiver( + BufferUsageManager bufferManager, StreamName name, AtomicBoolean isStopped) { this.bufferManager = bufferManager; + Allocator alloc = bufferManager.getAllocator(); + this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null; this.name = name; + this.isStopped = isStopped; } public void clear() { @@ -585,6 +596,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> lastBufferPos = -1; } + private void allocateMultiple(MemoryBuffer[] dest, int size) { + if (allocator != null) { + allocator.allocateMultiple(dest, size, isStopped); + } else { + bufferManager.getAllocator().allocateMultiple(dest, size); + } + } + + @Override public void output(ByteBuffer buffer) throws IOException { // TODO: avoid put() by working directly in OutStream? @@ -608,7 +628,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> boolean isNewBuffer = (lastBufferPos == -1); if (isNewBuffer) { MemoryBuffer[] dest = new MemoryBuffer[1]; - bufferManager.getAllocator().allocateMultiple(dest, size); + allocateMultiple(dest, size); LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0]; bb = newBuffer.getByteBufferRaw(); lastBufferPos = bb.position(); @@ -1384,7 +1404,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> // TODO: move this into ctor? EW would need to create CacheWriter then List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds; writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes, - writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath()); + writer.isOnlyWritingIncludedColumns(), isStopped), daemonConf, split.getPath()); if (writer instanceof VectorDeserializeOrcWriter) { VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer; asyncWriter.startAsync(new AsyncCacheDataCallback()); @@ -1640,7 +1660,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } private boolean processStop() { - if (!isStopped) return false; + if (!isStopped.get()) return false; LlapIoImpl.LOG.info("SerDe-based data reader is stopping"); cleanup(true); return true; http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java index 601b622..6c81e5b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -49,17 +50,17 @@ public class OrcMetadataCache implements LlapOomDebugDump { ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null; } - public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) { + public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData, AtomicBoolean isStopped) { long memUsage = metaData.getMemoryUsage(); - memoryManager.reserveMemory(memUsage); + memoryManager.reserveMemory(memUsage, isStopped); OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData); // See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use. return touchOnPut(metaData, val, memUsage); } - public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData) { + public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData, AtomicBoolean isStopped) { long memUsage = metaData.getMemoryUsage(); - memoryManager.reserveMemory(memUsage); + memoryManager.reserveMemory(memUsage, isStopped); OrcStripeMetadata val = stripeMetadata.putIfAbsent(metaData.getKey(), metaData); // See OrcStripeMetadata; it is always unlocked, so we just "touch" it here to simulate use. return touchOnPut(metaData, val, memUsage); @@ -78,7 +79,8 @@ public class OrcMetadataCache implements LlapOomDebugDump { } - public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) { + public void putIncompleteCbs( + Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) { if (estimateErrors == null) return; OrcFileEstimateErrors errorData = estimateErrors.get(fileKey); boolean isNew = false; @@ -90,7 +92,7 @@ public class OrcMetadataCache implements LlapOomDebugDump { errorData.addError(range.getOffset(), range.getLength(), baseOffset); } long memUsage = errorData.estimateMemoryUsage(); - memoryManager.reserveMemory(memUsage); + memoryManager.reserveMemory(memUsage, isStopped); OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData); if (old != null) { errorData = old; http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java index a6080e6..390b34b 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; @@ -58,7 +59,7 @@ public class TestBuddyAllocator { private static class DummyMemoryManager implements MemoryManager { @Override - public void reserveMemory(long memoryToReserve) { + public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java index 0cce624..210cbb0 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java @@ -86,7 +86,7 @@ public class TestLowLevelLrfuCachePolicy { listLock.unlock(); } // Now try to evict with locked buffer still in the list. - mm.reserveMemory(1, false); + mm.reserveMemory(1, false, null); assertSame(buffer2, et.evicted.get(0)); unlock(lrfu, buffer1); } @@ -198,7 +198,7 @@ public class TestLowLevelLrfuCachePolicy { // Lock the lowest priority buffer; try to evict - we'll evict some other buffer. LlapDataBuffer locked = inserted.get(0); lock(lrfu, locked); - mm.reserveMemory(1, false); + mm.reserveMemory(1, false, null); LlapDataBuffer evicted = et.evicted.get(0); assertNotNull(evicted); assertTrue(evicted.isInvalid()); @@ -264,7 +264,7 @@ public class TestLowLevelLrfuCachePolicy { // Buffers in test are fakes not linked to cache; notify cache policy explicitly. public boolean cache(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapDataBuffer buffer) { - if (mm != null && !mm.reserveMemory(1, false)) { + if (mm != null && !mm.reserveMemory(1, false, null)) { return false; } buffer.incRef(); @@ -353,7 +353,7 @@ public class TestLowLevelLrfuCachePolicy { lock(lrfu, buf); } assertEquals(heapSize, m.cacheUsed.get()); - assertFalse(mm.reserveMemory(1, false)); + assertFalse(mm.reserveMemory(1, false, null)); if (!et.evicted.isEmpty()) { assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty()); } @@ -378,13 +378,13 @@ public class TestLowLevelLrfuCachePolicy { // Evict all blocks. et.evicted.clear(); for (int i = 0; i < inserted.size(); ++i) { - assertTrue(mm.reserveMemory(1, false)); + assertTrue(mm.reserveMemory(1, false, null)); if (cacheUsed != null) { assertEquals(inserted.size(), cacheUsed.get()); } } // The map should now be empty. - assertFalse(mm.reserveMemory(1, false)); + assertFalse(mm.reserveMemory(1, false, null)); if (cacheUsed != null) { assertEquals(inserted.size(), cacheUsed.get()); } http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index 3059382..1d5954e 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.llap.cache; import static org.junit.Assert.*; +import java.util.concurrent.atomic.AtomicBoolean; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; @@ -76,7 +78,7 @@ public class TestOrcMetadataCache { int allocs = 0; @Override - public void reserveMemory(long memoryToReserve) { + public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) { ++allocs; } @@ -110,31 +112,31 @@ public class TestOrcMetadataCache { DummyCachePolicy cp = new DummyCachePolicy(); OrcMetadataCache cache = new OrcMetadataCache(mm, cp, false); OrcFileMetadata ofm1 = OrcFileMetadata.createDummy(1), ofm2 = OrcFileMetadata.createDummy(2); - assertSame(ofm1, cache.putFileMetadata(ofm1)); + assertSame(ofm1, cache.putFileMetadata(ofm1, null)); assertEquals(1, mm.allocs); cp.verifyEquals(1); - assertSame(ofm2, cache.putFileMetadata(ofm2)); + assertSame(ofm2, cache.putFileMetadata(ofm2, null)); assertEquals(2, mm.allocs); cp.verifyEquals(2); assertSame(ofm1, cache.getFileMetadata(1)); assertSame(ofm2, cache.getFileMetadata(2)); cp.verifyEquals(4); OrcFileMetadata ofm3 = OrcFileMetadata.createDummy(1); - assertSame(ofm1, cache.putFileMetadata(ofm3)); + assertSame(ofm1, cache.putFileMetadata(ofm3, null)); assertEquals(2, mm.allocs); cp.verifyEquals(5); assertSame(ofm1, cache.getFileMetadata(1)); cp.verifyEquals(6); OrcStripeMetadata osm1 = OrcStripeMetadata.createDummy(1), osm2 = OrcStripeMetadata.createDummy(2); - assertSame(osm1, cache.putStripeMetadata(osm1)); + assertSame(osm1, cache.putStripeMetadata(osm1, null)); assertEquals(3, mm.allocs); - assertSame(osm2, cache.putStripeMetadata(osm2)); + assertSame(osm2, cache.putStripeMetadata(osm2, null)); assertEquals(4, mm.allocs); assertSame(osm1, cache.getStripeMetadata(osm1.getKey())); assertSame(osm2, cache.getStripeMetadata(osm2.getKey())); OrcStripeMetadata osm3 = OrcStripeMetadata.createDummy(1); - assertSame(osm1, cache.putStripeMetadata(osm3)); + assertSame(osm1, cache.putStripeMetadata(osm3, null)); assertEquals(4, mm.allocs); assertSame(osm1, cache.getStripeMetadata(osm3.getKey())); cp.verifyEquals(12); http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index 7540e72..4324c86 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.orc.StripeInformation; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; @@ -54,4 +55,6 @@ public interface EncodedReader { * to just checking the constant in the first place. */ void setTracing(boolean isEnabled); + + void setStopped(AtomicBoolean isStopped); } http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 3ef03ea..5e718c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -24,11 +24,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.IdentityHashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; +import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; @@ -126,6 +128,8 @@ class EncodedReaderImpl implements EncodedReader { private final DataCache cacheWrapper; private boolean isTracingEnabled; private final IoTrace trace; + private AtomicBoolean isStopped; + private StoppableAllocator allocator; public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader, @@ -136,6 +140,8 @@ class EncodedReaderImpl implements EncodedReader { this.bufferSize = bufferSize; this.rowIndexStride = strideRate; this.cacheWrapper = cacheWrapper; + Allocator alloc = cacheWrapper.getAllocator(); + this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null; this.dataReader = dataReader; this.trace = trace; if (POOLS != null) return; @@ -805,7 +811,7 @@ class EncodedReaderImpl implements EncodedReader { targetBuffers[ix] = chunk.getBuffer(); ++ix; } - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize); + allocateMultiple(targetBuffers, bufferSize); // 4. Now decompress (or copy) the data into cache buffers. for (ProcCacheChunk chunk : toDecompress) { @@ -1067,8 +1073,7 @@ class EncodedReaderImpl implements EncodedReader { cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these. ++ix; } - cacheWrapper.getAllocator().allocateMultiple( - targetBuffers, (int)(partCount == 1 ? streamLen : partSize)); + allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize)); // 4. Now copy the data into cache buffers. ix = 0; @@ -1120,7 +1125,7 @@ class EncodedReaderImpl implements EncodedReader { // non-cached. Since we are at the first gap, the previous stuff must be contiguous. singleAlloc[0] = null; trace.logPartialUncompressedData(partOffset, candidateEnd, true); - cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); + allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); @@ -1130,11 +1135,19 @@ class EncodedReaderImpl implements EncodedReader { return tcc; } + private void allocateMultiple(MemoryBuffer[] dest, int size) { + if (allocator != null) { + allocator.allocateMultiple(dest, size, isStopped); + } else { + cacheWrapper.getAllocator().allocateMultiple(dest, size); + } + } + private CacheChunk copyAndReplaceUncompressedToNonCached( BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) { singleAlloc[0] = null; trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false); - cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength()); + allocateMultiple(singleAlloc, bc.getLength()); MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); @@ -1706,4 +1719,9 @@ class EncodedReaderImpl implements EncodedReader { }); } } + + @Override + public void setStopped(AtomicBoolean isStopped) { + this.isStopped = isStopped; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d988d4ae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java new file mode 100644 index 0000000..2172bd2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc.encoded; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; + +public interface StoppableAllocator extends Allocator { + /** Stoppable allocate method specific to branch-2. */ + void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped) + throws AllocatorOutOfMemoryException; +} \ No newline at end of file