[ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034844#comment-15034844 ]

ASF GitHub Bot commented on DRILL-4134:
---------------------------------------

Github user adeneche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/283#discussion_r46355906
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java ---
    @@ -0,0 +1,689 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.memory;
    +
    +import io.netty.buffer.ByteBufAllocator;
    +import io.netty.buffer.DrillBuf;
    +import io.netty.buffer.UnsafeDirectLittleEndian;
    +
    +import java.util.Arrays;
    +import java.util.IdentityHashMap;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.drill.common.HistoricalLog;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
    +import org.apache.drill.exec.ops.BufferManager;
    +import org.apache.drill.exec.util.AssertionUtil;
    +
    +import com.google.common.base.Preconditions;
    +
    +public abstract class BaseAllocator extends Accountant implements 
BufferAllocator {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
    +
    +  public static final String DEBUG_ALLOCATOR = 
"drill.memory.debug.allocator";
    +
    +  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
    +  private static final int CHUNK_SIZE = 
AllocatorManager.INNER_ALLOCATOR.getChunkSize();
    +
    +  public static final int DEBUG_LOG_LENGTH = 6;
    +  public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
    +      || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, 
"false"));
    +  private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
    +
    +  private final BaseAllocator parentAllocator;
    +  private final ByteBufAllocator thisAsByteBufAllocator;
    +  private final IdentityHashMap<BaseAllocator, Object> childAllocators;
    +  private final DrillBuf empty;
    +
    +  private volatile boolean isClosed = false; // the allocator has been 
closed
    +
    +  // Package exposed for sharing between AllocatorManger and BaseAllocator 
objects
    +  final long id = ID_GENERATOR.incrementAndGet(); // unique ID assigned to 
each allocator
    +  final String name;
    +  final RootAllocator root;
    +
    +  // members used purely for debugging
    +  private final IdentityHashMap<BufferLedger, Object> childLedgers;
    +  private final IdentityHashMap<Reservation, Object> reservations;
    +  private final HistoricalLog historicalLog;
    +
    +  protected BaseAllocator(
    +      final BaseAllocator parentAllocator,
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) throws OutOfMemoryException {
    +    super(parentAllocator, initReservation, maxAllocation);
    +
    +    if (parentAllocator != null) {
    +      this.root = parentAllocator.root;
    +      empty = parentAllocator.empty;
    +    } else if (this instanceof RootAllocator) {
    +      this.root = (RootAllocator) this;
    +      empty = createEmpty();
    +    } else {
    +      throw new IllegalStateException("An parent allocator must either 
carry a root or be the root.");
    +    }
    +
    +    this.parentAllocator = parentAllocator;
    +    this.name = name;
    +
    +    // TODO: DRILL-4131
    +    // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
    +    this.thisAsByteBufAllocator = 
AllocatorManager.INNER_ALLOCATOR.allocator;
    +
    +    if (DEBUG) {
    +      childAllocators = new IdentityHashMap<>();
    +      reservations = new IdentityHashMap<>();
    +      childLedgers = new IdentityHashMap<>();
    +      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%d]", 
id);
    +      hist("created by \"%s\", owned = %d", name.toString(), 
this.getAllocatedMemory());
    +    } else {
    +      childAllocators = null;
    +      reservations = null;
    +      historicalLog = null;
    +      childLedgers = null;
    +    }
    +
    +  }
    +
    +  @Override
    +  public String getName() {
    +    return name;
    +  }
    +
    +  @Override
    +  public DrillBuf getEmpty() {
    +    return empty;
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to 
tell the allocator that we have a new ledger
    +   * associated with this allocator.
    +   */
    +  void associateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        childLedgers.put(ledger, null);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * For debug/verification purposes only. Allows an AllocatorManager to 
tell the allocator that we are removing a
    +   * ledger associated with this allocator
    +   */
    +  void dissociateLedger(BufferLedger ledger) {
    +    if (DEBUG) {
    +      synchronized (DEBUG_LOCK) {
    +        if (!childLedgers.containsKey(ledger)) {
    +          throw new IllegalStateException("Trying to remove a child ledger 
that doesn't exist.");
    +        }
    +        childLedgers.remove(ledger);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Track when a ChildAllocator of this BaseAllocator is closed. Used for 
debugging purposes.
    +   *
    +   * @param childAllocator
    +   *          The child allocator that has been closed.
    +   */
    +  private void childClosed(final BaseAllocator childAllocator) {
    +    if (DEBUG) {
    +      Preconditions.checkArgument(childAllocator != null, "child allocator 
can't be null");
    +
    +      synchronized (DEBUG_LOCK) {
    +        final Object object = childAllocators.remove(childAllocator);
    +        if (object == null) {
    +          childAllocator.historicalLog.logHistory(logger);
    +          throw new IllegalStateException("Child allocator[" + 
childAllocator.id
    +              + "] not found in parent allocator[" + id + "]'s 
childAllocators");
    +        }
    +      }
    +    }
    +  }
    +
    +  private static String createErrorMsg(final BufferAllocator allocator, 
final int rounded, final int requested) {
    +    if (rounded != requested) {
    +      return String.format(
    +          "Unable to allocate buffer of size %d (rounded from %d) due to 
memory limit. Current allocation: %d",
    +          rounded, requested, allocator.getAllocatedMemory());
    +    } else {
    +      return String.format("Unable to allocate buffer of size %d due to 
memory limit. Current allocation: %d",
    +          rounded, allocator.getAllocatedMemory());
    +    }
    +  }
    +
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize) {
    +    return buffer(initialRequestSize, null);
    +  }
    +
    +  private DrillBuf createEmpty(){
    +    return new DrillBuf(new AtomicInteger(), null, 
AllocatorManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
    +  }
    +
    +  @Override
    +  public DrillBuf buffer(final int initialRequestSize, BufferManager 
manager) {
    +
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the minimimum 
requested size must be non-negative");
    +    Preconditions.checkArgument(initialRequestSize >= 0, "the maximum 
requested size must be non-negative");
    +
    +    if (initialRequestSize == 0) {
    +      return empty;
    +    }
    +
    +    // round to next largest power of two if we're within a chunk since 
that is how our allocator operates
    +    final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
    +        nextPowerOfTwo(initialRequestSize)
    +        : initialRequestSize;
    +    AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
    +    if (!outcome.isOk()) {
    +      throw new OutOfMemoryException(createErrorMsg(this, 
actualRequestSize, initialRequestSize));
    +    }
    +
    +    boolean success = true;
    +    try {
    +      DrillBuf buffer = bufferWithoutReservation(actualRequestSize, 
manager);
    +      success = true;
    +      return buffer;
    +    } finally {
    +      if (!success) {
    +        releaseBytes(actualRequestSize);
    +      }
    +    }
    +
    +  }
    +
    +  /**
    +   * Used by usual allocation as well as for allocating a pre-reserved 
buffer. Skips the typical accounting associated
    +   * with creating a new buffer.
    +   */
    +  private DrillBuf bufferWithoutReservation(final int size, BufferManager 
bufferManager) throws OutOfMemoryException {
    +    AllocatorManager manager = new AllocatorManager(this, size);
    +    BufferLedger ledger = manager.associate(this);
    +    DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true);
    +
    +    // make sure that our allocation is equal to what we expected.
    +    Preconditions.checkArgument(buffer.capacity() == size,
    +        "Allocated capacity %d was not equal to requested capacity %d.", 
buffer.capacity(), size);
    +
    +    return buffer;
    +  }
    +
    +  @Override
    +  public ByteBufAllocator getAsByteBufAllocator() {
    +    return thisAsByteBufAllocator;
    +  }
    +
    +  /**
    +   * Return a unique Id for an allocator. Id's may be recycled after a 
long period of time.
    +   *
    +   * <p>
    +   * Primary use for this is for debugging output.
    +   * </p>
    +   *
    +   * @return the allocator's id
    +   */
    +  long getId() {
    +    return id;
    +  }
    +
    +  @Override
    +  public BufferAllocator newChildAllocator(
    +      final String name,
    +      final long initReservation,
    +      final long maxAllocation) {
    +    final ChildAllocator childAllocator = new ChildAllocator(this, name, 
initReservation, maxAllocation);
    +
    +    if (DEBUG) {
    +      childAllocators.put(childAllocator, childAllocator);
    +      historicalLog.recordEvent("allocator[%d] created new child 
allocator[%d]",
    +          id, childAllocator.id);
    +    }
    +
    +    return childAllocator;
    +  }
    +
    +  private class Reservation extends AllocationReservation {
    +    private final HistoricalLog historicalLog;
    +
    +    public Reservation() {
    +      if (DEBUG) {
    +        historicalLog = new HistoricalLog("Reservation[allocator[%d], 
%d]", id, System.identityHashCode(this));
    +        historicalLog.recordEvent("created");
    +        synchronized (DEBUG_LOCK) {
    +          reservations.put(this, this);
    +        }
    +      } else {
    +        historicalLog = null;
    +      }
    +    }
    +
    +    @Override
    +    public void close() {
    +      if (DEBUG) {
    +        if (!isClosed()) {
    +          final Object object;
    +          synchronized (DEBUG_LOCK) {
    +            object = reservations.remove(this);
    +          }
    +          if (object == null) {
    +            final StringBuilder sb = new StringBuilder();
    +            print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
    +            logger.debug(sb.toString());
    +            throw new IllegalStateException(
    +                String.format("Didn't find closing reservation[%d]", 
System.identityHashCode(this)));
    +          }
    +
    +          historicalLog.recordEvent("closed");
    +        }
    +      }
    +
    +      super.close();
    +    }
    +
    +    @Override
    +    protected boolean reserve(int nBytes) {
    +      final AllocationOutcome outcome = 
BaseAllocator.this.allocateBytes(nBytes);
    +
    +      if (DEBUG) {
    +        historicalLog.recordEvent("reserve(%d) => %s", nBytes, 
Boolean.toString(outcome.isOk()));
    +      }
    +
    +      return outcome.isOk();
    +    }
    +
    +    @Override
    +    protected DrillBuf allocate(int nBytes) {
    +      boolean success = false;
    +
    +      /*
    +       * The reservation already added the requested bytes to the 
allocators owned and allocated bytes via reserve().
    +       * This ensures that they can't go away. But when we ask for the 
buffer here, that will add to the allocated bytes
    +       * as well, so we need to return the same number back to avoid 
double-counting them.
    +       */
    +      try {
    +        final DrillBuf drillBuf = 
BaseAllocator.this.bufferWithoutReservation(nBytes, null);
    +
    +        if (DEBUG) {
    +          historicalLog.recordEvent("allocate() => %s",
    +              drillBuf == null ? "null" : String.format("DrillBuf[%d]", 
drillBuf.getId()));
    +        }
    +        success = true;
    +        return drillBuf;
    +      } finally {
    +        if (!success) {
    +          releaseBytes(nBytes);
    +        }
    +      }
    +    }
    +
    +    @Override
    +    protected void releaseReservation(int nBytes) {
    +      releaseBytes(nBytes);
    +
    +      if (DEBUG) {
    +        historicalLog.recordEvent("releaseReservation(%d)", nBytes);
    +      }
    +    }
    +
    +  }
    +
    +  @Override
    +  public AllocationReservation newReservation() {
    +    return new Reservation();
    +  }
    +
    +
    +  @Override
    +  public synchronized void close() {
    +    /*
    +     * Some owners may close more than once because of complex cleanup and 
shutdown
    +     * procedures.
    +     */
    +    if (isClosed) {
    +      return;
    +    }
    +
    +    if (DEBUG) {
    +      synchronized(DEBUG_LOCK) {
    +        verifyAllocator();
    +
    +        // are there outstanding child allocators?
    +        if (!childAllocators.isEmpty()) {
    +          for (final BaseAllocator childAllocator : 
childAllocators.keySet()) {
    +            if (childAllocator.isClosed) {
    +              logger.warn(String.format(
    +                  "Closed child allocator[%s] on parent allocator[%s]'s 
child list.\n%s",
    +                  childAllocator.name, name, toString()));
    +            }
    +          }
    +
    +          throw new IllegalStateException(
    +              String.format("Allocator[%s] closed with outstanding child 
allocators.\n%s", name, toString()));
    +        }
    +
    +        // are there outstanding buffers?
    +        final int allocatedCount = childLedgers.size();
    +        if (allocatedCount > 0) {
    +          throw new IllegalStateException(
    +              String.format("Allocator[%s] closed with outstanding buffers 
allocated (%d).\n%s",
    +                  name, allocatedCount, toString()));
    +        }
    +
    +        if (reservations.size() != 0) {
    +          throw new IllegalStateException(
    +              String.format("Allocator[%s] closed with outstanding 
reservations (%d).\n%s", name, reservations.size(),
    +                  toString()));
    +        }
    +
    +      }
    +    }
    +
    +      // Is there unaccounted-for outstanding allocation?
    +      final long allocated = getAllocatedMemory();
    +      if (allocated > 0) {
    +        throw new IllegalStateException(
    +          String.format("Unaccounted for outstanding allocation (%d)\n%s", 
allocated, toString()));
    +      }
    +
    +    // we need to release our memory to our parent before we tell it we've 
closed.
    +    super.close();
    +
    +    // Inform our parent allocator that we've closed
    +    if (parentAllocator != null) {
    +      parentAllocator.childClosed(this);
    +    }
    +
    +    if (DEBUG) {
    +      historicalLog.recordEvent("closed");
    +      logger.debug(String.format(
    +          "closed allocator[%s].",
    +          name));
    +    }
    +
    +    isClosed = true;
    +
    +
    +  }
    +
    +  public String toString() {
    +    final Verbosity verbosity = logger.isTraceEnabled() || true ? 
Verbosity.LOG_WITH_STACKTRACE : Verbosity.BASIC;
    --- End diff --
    
    this one too :P


> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
>                 Key: DRILL-4134
>                 URL: https://issues.apache.org/jira/browse/DRILL-4134
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Execution - Flow
>            Reporter: Jacques Nadeau
>            Assignee: Jacques Nadeau
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to