[ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034200#comment-15034200 ]
ASF GitHub Bot commented on DRILL-4134: --------------------------------------- Github user julienledem commented on a diff in the pull request: https://github.com/apache/drill/pull/283#discussion_r46312830 --- Diff: exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java --- @@ -23,193 +23,246 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; +import org.apache.drill.exec.exception.OutOfMemoryException; + import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; -public class PooledByteBufAllocatorL extends PooledByteBufAllocator{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class); - +public class PooledByteBufAllocatorL { private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator"); + private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; + private static final String METRIC_PREFIX = "drill.allocator."; + private final MetricRegistry registry; private final AtomicLong hugeBufferSize = new AtomicLong(0); private final AtomicLong hugeBufferCount = new AtomicLong(0); private final AtomicLong normalBufferSize = new AtomicLong(0); private final AtomicLong normalBufferCount = new AtomicLong(0); - private final PoolArena<ByteBuffer>[] directArenas; - private final MemoryStatusThread statusThread; - private final Histogram largeBuffersHist; - private final Histogram normalBuffersHist; + public final InnerAllocator allocator; + public final UnsafeDirectLittleEndian empty; public PooledByteBufAllocatorL(MetricRegistry registry) { - super(true); this.registry = registry; + allocator = new InnerAllocator(); + empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); + } + + public UnsafeDirectLittleEndian allocate(int size) { try { - Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); - f.setAccessible(true); - this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this); - } catch (Exception e) { - throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); + return allocator.directBuffer(size, size); + } catch (OutOfMemoryError e) { + throw new OutOfMemoryException("Failure allocating buffer.", e); } - if (memoryLogger.isTraceEnabled()) { - statusThread = new MemoryStatusThread(); - statusThread.start(); - } else { - statusThread = null; - } - removeOldMetrics(); + } - registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() { - @Override - public Long getValue() { - return normalBufferSize.get(); - } - }); + public int getChunkSize() { + return allocator.chunkSize; + } - registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() { - @Override - public Long getValue() { - return normalBufferCount.get(); - } - }); + private class InnerAllocator extends PooledByteBufAllocator { - registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() { - @Override - public Long getValue() { - return hugeBufferSize.get(); - } - }); - registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() { - @Override - public Long getValue() { - return hugeBufferCount.get(); - } - }); + private final PoolArena<ByteBuffer>[] directArenas; + private final MemoryStatusThread statusThread; + private final Histogram largeBuffersHist; + private final Histogram normalBuffersHist; + private final int chunkSize; - largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist"); - normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist"); + public InnerAllocator() { + super(true); - } + try { + Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); + f.setAccessible(true); + this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this); + } catch (Exception e) { + throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); + } - private synchronized void removeOldMetrics() { - registry.removeMatching(new MetricFilter() { - @Override - public boolean matches(String name, Metric metric) { - return name.startsWith("drill.allocator."); + this.chunkSize = directArenas[0].chunkSize; + + if (memoryLogger.isTraceEnabled()) { + statusThread = new MemoryStatusThread(); + statusThread.start(); + } else { + statusThread = null; } + removeOldMetrics(); - }); - } + registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() { + @Override + public Long getValue() { + return normalBufferSize.get(); + } + }); - @Override - protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { - throw new UnsupportedOperationException("Drill doesn't support using heap buffers."); - } + registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() { + @Override + public Long getValue() { + return normalBufferCount.get(); + } + }); + + registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() { + @Override + public Long getValue() { + return hugeBufferSize.get(); + } + }); - @Override - protected UnsafeDirectLittleEndian newDirectBuffer(int initialCapacity, int maxCapacity) { - PoolThreadCache cache = threadCache.get(); - PoolArena<ByteBuffer> directArena = cache.directArena; + registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() { + @Override + public Long getValue() { + return hugeBufferCount.get(); + } + }); - if (directArena != null) { + largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist"); + normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist"); - if (initialCapacity > directArena.chunkSize) { - // This is beyond chunk size so we'll allocate separately. - ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); + } - hugeBufferCount.incrementAndGet(); - hugeBufferSize.addAndGet(buf.capacity()); - largeBuffersHist.update(buf.capacity()); - // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); - return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount)); - } else { - // within chunk, use arena. - ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); - if (!(buf instanceof PooledUnsafeDirectByteBuf)) { - fail(); + private synchronized void removeOldMetrics() { + registry.removeMatching(new MetricFilter() { + @Override + public boolean matches(String name, Metric metric) { + return name.startsWith("drill.allocator."); } - normalBuffersHist.update(buf.capacity()); - if (ASSERT_ENABLED) { - normalBufferSize.addAndGet(buf.capacity()); - normalBufferCount.incrementAndGet(); + }); + } + + private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache.get(); + PoolArena<ByteBuffer> directArena = cache.directArena; + + if (directArena != null) { + + if (initialCapacity > directArena.chunkSize) { + // This is beyond chunk size so we'll allocate separately. + ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); + + hugeBufferCount.incrementAndGet(); + hugeBufferSize.addAndGet(buf.capacity()); + largeBuffersHist.update(buf.capacity()); + // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); + return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount)); + + } else { + // within chunk, use arena. + ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); + if (!(buf instanceof PooledUnsafeDirectByteBuf)) { + fail(); + } + + normalBuffersHist.update(buf.capacity()); + if (ASSERT_ENABLED) { + normalBufferSize.addAndGet(buf.capacity()); + normalBufferCount.incrementAndGet(); + } + + return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, + normalBufferSize); } - return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize); + } else { + throw fail(); } - - } else { - throw fail(); } - } - - private UnsupportedOperationException fail() { - return new UnsupportedOperationException( - "Drill requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality."); - } + private UnsupportedOperationException fail() { + return new UnsupportedOperationException( + "Drill requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality."); + } - @Override - public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { + public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { - newDirectBuffer(initialCapacity, maxCapacity); + newDirectBuffer(initialCapacity, maxCapacity); } validate(initialCapacity, maxCapacity); - return newDirectBuffer(initialCapacity, maxCapacity); - } + return newDirectBufferL(initialCapacity, maxCapacity); + } - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - throw new UnsupportedOperationException("Drill doesn't support using heap buffers."); - } + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Drill doesn't support using heap buffers."); + } - private static void validate(int initialCapacity, int maxCapacity) { - if (initialCapacity < 0) { + private void validate(int initialCapacity, int maxCapacity) { + if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)"); - } - if (initialCapacity > maxCapacity) { + } + if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( - "initialCapacity: %d (expected: not greater than maxCapacity(%d)", - initialCapacity, maxCapacity)); + "initialCapacity: %d (expected: not greater than maxCapacity(%d)", + initialCapacity, maxCapacity)); + } } - } - private class MemoryStatusThread extends Thread { + private class MemoryStatusThread extends Thread { - public MemoryStatusThread() { - super("memory-status-logger"); - this.setDaemon(true); - this.setName("allocation.logger"); - } + public MemoryStatusThread() { + super("memory-status-logger"); + this.setDaemon(true); + this.setName("allocation.logger"); + } - @Override - public void run() { - while (true) { - memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); - try { - Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); - } catch (InterruptedException e) { - return; - } + @Override + public void run() { + while (true) { + memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); + try { + Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); + } catch (InterruptedException e) { + return; + } + } } + } - } + public void checkAndReset() { + if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) { + StringBuilder buf = new StringBuilder(); + buf.append("Large buffers outstanding: "); --- End diff -- String.format ? > Incorporate remaining patches from DRILL-1942 Allocator refactor > ---------------------------------------------------------------- > > Key: DRILL-4134 > URL: https://issues.apache.org/jira/browse/DRILL-4134 > Project: Apache Drill > Issue Type: Sub-task > Components: Execution - Flow > Reporter: Jacques Nadeau > Assignee: Jacques Nadeau > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)