[ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15064359#comment-15064359 ]
ASF GitHub Bot commented on DRILL-4134: --------------------------------------- Github user adeneche commented on a diff in the pull request: https://github.com/apache/drill/pull/283#discussion_r48051860 --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.memory; + +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.ThreadSafe; + +import org.apache.drill.exec.exception.OutOfMemoryException; + +import com.google.common.base.Preconditions; + +/** + * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All + * operations are threadsafe (except for close). + */ +@ThreadSafe +class Accountant implements AutoCloseable { + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class); + + /** + * The parent allocator + */ + protected final Accountant parent; + + /** + * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the + * parent Accountant until this Accountant is closed. 
+ */ + protected final long reservation; + + private final AtomicLong peakAllocation = new AtomicLong(); + + /** + * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to + * change but will change responses to future allocation efforts + */ + private final AtomicLong allocationLimit = new AtomicLong(); + + /** + * Currently allocated amount of memory; + */ + private final AtomicLong locallyHeldMemory = new AtomicLong(); + + public Accountant(Accountant parent, long reservation, long maxAllocation) { + Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative."); + Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative."); + Preconditions.checkArgument(reservation <= maxAllocation, + "The initial reservation size must be <= the maximum allocation."); + Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory."); + + this.parent = parent; + this.reservation = reservation; + this.allocationLimit.set(maxAllocation); + + if (reservation != 0) { + // we will allocate a reservation from our parent. + final AllocationOutcome outcome = parent.allocateBytes(reservation); + if (!outcome.isOk()) { + throw new OutOfMemoryException(String.format( + "Failure trying to allocate initial reservation for Allocator. " + + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name())); + } + } + } + + /** + * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a + * log of delta + * + * If it fails, no changes are made to accounting. + * + * @param size + * The amount of memory to reserve in bytes. + * @return True if the allocation was successful, false if the allocation failed. 
+ */ + AllocationOutcome allocateBytes(long size) { + final AllocationOutcome outcome = allocate(size, true, false); + if (!outcome.isOk()) { + releaseBytes(size); + } + return outcome; + } + + private void updatePeak() { + final long currentMemory = locallyHeldMemory.get(); + + long previousPeak; + do { + previousPeak = peakAllocation.get(); + if (peakAllocation.compareAndSet(previousPeak, currentMemory)) { --- End diff -- Just got the results of Vicky's tests; we are no longer seeing the concurrency issue. I do need more time to go through the PR, though: I want to make sure everything's good, especially since more patches that depend on this PR will be coming. > Incorporate remaining patches from DRILL-1942 Allocator refactor > ---------------------------------------------------------------- > > Key: DRILL-4134 > URL: https://issues.apache.org/jira/browse/DRILL-4134 > Project: Apache Drill > Issue Type: Sub-task > Components: Execution - Flow > Reporter: Jacques Nadeau > Assignee: Jacques Nadeau > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)