[ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034844#comment-15034844 ]
ASF GitHub Bot commented on DRILL-4134: --------------------------------------- Github user adeneche commented on a diff in the pull request: https://github.com/apache/drill/pull/283#discussion_r46355906 --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java --- @@ -0,0 +1,689 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.memory; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.UnsafeDirectLittleEndian; + +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.drill.common.HistoricalLog; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.memory.AllocatorManager.BufferLedger; +import org.apache.drill.exec.ops.BufferManager; +import org.apache.drill.exec.util.AssertionUtil; + +import com.google.common.base.Preconditions; + +public abstract class BaseAllocator extends Accountant implements BufferAllocator { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class); + + public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator"; + + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + private static final int CHUNK_SIZE = AllocatorManager.INNER_ALLOCATOR.getChunkSize(); + + public static final int DEBUG_LOG_LENGTH = 6; + public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled() + || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false")); + private final Object DEBUG_LOCK = DEBUG ? new Object() : null; + + private final BaseAllocator parentAllocator; + private final ByteBufAllocator thisAsByteBufAllocator; + private final IdentityHashMap<BaseAllocator, Object> childAllocators; + private final DrillBuf empty; + + private volatile boolean isClosed = false; // the allocator has been closed + + // Package exposed for sharing between AllocatorManger and BaseAllocator objects + final long id = ID_GENERATOR.incrementAndGet(); // unique ID assigned to each allocator + final String name; + final RootAllocator root; + + // members used purely for debugging + private final IdentityHashMap<BufferLedger, Object> childLedgers; + private final IdentityHashMap<Reservation, Object> reservations; + private final HistoricalLog historicalLog; + + protected BaseAllocator( + final BaseAllocator parentAllocator, + final String name, + final long initReservation, + final long maxAllocation) throws OutOfMemoryException { + super(parentAllocator, initReservation, maxAllocation); + + if (parentAllocator != null) { + this.root = parentAllocator.root; + empty = parentAllocator.empty; + } else if (this instanceof RootAllocator) { + this.root = (RootAllocator) this; + empty = createEmpty(); + } else { + throw new IllegalStateException("An parent allocator must either carry a root or be the root."); + } + + this.parentAllocator = parentAllocator; + this.name = name; + + // TODO: DRILL-4131 + // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this); + this.thisAsByteBufAllocator = AllocatorManager.INNER_ALLOCATOR.allocator; + + if (DEBUG) { + childAllocators = new IdentityHashMap<>(); + reservations = new IdentityHashMap<>(); + childLedgers = new IdentityHashMap<>(); + historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%d]", id); + hist("created by \"%s\", owned = %d", name.toString(), this.getAllocatedMemory()); + } else { + childAllocators = null; + reservations = null; + historicalLog = null; + childLedgers = null; + } + + } + + @Override + public String getName() { + return name; + } + + @Override + public DrillBuf getEmpty() { + return empty; + } + + /** + * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator that we have a new ledger + * associated with this allocator. + */ + void associateLedger(BufferLedger ledger) { + if (DEBUG) { + synchronized (DEBUG_LOCK) { + childLedgers.put(ledger, null); + } + } + } + + /** + * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator that we are removing a + * ledger associated with this allocator + */ + void dissociateLedger(BufferLedger ledger) { + if (DEBUG) { + synchronized (DEBUG_LOCK) { + if (!childLedgers.containsKey(ledger)) { + throw new IllegalStateException("Trying to remove a child ledger that doesn't exist."); + } + childLedgers.remove(ledger); + } + } + } + + /** + * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes. + * + * @param childAllocator + * The child allocator that has been closed. + */ + private void childClosed(final BaseAllocator childAllocator) { + if (DEBUG) { + Preconditions.checkArgument(childAllocator != null, "child allocator can't be null"); + + synchronized (DEBUG_LOCK) { + final Object object = childAllocators.remove(childAllocator); + if (object == null) { + childAllocator.historicalLog.logHistory(logger); + throw new IllegalStateException("Child allocator[" + childAllocator.id + + "] not found in parent allocator[" + id + "]'s childAllocators"); + } + } + } + } + + private static String createErrorMsg(final BufferAllocator allocator, final int rounded, final int requested) { + if (rounded != requested) { + return String.format( + "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current allocation: %d", + rounded, requested, allocator.getAllocatedMemory()); + } else { + return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d", + rounded, allocator.getAllocatedMemory()); + } + } + + @Override + public DrillBuf buffer(final int initialRequestSize) { + return buffer(initialRequestSize, null); + } + + private DrillBuf createEmpty(){ + return new DrillBuf(new AtomicInteger(), null, AllocatorManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true); + } + + @Override + public DrillBuf buffer(final int initialRequestSize, BufferManager manager) { + + Preconditions.checkArgument(initialRequestSize >= 0, "the minimimum requested size must be non-negative"); + Preconditions.checkArgument(initialRequestSize >= 0, "the maximum requested size must be non-negative"); + + if (initialRequestSize == 0) { + return empty; + } + + // round to next largest power of two if we're within a chunk since that is how our allocator operates + final int actualRequestSize = initialRequestSize < CHUNK_SIZE ? + nextPowerOfTwo(initialRequestSize) + : initialRequestSize; + AllocationOutcome outcome = this.allocateBytes(actualRequestSize); + if (!outcome.isOk()) { + throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize)); + } + + boolean success = true; + try { + DrillBuf buffer = bufferWithoutReservation(actualRequestSize, manager); + success = true; + return buffer; + } finally { + if (!success) { + releaseBytes(actualRequestSize); + } + } + + } + + /** + * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the typical accounting associated + * with creating a new buffer. + */ + private DrillBuf bufferWithoutReservation(final int size, BufferManager bufferManager) throws OutOfMemoryException { + AllocatorManager manager = new AllocatorManager(this, size); + BufferLedger ledger = manager.associate(this); + DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true); + + // make sure that our allocation is equal to what we expected. + Preconditions.checkArgument(buffer.capacity() == size, + "Allocated capacity %d was not equal to requested capacity %d.", buffer.capacity(), size); + + return buffer; + } + + @Override + public ByteBufAllocator getAsByteBufAllocator() { + return thisAsByteBufAllocator; + } + + /** + * Return a unique Id for an allocator. Id's may be recycled after a long period of time. + * + * <p> + * Primary use for this is for debugging output. + * </p> + * + * @return the allocator's id + */ + long getId() { + return id; + } + + @Override + public BufferAllocator newChildAllocator( + final String name, + final long initReservation, + final long maxAllocation) { + final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, maxAllocation); + + if (DEBUG) { + childAllocators.put(childAllocator, childAllocator); + historicalLog.recordEvent("allocator[%d] created new child allocator[%d]", + id, childAllocator.id); + } + + return childAllocator; + } + + private class Reservation extends AllocationReservation { + private final HistoricalLog historicalLog; + + public Reservation() { + if (DEBUG) { + historicalLog = new HistoricalLog("Reservation[allocator[%d], %d]", id, System.identityHashCode(this)); + historicalLog.recordEvent("created"); + synchronized (DEBUG_LOCK) { + reservations.put(this, this); + } + } else { + historicalLog = null; + } + } + + @Override + public void close() { + if (DEBUG) { + if (!isClosed()) { + final Object object; + synchronized (DEBUG_LOCK) { + object = reservations.remove(this); + } + if (object == null) { + final StringBuilder sb = new StringBuilder(); + print(sb, 0, Verbosity.LOG_WITH_STACKTRACE); + logger.debug(sb.toString()); + throw new IllegalStateException( + String.format("Didn't find closing reservation[%d]", System.identityHashCode(this))); + } + + historicalLog.recordEvent("closed"); + } + } + + super.close(); + } + + @Override + protected boolean reserve(int nBytes) { + final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes); + + if (DEBUG) { + historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk())); + } + + return outcome.isOk(); + } + + @Override + protected DrillBuf allocate(int nBytes) { + boolean success = false; + + /* + * The reservation already added the requested bytes to the allocators owned and allocated bytes via reserve(). + * This ensures that they can't go away. But when we ask for the buffer here, that will add to the allocated bytes + * as well, so we need to return the same number back to avoid double-counting them. + */ + try { + final DrillBuf drillBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null); + + if (DEBUG) { + historicalLog.recordEvent("allocate() => %s", + drillBuf == null ? "null" : String.format("DrillBuf[%d]", drillBuf.getId())); + } + success = true; + return drillBuf; + } finally { + if (!success) { + releaseBytes(nBytes); + } + } + } + + @Override + protected void releaseReservation(int nBytes) { + releaseBytes(nBytes); + + if (DEBUG) { + historicalLog.recordEvent("releaseReservation(%d)", nBytes); + } + } + + } + + @Override + public AllocationReservation newReservation() { + return new Reservation(); + } + + + @Override + public synchronized void close() { + /* + * Some owners may close more than once because of complex cleanup and shutdown + * procedures. + */ + if (isClosed) { + return; + } + + if (DEBUG) { + synchronized(DEBUG_LOCK) { + verifyAllocator(); + + // are there outstanding child allocators? + if (!childAllocators.isEmpty()) { + for (final BaseAllocator childAllocator : childAllocators.keySet()) { + if (childAllocator.isClosed) { + logger.warn(String.format( + "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s", + childAllocator.name, name, toString())); + } + } + + throw new IllegalStateException( + String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, toString())); + } + + // are there outstanding buffers? + final int allocatedCount = childLedgers.size(); + if (allocatedCount > 0) { + throw new IllegalStateException( + String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s", + name, allocatedCount, toString())); + } + + if (reservations.size() != 0) { + throw new IllegalStateException( + String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(), + toString())); + } + + } + } + + // Is there unaccounted-for outstanding allocation? + final long allocated = getAllocatedMemory(); + if (allocated > 0) { + throw new IllegalStateException( + String.format("Unaccounted for outstanding allocation (%d)\n%s", allocated, toString())); + } + + // we need to release our memory to our parent before we tell it we've closed. + super.close(); + + // Inform our parent allocator that we've closed + if (parentAllocator != null) { + parentAllocator.childClosed(this); + } + + if (DEBUG) { + historicalLog.recordEvent("closed"); + logger.debug(String.format( + "closed allocator[%s].", + name)); + } + + isClosed = true; + + + } + + public String toString() { + final Verbosity verbosity = logger.isTraceEnabled() || true ? Verbosity.LOG_WITH_STACKTRACE : Verbosity.BASIC; --- End diff -- this one too :P > Incorporate remaining patches from DRILL-1942 Allocator refactor > ---------------------------------------------------------------- > > Key: DRILL-4134 > URL: https://issues.apache.org/jira/browse/DRILL-4134 > Project: Apache Drill > Issue Type: Sub-task > Components: Execution - Flow > Reporter: Jacques Nadeau > Assignee: Jacques Nadeau > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)