Github user adeneche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/283#discussion_r46453393
  
    --- Diff: 
exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java 
---
    @@ -0,0 +1,689 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.memory;
    +
    +import io.netty.buffer.ByteBufAllocator;
    +import io.netty.buffer.DrillBuf;
    +import io.netty.buffer.UnsafeDirectLittleEndian;
    +
    +import java.util.Arrays;
    +import java.util.IdentityHashMap;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.drill.common.HistoricalLog;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
    +import org.apache.drill.exec.ops.BufferManager;
    +import org.apache.drill.exec.util.AssertionUtil;
    +
    +import com.google.common.base.Preconditions;
    +
    +public abstract class BaseAllocator extends Accountant implements 
BufferAllocator {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
    +
    +  public static final String DEBUG_ALLOCATOR = 
"drill.memory.debug.allocator";
    +
    +  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
    +  private static final int CHUNK_SIZE = 
AllocatorManager.INNER_ALLOCATOR.getChunkSize();
    +
    +  public static final int DEBUG_LOG_LENGTH = 6;
    +  public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
    +      || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, 
"false"));
    +  private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
    +
    +  private final BaseAllocator parentAllocator;
    +  private final ByteBufAllocator thisAsByteBufAllocator;
    +  private final IdentityHashMap<BaseAllocator, Object> childAllocators;
    +  private final DrillBuf empty;
    +
    +  private volatile boolean isClosed = false; // the allocator has been 
closed
    +
    +  // Package exposed for sharing between AllocatorManger and BaseAllocator 
objects
    +  final long id = ID_GENERATOR.incrementAndGet(); // unique ID assigned to 
each allocator
    +  final String name;
    +  final RootAllocator root;
    +
    +  // members used purely for debugging
    +  private final IdentityHashMap<BufferLedger, Object> childLedgers;
    +  private final IdentityHashMap<Reservation, Object> reservations;
    +  private final HistoricalLog historicalLog;
    +
    +  protected BaseAllocator(
    +      final BaseAllocator parentAllocator,
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) throws OutOfMemoryException {
    +    super(parentAllocator, initReservation, maxAllocation);
    +
    +    if (parentAllocator != null) {
    +      this.root = parentAllocator.root;
    +      empty = parentAllocator.empty;
    +    } else if (this instanceof RootAllocator) {
    +      this.root = (RootAllocator) this;
    +      empty = createEmpty();
    +    } else {
    +      throw new IllegalStateException("An parent allocator must either 
carry a root or be the root.");
    +    }
    +
    +    this.parentAllocator = parentAllocator;
    +    this.name = name;
    +
    +    // TODO: DRILL-4131
    +    // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
    +    this.thisAsByteBufAllocator = 
AllocatorManager.INNER_ALLOCATOR.allocator;
    +
    +    if (DEBUG) {
    +      childAllocators = new IdentityHashMap<>();
    +      reservations = new IdentityHashMap<>();
    +      childLedgers = new IdentityHashMap<>();
    +      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%d]", 
id);
    +      hist("created by \"%s\", owned = %d", name.toString(), 
this.getAllocatedMemory());
    +    } else {
    +      childAllocators = null;
    +      reservations = null;
    +      historicalLog = null;
    +      childLedgers = null;
    +    }
    +
    +  }
    +
    +  @Override
    +  public String getName() {
    +    return name;
    +  }
    +
    +  @Override
    +  public DrillBuf getEmpty() {
    +    return empty;
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to 
tell the allocator that we have a new ledger
    +   * associated with this allocator.
    +   */
    +  void associateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        childLedgers.put(ledger, null);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to 
tell the allocator that we are removing a
    +   * ledger associated with this allocator
    +   */
    +  void dissociateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        if (!childLedgers.containsKey(ledger)) {
    +          throw new IllegalStateException("Trying to remove a child ledger 
that doesn't exist.");
    +        }
    +        childLedgers.remove(ledger);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Track when a ChildAllocator of this BaseAllocator is closed. Used for 
debugging purposes.
    +   *
    +   * @param childAllocator
    +   *          The child allocator that has been closed.
    +   */
    +  private void childClosed(final BaseAllocator childAllocator) {
    +    if (DEBUG) {
    +      Preconditions.checkArgument(childAllocator != null, "child allocator 
can't be null");
    +
    +      synchronized (DEBUG_LOCK) {
    +        final Object object = childAllocators.remove(childAllocator);
    +        if (object == null) {
    +          childAllocator.historicalLog.logHistory(logger);
    +          throw new IllegalStateException("Child allocator[" + 
childAllocator.id
    +              + "] not found in parent allocator[" + id + "]'s 
childAllocators");
    +        }
    +      }
    +    }
    +  }
    +
    +  private static String createErrorMsg(final BufferAllocator allocator, 
final int rounded, final int requested) {
    +    if (rounded != requested) {
    +      return String.format(
    +          "Unable to allocate buffer of size %d (rounded from %d) due to 
memory limit. Current allocation: %d",
    +          rounded, requested, allocator.getAllocatedMemory());
    +    } else {
    +      return String.format("Unable to allocate buffer of size %d due to 
memory limit. Current allocation: %d",
    +          rounded, allocator.getAllocatedMemory());
    +    }
    +  }
    +
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize) {
    +    return buffer(initialRequestSize, null);
    +  }
    +
    +  private DrillBuf createEmpty(){
    +    return new DrillBuf(new AtomicInteger(), null, 
AllocatorManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
    +  }
    +
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize, BufferManager 
manager) {
    +
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the minimimum 
requested size must be non-negative");
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the maximum 
requested size must be non-negative");
    --- End diff --
    
    this check is redundant, we don't have a `maximum requested size`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to