Updates for memory issues (WIP)
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/70dddc54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/70dddc54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/70dddc54 Branch: refs/heads/master Commit: 70dddc54a73183e58f5493b13b1b19e51162f752 Parents: e80c32e Author: Jacques Nadeau <[email protected]> Authored: Mon Mar 3 22:22:59 2014 -0800 Committer: Steven Phillips <[email protected]> Committed: Sun May 4 11:55:52 2014 -0700 ---------------------------------------------------------------------- .../main/java/io/netty/buffer/PoolArenaL.java | 7 +- .../netty/buffer/PooledByteBufAllocatorL.java | 7 ++ .../drill/exec/memory/AccountingByteBuf.java | 4 +- .../org/apache/drill/exec/memory/Accountor.java | 11 ++- .../drill/exec/memory/AtomicRemainder.java | 93 ++++++++++++-------- .../drill/exec/memory/TopLevelAllocator.java | 40 ++++++--- .../drill/exec/rpc/ProtobufLengthDecoder.java | 2 +- .../exec/work/batch/ControlHandlerImpl.java | 2 +- 8 files changed, 104 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java ---------------------------------------------------------------------- diff --git a/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java b/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java index aba2226..479fa80 100644 --- a/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java +++ b/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java @@ -125,13 +125,10 @@ abstract class PoolArenaL<T> { /** * Allocate a buffer from the current arena. - * Unlike netty.io buffers, this buffer can grow without bounds, - * but it will throw an exception if growth involves copying a page - * or more of data. Instead of being an upper bounds sanity check, + * Instead of being an upper bounds sanity check, * the "max" capacity is used to opportunistically allocate extra memory. * Later, the capacity can be reduced very efficiently. - * To avoid excessive copying, a buffer cannot grow if it must copy - * more than a single page of data. + * * @param cache TODO: not sure * @param minRequested The smallest capacity buffer we want * @param maxRequested If convenient, allocate up to this capacity http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java ---------------------------------------------------------------------- diff --git a/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java index 85522c1..bc2b137 100644 --- a/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ b/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -228,6 +228,13 @@ public class PooledByteBufAllocatorL extends AbstractByteBufAllocator { } } + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + if (initialCapacity == 0 && maxCapacity == 0) { + return newDirectBuffer(0,0); + } + return super.directBuffer(initialCapacity, maxCapacity); + } /** * Override the abstract allocator. Normally, the abstract allocator http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java index 4df209f..f2d695e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java @@ -46,7 +46,7 @@ public class AccountingByteBuf extends ByteBuf{ super(); this.b = b; this.acct = a; - this.size = b.maxCapacity(); + this.size = b.capacity(); } @Override @@ -83,7 +83,7 @@ public class AccountingByteBuf extends ByteBuf{ return this; }else if(newCapacity < size){ b.capacity(newCapacity); - int diff = size - b.maxCapacity(); + int diff = size - b.capacity(); acct.releasePartial(this, diff); this.size = size - diff; return this; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index bd40da3..0d19340 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -62,18 +62,20 @@ public class Accountor { } public boolean reserve(long size) { - return remainder.get(size); + //TODO: for now, we won't stop reservation. + remainder.get(size); + return true; } public void forceAdditionalReservation(long size) { - remainder.forceGet(size); + if(size > 0) remainder.forceGet(size); } public void reserved(long expected, AccountingByteBuf buf){ // make sure to take away the additional memory that happened due to rounding. long additional = buf.capacity() - expected; - remainder.forceGet(additional); + if(additional > 0) remainder.forceGet(additional); if (ENABLE_ACCOUNTING) { buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace())); @@ -103,6 +105,7 @@ public class Accountor { } public void close() { + if (ENABLE_ACCOUNTING && !buffers.isEmpty()) { StringBuffer sb = new StringBuffer(); sb.append("Attempted to close accountor with "); @@ -144,7 +147,7 @@ public class Accountor { } remainder.close(); - + } private class DebugStackTrace { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java index 95e57d2..8476b53 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java @@ -27,27 +27,30 @@ import java.util.concurrent.atomic.AtomicLong; public class AtomicRemainder { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class); + private static final boolean DEBUG = true; + private final AtomicRemainder parent; - private final AtomicLong total; - private final AtomicLong unaccountable; - private final long max; - private final long pre; - private boolean closed = false; + private final AtomicLong availableShared; + private final AtomicLong availablePrivate; + private final long initTotal; + private final long initShared; + private final long initPrivate; public AtomicRemainder(AtomicRemainder parent, long max, long pre) { this.parent = parent; - this.total = new AtomicLong(max - pre); - this.unaccountable = new AtomicLong(pre); - this.max = max; - this.pre = pre; + this.availableShared = new AtomicLong(max - pre); + this.availablePrivate = new AtomicLong(pre); + this.initTotal = max; + this.initShared = max - pre; + this.initPrivate = pre; } public long getRemainder() { - return total.get() + unaccountable.get(); + return availableShared.get() + availablePrivate.get(); } public long getUsed() { - return max - getRemainder(); + return initTotal - getRemainder(); } /** @@ -57,41 +60,54 @@ public class AtomicRemainder { * @param size */ public void forceGet(long size) { - total.addAndGet(size); + if (DEBUG) + logger.info("Force get {}", size); + availableShared.addAndGet(size); if (parent != null) parent.forceGet(size); } public boolean get(long size) { - if (unaccountable.get() < 1) { + if (DEBUG) + logger.info("Get {}", size); + if (availablePrivate.get() < 1) { // if there is no preallocated memory, we can operate normally. - long outcome = total.addAndGet(-size); + + // attempt to get shared memory, if fails, return false. + long outcome = availableShared.addAndGet(-size); if (outcome < 0) { - total.addAndGet(size); + availableShared.addAndGet(size); return false; } else { return true; } + } else { // if there is preallocated memory, use that first. - long unaccount = unaccountable.getAndAdd(-size); - if (unaccount > -1) { + long unaccount = availablePrivate.addAndGet(-size); + if (unaccount >= 0) { return true; } else { + long additionalSpaceNeeded = -unaccount; // if there is a parent allocator, check it before allocating. - if (parent != null && !parent.get(-unaccount)) { - unaccountable.getAndAdd(size); + if (parent != null && !parent.get(additionalSpaceNeeded)) { + // parent allocation failed, return space to private pool. + availablePrivate.getAndAdd(size); return false; } - long account = total.addAndGet(unaccount); + // we got space from parent pool. lets make sure we have space locally available. + long account = availableShared.addAndGet(-additionalSpaceNeeded); if (account >= 0) { - unaccountable.getAndAdd(unaccount); + // we were succesful, move private back to zero (since we allocated using shared). + availablePrivate.addAndGet(additionalSpaceNeeded); return true; } else { - unaccountable.getAndAdd(size); - total.addAndGet(-unaccount); + // we failed to get space from available shared. Return allocations to initial state. + availablePrivate.addAndGet(size); + availableShared.addAndGet(additionalSpaceNeeded); + parent.returnAllocation(additionalSpaceNeeded); return false; } } @@ -106,20 +122,27 @@ public class AtomicRemainder { * @param size */ public void returnAllocation(long size) { - long preSize = unaccountable.get(); - long preChange = Math.min(size, pre - preSize); - long totalChange = size - preChange; - unaccountable.addAndGet(preChange); - total.addAndGet(totalChange); - if (parent != null){ - parent.returnAllocation(totalChange); + if (DEBUG) + logger.info("Return allocation {}", size); + long privateSize = availablePrivate.get(); + long privateChange = Math.min(size, initPrivate - privateSize); + long sharedChange = size - privateChange; + availablePrivate.addAndGet(privateChange); + availableShared.addAndGet(sharedChange); + if (parent != null) { + parent.returnAllocation(sharedChange); } } - public void close(){ - if(!closed){ - closed = true; -// if(parent != null) parent.returnAllocation(pre); - } + public void close() { + + if (availablePrivate.get() != initPrivate || availableShared.get() != initShared) + throw new IllegalStateException( + String + .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get())); + + if(parent != null) parent.returnAllocation(initPrivate); } + + static final String ERROR = "Failure while closing accountor. Expected private and shared pools to be set to initial values. However, one or more were not. Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d."; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index 108eaec..e71c9c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -22,12 +22,18 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocatorL; import io.netty.buffer.PooledUnsafeDirectByteBufL; +import java.util.HashSet; +import java.util.Set; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.util.AssertionUtil; public class TopLevelAllocator implements BufferAllocator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class); + private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled(); + private final Set<ChildAllocator> children; private final PooledByteBufAllocatorL innerAllocator = new PooledByteBufAllocatorL(true); private final Accountor acct; @@ -37,14 +43,14 @@ public class TopLevelAllocator implements BufferAllocator { public TopLevelAllocator(long maximumAllocation) { this.acct = new Accountor(null, null, maximumAllocation, 0); + this.children = ENABLE_ACCOUNTING ? new HashSet<ChildAllocator>() : null; } public AccountingByteBuf buffer(int min, int max) { if(!acct.reserve(min)) return null; ByteBuf buffer = innerAllocator.directBuffer(min, max); - if(buffer.maxCapacity() > max) buffer.capacity(max); AccountingByteBuf wrapped = new AccountingByteBuf(acct, (PooledUnsafeDirectByteBufL) buffer); - acct.reserved(buffer.maxCapacity(), wrapped); + acct.reserved(buffer.capacity() - min, wrapped); return wrapped; } @@ -68,34 +74,37 @@ public class TopLevelAllocator implements BufferAllocator { if(!acct.reserve(initialReservation)){ throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); }; - return new ChildAllocator(handle, acct, initialReservation, maximumReservation); + ChildAllocator allocator = new ChildAllocator(handle, acct, initialReservation, maximumReservation); + if(ENABLE_ACCOUNTING) children.add(allocator); + return allocator; } @Override public void close() { + if(ENABLE_ACCOUNTING && !children.isEmpty()){ + throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed."); + } acct.close(); } private class ChildAllocator implements BufferAllocator{ - private Accountor innerAcct; + private Accountor childAcct; public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{ - innerAcct = new Accountor(handle, parentAccountor, max, pre); + childAcct = new Accountor(handle, parentAccountor, max, pre); } - @Override public AccountingByteBuf buffer(int size, int max) { - if(!innerAcct.reserve(size)){ + if(!childAcct.reserve(size)){ return null; }; ByteBuf buffer = innerAllocator.directBuffer(size, max); - if(buffer.maxCapacity() > max) buffer.capacity(max); - AccountingByteBuf wrapped = new AccountingByteBuf(innerAcct, (PooledUnsafeDirectByteBufL) buffer); - innerAcct.reserved(buffer.maxCapacity(), wrapped); + AccountingByteBuf wrapped = new AccountingByteBuf(childAcct, (PooledUnsafeDirectByteBufL) buffer); + childAcct.reserved(buffer.capacity(), wrapped); return wrapped; } @@ -111,21 +120,24 @@ public class TopLevelAllocator implements BufferAllocator { @Override public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) throws OutOfMemoryException { - return new ChildAllocator(handle, innerAcct, maximumReservation, initialReservation); + if(!childAcct.reserve(initialReservation)){ + throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getCapacity() - childAcct.getAllocation())); + }; + return new ChildAllocator(handle, childAcct, maximumReservation, initialReservation); } public PreAllocator getNewPreAllocator(){ - return new PreAlloc(this.innerAcct); + return new PreAlloc(this.childAcct); } @Override public void close() { - innerAcct.close(); + childAcct.close(); } @Override public long getAllocatedMemory() { - return innerAcct.getAllocation(); + return childAcct.getAllocation(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java index 6fef7e5..23fa46d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java @@ -80,7 +80,7 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { // TODO: Can we avoid this copy? ByteBuf outBuf = allocator.buffer(length); if(outBuf == null){ - logger.debug("Failure allocating buffer on incoming stream due to memory limits."); + logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory()); in.resetReaderIndex(); return; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index 92614ca..835adad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -81,7 +81,7 @@ public class ControlHandlerImpl implements ControlMessageHandler { return DataRpcConfig.OK; } catch (OutOfMemoryException e) { - logger.error("Failure while attempting to start remote fragment.", fragment); + logger.error("Failure while attempting to start remote fragment.", fragment, e); return new Response(RpcType.ACK, Acks.FAIL); }
