[ 
https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034201#comment-15034201
 ] 

ASF GitHub Bot commented on DRILL-4134:
---------------------------------------

Github user julienledem commented on a diff in the pull request:

    https://github.com/apache/drill/pull/283#discussion_r46312998
  
    --- Diff: 
exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java ---
    @@ -23,193 +23,246 @@
     import java.nio.ByteBuffer;
     import java.util.concurrent.atomic.AtomicLong;
     
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +
     import com.codahale.metrics.Gauge;
     import com.codahale.metrics.Histogram;
     import com.codahale.metrics.Metric;
     import com.codahale.metrics.MetricFilter;
     import com.codahale.metrics.MetricRegistry;
     
    -public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
    -  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
    -
    +public class PooledByteBufAllocatorL {
       private static final org.slf4j.Logger memoryLogger = 
org.slf4j.LoggerFactory.getLogger("drill.allocator");
    +
       private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
     
    +
       private static final String METRIC_PREFIX = "drill.allocator.";
    +
       private final MetricRegistry registry;
       private final AtomicLong hugeBufferSize = new AtomicLong(0);
       private final AtomicLong hugeBufferCount = new AtomicLong(0);
       private final AtomicLong normalBufferSize = new AtomicLong(0);
       private final AtomicLong normalBufferCount = new AtomicLong(0);
     
    -  private final PoolArena<ByteBuffer>[] directArenas;
    -  private final MemoryStatusThread statusThread;
    -  private final Histogram largeBuffersHist;
    -  private final Histogram normalBuffersHist;
    +  public final InnerAllocator allocator;
    +  public final UnsafeDirectLittleEndian empty;
     
       public PooledByteBufAllocatorL(MetricRegistry registry) {
    -    super(true);
         this.registry = registry;
    +    allocator = new InnerAllocator();
    +    empty = new UnsafeDirectLittleEndian(new 
DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
    +  }
    +
    +  public UnsafeDirectLittleEndian allocate(int size) {
         try {
    -      Field f = 
PooledByteBufAllocator.class.getDeclaredField("directArenas");
    -      f.setAccessible(true);
    -      this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
    -    } catch (Exception e) {
    -      throw new RuntimeException("Failure while initializing allocator.  
Unable to retrieve direct arenas field.", e);
    +      return allocator.directBuffer(size, size);
    +    } catch (OutOfMemoryError e) {
    +      throw new OutOfMemoryException("Failure allocating buffer.", e);
         }
     
    -    if (memoryLogger.isTraceEnabled()) {
    -      statusThread = new MemoryStatusThread();
    -      statusThread.start();
    -    } else {
    -      statusThread = null;
    -    }
    -    removeOldMetrics();
    +  }
     
    -    registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return normalBufferSize.get();
    -      }
    -    });
    +  public int getChunkSize() {
    +    return allocator.chunkSize;
    +  }
     
    -    registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return normalBufferCount.get();
    -      }
    -    });
    +  private class InnerAllocator extends PooledByteBufAllocator {
     
    -    registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return hugeBufferSize.get();
    -      }
    -    });
     
    -    registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
    -      @Override
    -      public Long getValue() {
    -        return hugeBufferCount.get();
    -      }
    -    });
    +    private final PoolArena<ByteBuffer>[] directArenas;
    +    private final MemoryStatusThread statusThread;
    +    private final Histogram largeBuffersHist;
    +    private final Histogram normalBuffersHist;
    +    private final int chunkSize;
     
    -    largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
    -    normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
    +    public InnerAllocator() {
    +      super(true);
     
    -  }
    +      try {
    +        Field f = 
PooledByteBufAllocator.class.getDeclaredField("directArenas");
    +        f.setAccessible(true);
    +        this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
    +      } catch (Exception e) {
    +        throw new RuntimeException("Failure while initializing allocator.  
Unable to retrieve direct arenas field.", e);
    +      }
     
    -  private synchronized void removeOldMetrics() {
    -    registry.removeMatching(new MetricFilter() {
    -      @Override
    -      public boolean matches(String name, Metric metric) {
    -        return name.startsWith("drill.allocator.");
    +      this.chunkSize = directArenas[0].chunkSize;
    +
    +      if (memoryLogger.isTraceEnabled()) {
    +        statusThread = new MemoryStatusThread();
    +        statusThread.start();
    +      } else {
    +        statusThread = null;
           }
    +      removeOldMetrics();
     
    -    });
    -  }
    +      registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return normalBufferSize.get();
    +        }
    +      });
     
    -  @Override
    -  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    -    throw new UnsupportedOperationException("Drill doesn't support using 
heap buffers.");
    -  }
    +      registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return normalBufferCount.get();
    +        }
    +      });
    +
    +      registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return hugeBufferSize.get();
    +        }
    +      });
     
    -  @Override
    -  protected UnsafeDirectLittleEndian newDirectBuffer(int initialCapacity, 
int maxCapacity) {
    -    PoolThreadCache cache = threadCache.get();
    -    PoolArena<ByteBuffer> directArena = cache.directArena;
    +      registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
    +        @Override
    +        public Long getValue() {
    +          return hugeBufferCount.get();
    +        }
    +      });
     
    -    if (directArena != null) {
    +      largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
    +      normalBuffersHist = registry.histogram(METRIC_PREFIX + 
"normal.hist");
     
    -      if (initialCapacity > directArena.chunkSize) {
    -        // This is beyond chunk size so we'll allocate separately.
    -        ByteBuf buf = 
UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
    +    }
     
    -        hugeBufferCount.incrementAndGet();
    -        hugeBufferSize.addAndGet(buf.capacity());
    -        largeBuffersHist.update(buf.capacity());
    -        // logger.debug("Allocating huge buffer of size {}", 
initialCapacity, new Exception());
    -        return new UnsafeDirectLittleEndian(new LargeBuffer(buf, 
hugeBufferSize, hugeBufferCount));
     
    -      } else {
    -        // within chunk, use arena.
    -        ByteBuf buf = directArena.allocate(cache, initialCapacity, 
maxCapacity);
    -        if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
    -          fail();
    +    private synchronized void removeOldMetrics() {
    +      registry.removeMatching(new MetricFilter() {
    +        @Override
    +        public boolean matches(String name, Metric metric) {
    +          return name.startsWith("drill.allocator.");
             }
     
    -        normalBuffersHist.update(buf.capacity());
    -        if (ASSERT_ENABLED) {
    -          normalBufferSize.addAndGet(buf.capacity());
    -          normalBufferCount.incrementAndGet();
    +      });
    +    }
    +
    +    private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, 
int maxCapacity) {
    +      PoolThreadCache cache = threadCache.get();
    +      PoolArena<ByteBuffer> directArena = cache.directArena;
    +
    +      if (directArena != null) {
    +
    +        if (initialCapacity > directArena.chunkSize) {
    +          // This is beyond chunk size so we'll allocate separately.
    +          ByteBuf buf = 
UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
    +
    +          hugeBufferCount.incrementAndGet();
    +          hugeBufferSize.addAndGet(buf.capacity());
    +          largeBuffersHist.update(buf.capacity());
    +          // logger.debug("Allocating huge buffer of size {}", 
initialCapacity, new Exception());
    +          return new UnsafeDirectLittleEndian(new LargeBuffer(buf, 
hugeBufferSize, hugeBufferCount));
    +
    +        } else {
    +          // within chunk, use arena.
    +          ByteBuf buf = directArena.allocate(cache, initialCapacity, 
maxCapacity);
    +          if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
    +            fail();
    +          }
    +
    +          normalBuffersHist.update(buf.capacity());
    +          if (ASSERT_ENABLED) {
    +            normalBufferSize.addAndGet(buf.capacity());
    +            normalBufferCount.incrementAndGet();
    +          }
    +
    +          return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) 
buf, normalBufferCount,
    +              normalBufferSize);
             }
     
    -        return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) 
buf, normalBufferCount, normalBufferSize);
    +      } else {
    +        throw fail();
           }
    -
    -    } else {
    -      throw fail();
         }
    -  }
    -
    -  private UnsupportedOperationException fail() {
    -    return new UnsupportedOperationException(
    -        "Drill requries that the JVM used supports access sun.misc.Unsafe. 
 This platform didn't provide that functionality.");
    -  }
     
    +    private UnsupportedOperationException fail() {
    +      return new UnsupportedOperationException(
    +          "Drill requries that the JVM used supports access 
sun.misc.Unsafe.  This platform didn't provide that functionality.");
    +    }
     
    -  @Override
    -  public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int 
maxCapacity) {
    +    public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int 
maxCapacity) {
           if (initialCapacity == 0 && maxCapacity == 0) {
    -          newDirectBuffer(initialCapacity, maxCapacity);
    +        newDirectBuffer(initialCapacity, maxCapacity);
           }
           validate(initialCapacity, maxCapacity);
    -      return newDirectBuffer(initialCapacity, maxCapacity);
    -  }
    +      return newDirectBufferL(initialCapacity, maxCapacity);
    +    }
     
    -  @Override
    -  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    -    throw new UnsupportedOperationException("Drill doesn't support using 
heap buffers.");
    -  }
    +    @Override
    +    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    +      throw new UnsupportedOperationException("Drill doesn't support using 
heap buffers.");
    +    }
     
     
    -  private static void validate(int initialCapacity, int maxCapacity) {
    -    if (initialCapacity < 0) {
    +    private void validate(int initialCapacity, int maxCapacity) {
    +      if (initialCapacity < 0) {
             throw new IllegalArgumentException("initialCapacity: " + 
initialCapacity + " (expectd: 0+)");
    -    }
    -    if (initialCapacity > maxCapacity) {
    +      }
    +      if (initialCapacity > maxCapacity) {
             throw new IllegalArgumentException(String.format(
    -                "initialCapacity: %d (expected: not greater than 
maxCapacity(%d)",
    -                initialCapacity, maxCapacity));
    +            "initialCapacity: %d (expected: not greater than 
maxCapacity(%d)",
    +            initialCapacity, maxCapacity));
    +      }
         }
    -  }
     
    -  private class MemoryStatusThread extends Thread {
    +    private class MemoryStatusThread extends Thread {
     
    -    public MemoryStatusThread() {
    -      super("memory-status-logger");
    -      this.setDaemon(true);
    -      this.setName("allocation.logger");
    -    }
    +      public MemoryStatusThread() {
    +        super("memory-status-logger");
    +        this.setDaemon(true);
    +        this.setName("allocation.logger");
    +      }
     
    -    @Override
    -    public void run() {
    -      while (true) {
    -        memoryLogger.trace("Memory Usage: \n{}", 
PooledByteBufAllocatorL.this.toString());
    -        try {
    -          Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
    -        } catch (InterruptedException e) {
    -          return;
    -        }
    +      @Override
    +      public void run() {
    +        while (true) {
    +          memoryLogger.trace("Memory Usage: \n{}", 
PooledByteBufAllocatorL.this.toString());
    +          try {
    +            Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
    +          } catch (InterruptedException e) {
    +            return;
    +          }
     
    +        }
           }
    +
         }
     
    -  }
    +    public void checkAndReset() {
    +      if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
    +        StringBuilder buf = new StringBuilder();
    +        buf.append("Large buffers outstanding: ");
    --- End diff --
    
    also: should this reuse toString?


> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
>                 Key: DRILL-4134
>                 URL: https://issues.apache.org/jira/browse/DRILL-4134
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Execution - Flow
>            Reporter: Jacques Nadeau
>            Assignee: Jacques Nadeau
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to