Repository: drill
Updated Branches:
  refs/heads/master cf3a5eef2 -> fe8471316


DRILL-5808: Reduce memory allocator strictness for "managed" operators

closes #958


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fe847131
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fe847131
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fe847131

Branch: refs/heads/master
Commit: fe84713169d410c40da80d6faf658ba2ac884ccc
Parents: cf3a5ee
Author: Paul Rogers <prog...@maprtech.com>
Authored: Sun Sep 24 12:51:43 2017 -0700
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Wed Oct 11 11:07:00 2017 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   |   7 +
 .../physical/impl/xsort/managed/SortImpl.java   |  15 +-
 .../xsort/managed/TestLenientAllocation.java    | 194 +++++++++++++++++++
 .../apache/drill/exec/memory/Accountant.java    |  98 +++++++++-
 .../apache/drill/exec/memory/BaseAllocator.java |   9 +-
 .../drill/exec/memory/BufferAllocator.java      |  13 ++
 .../drill/exec/memory/ChildAllocator.java       |   4 +-
 7 files changed, 321 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 314cf6e..8006276 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -97,6 +97,13 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
       // nulls are equal in group by case
       comparators.add(Comparator.IS_NOT_DISTINCT_FROM);
     }
+
+    // This operator manages its memory use. Ask for leniency
+    // from the allocator to allow for slight errors due to the
+    // lumpiness of vector allocations beyond our control.
+
+    boolean allowed = oContext.getAllocator().setLenient();
+    logger.debug("Config: Is allocator lenient? {}", allowed);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 96861ab..d2b589c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -203,15 +203,16 @@ public class SortImpl {
     metrics = new SortMetrics(opContext.getStats());
     bufferedBatches = new BufferedBatches(opContext);
 
-    // Reset the allocator to allow a 10% safety margin. This is done because
-    // the memory manager will enforce the original limit. Changing the hard
-    // limit will reduce the probability that random chance causes the 
allocator
+    // Request leniency from the allocator. Leniency
+    // will reduce the probability that random chance causes the allocator
     // to kill the query because of a small, spurious over-allocation.
 
-    long maxMem = memManager.getMemoryLimit();
-    long newMax = (long)(maxMem * 1.10);
-    allocator.setLimit(newMax);
-    logger.debug("Config: Resetting allocator to 10% safety margin: {}", 
newMax);
+//    long maxMem = memManager.getMemoryLimit();
+//    long newMax = (long)(maxMem * 1.10);
+//    allocator.setLimit(newMax);
+//    logger.debug("Config: Resetting allocator to 10% safety margin: {}", 
newMax);
+    boolean allowed = allocator.setLenient();
+    logger.debug("Config: Is allocator lenient? {}", allowed);
   }
 
   public void setSchema(BatchSchema schema) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestLenientAllocation.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestLenientAllocation.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestLenientAllocation.java
new file mode 100644
index 0000000..71a51f5
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestLenientAllocation.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.*;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.Accountant;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.LogFixture.LogFixtureBuilder;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+import ch.qos.logback.classic.Level;
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Test of temporary allocator feature to allow a grace margin
+ * for error in allocations from operators that make a good-faith
+ * effort to stay within their budgets, but are sometimes undone
+ * by unexpected power-of-two buffer sizes and vector doubling.
+ */
+
+public class TestLenientAllocation extends SubOperatorTest {
+
+  /**
+   * Use a test-time hack to force the allocator to be lenient,
+   * regardless of whether we are in debug mode or not.
+   */
+
+  @Test
+  public void testLenient() {
+    LogFixtureBuilder logBuilder = LogFixture.builder()
+        .logger(Accountant.class, Level.WARN);
+
+    try (LogFixture logFixture = logBuilder.build()) {
+
+      // Test can't run without assertions
+
+      assertTrue(AssertionUtil.isAssertionsEnabled());
+
+      // Create a child allocator
+
+      BufferAllocator allocator = 
fixture.allocator().newChildAllocator("test", 10 * 1024, 128 * 1024);
+      ((Accountant) allocator).forceLenient();
+
+      // Allocate most of the available memory
+
+      DrillBuf buf1 = allocator.buffer(64 * 1024);
+
+      // Oops, we did our math wrong; allocate too large a buffer.
+
+      DrillBuf buf2 = allocator.buffer(128 * 1024);
+
+      assertEquals(192 * 1024, allocator.getAllocatedMemory());
+
+      // We keep making mistakes.
+
+      DrillBuf buf3 = allocator.buffer(32 * 1024);
+
+      // Right up to the hard limit
+
+      DrillBuf buf4 = allocator.buffer(32 * 1024);
+      assertEquals(256 * 1024, allocator.getAllocatedMemory());
+
+      // Enough of this; we're abusing the system. Next
+      // allocation fails.
+
+      try {
+        allocator.buffer(8);
+        fail();
+      } catch (OutOfMemoryException e) {
+        // Expected
+      }
+
+      // Recover from our excesses
+
+      buf2.close();
+      buf3.close();
+      buf4.close();
+      assertEquals(64 * 1024, allocator.getAllocatedMemory());
+
+      // We're back in the good graces of the allocator,
+      // can allocate more.
+
+      DrillBuf buf5 = allocator.buffer(8);
+
+      // Clean up
+
+      buf1.close();
+      buf5.close();
+      allocator.close();
+    }
+  }
+
+  /**
+   * Test that the allocator is normally strict in debug mode.
+   */
+
+  @Test
+  public void testStrict() {
+    LogFixtureBuilder logBuilder = LogFixture.builder()
+        .logger(Accountant.class, Level.WARN);
+
+    try (LogFixture logFixture = logBuilder.build()) {
+
+      // Test can't run without assertions
+
+      assertTrue(AssertionUtil.isAssertionsEnabled());
+
+      // Create a child allocator
+
+      BufferAllocator allocator = 
fixture.allocator().newChildAllocator("test", 10 * 1024, 128 * 1024);
+
+      // Allocate most of the available memory
+
+      DrillBuf buf1 = allocator.buffer(64 * 1024);
+
+      // Oops, we did our math wrong; allocate too large a buffer.
+
+      try {
+        allocator.buffer(128 * 1024);
+        fail();
+      } catch (OutOfMemoryException e) {
+        // Expected
+      }
+
+      // Clean up
+
+      buf1.close();
+      allocator.close();
+    }
+  }
+
+  public static final int ONE_MEG = 1024 * 1024;
+
+  @Test
+  public void testLenientLimit() {
+    LogFixtureBuilder logBuilder = LogFixture.builder()
+        .logger(Accountant.class, Level.WARN);
+
+    try (LogFixture logFixture = logBuilder.build()) {
+
+      // Test can't run without assertions
+
+      assertTrue(AssertionUtil.isAssertionsEnabled());
+
+      // Create a child allocator
+
+      BufferAllocator allocator = 
fixture.allocator().newChildAllocator("test", 10 * ONE_MEG, 128 * ONE_MEG);
+      ((Accountant) allocator).forceLenient();
+
+      // Allocate most of the available memory
+
+      DrillBuf buf1 = allocator.buffer(64 * ONE_MEG);
+
+      // Oops, we did our math wrong; allocate too large a buffer.
+
+      DrillBuf buf2 = allocator.buffer(128 * ONE_MEG);
+
+      // Can't go the full 2x over limit, errors capped at 100 MB.
+
+      try {
+        allocator.buffer(64 * ONE_MEG);
+        fail();
+      } catch (OutOfMemoryException e) {
+        // Expected
+      }
+
+      // Clean up
+
+      buf1.close();
+      buf2.close();
+      allocator.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
----------------------------------------------------------------------
diff --git 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
index 8bcf6a0..b94a6f0 100644
--- 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
+++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.ThreadSafe;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.util.AssertionUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -30,8 +32,57 @@ import com.google.common.base.Preconditions;
  * operations are threadsafe (except for close).
  */
 @ThreadSafe
-class Accountant implements AutoCloseable {
-  // private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Accountant.class);
+@VisibleForTesting
+public class Accountant implements AutoCloseable {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Accountant.class);
+
+  // See DRILL-5808
+  // Allow a "grace margin" above the allocator limit for those operators
+  // that are trying to stay within the limit, but are hindered by the
+  // current power-of-two allocations and random vector doublings. Instead
+  // of failing the query, the accountant allows an excess allocation within
+  // the grace, but issues a warning to the log.
+  //
+  // Normally, the "grace margin" (lenient) allocation is allowed only for
+  // operators that request leniency (because they attempt to manage memory)
+  // and only in production mode. Leniency is usually denied in a debug
+  // run (assertions enabled.) Use the following system option to force
+  // allowing lenient allocation in a debug run (or, to disable lenient
+  // allocation in a production run.)
+  //
+  // Force allowing lenient allocation:
+  //
+  // -Ddrill.memory.lenient.allocator=true
+  //
+  // Force strict allocation:
+  //
+  // -Ddrill.memory.lenient.allocator=false
+
+  public static final String ALLOW_LENIENT_ALLOCATION = 
"drill.memory.lenient.allocator";
+
+  // Allow clients to allocate beyond the limit by this factor.
+  // If the limit is 10K, then with a margin of 1, the system
+  // will allow a grace of another 10K for a hard limit of 20K.
+  // Warnings are issued for allocations that use the grace.
+
+  public static final int GRACE_MARGIN = 1;
+
+  // In large-memory allocators, don't allow more than 100 MB
+  // grace.
+
+  public static final int MAX_GRACE = 100 * 1024 * 1024;
+
+  // Use the strictness set in a system property. If not set, then
+  // default to strict in debug mode, lenient in production mode.
+
+  public static final boolean ALLOW_LENIENCY =
+      System.getProperty(ALLOW_LENIENT_ALLOCATION) == null
+      ? ! AssertionUtil.isAssertionsEnabled()
+      : Boolean.parseBoolean(System.getProperty(ALLOW_LENIENT_ALLOCATION));
+
+  // Whether leniency has been requested, and granted for this allocator.
+
+  private boolean lenient = false;
 
   /**
    * The parent allocator
@@ -80,6 +131,31 @@ class Accountant implements AutoCloseable {
   }
 
   /**
+   * Request lenient allocations: allows exceeding the allocation limit
+   * by the configured grace amount. The request is granted only if strict
+   * limits are not required.
+   *
+   * @return true if the leniency was granted, false if the current
+   * execution mode, or system property, disallows leniency
+   */
+
+  public boolean setLenient() {
+    lenient = ALLOW_LENIENCY;
+    return lenient;
+  }
+
+  /**
+   * Force lenient allocation. Used for testing to avoid the need to muck
+   * with global settings (assertions or system properties.) <b>Do not</b>
+   * use in production code!
+   */
+
+  @VisibleForTesting
+  public void forceLenient() {
+    lenient = true;
+  }
+
+  /**
    * Attempt to allocate the requested amount of memory. Either completely 
succeeds or completely fails. Constructs a a
    * log of delta
    *
@@ -148,7 +224,20 @@ class Accountant implements AutoCloseable {
   private AllocationOutcome allocate(final long size, final boolean 
incomingUpdatePeak, final boolean forceAllocation) {
     final long newLocal = locallyHeldMemory.addAndGet(size);
     final long beyondReservation = newLocal - reservation;
-    final boolean beyondLimit = newLocal > allocationLimit.get();
+    boolean beyondLimit = newLocal > allocationLimit.get();
+
+    // Non-strict allocation
+
+    if (beyondLimit && lenient) {
+      long grace = Math.min(MAX_GRACE, allocationLimit.get() * GRACE_MARGIN);
+      long graceLimit = allocationLimit.get() + grace;
+      beyondLimit = newLocal > graceLimit;
+      if (! beyondLimit) {
+        logger.warn("Excess allocation of {} bytes beyond limit of {}, " +
+                    "within grace of {}, new total allocation: {}",
+            size, allocationLimit.get(), grace, newLocal);
+      }
+    }
     final boolean updatePeak = forceAllocation || (incomingUpdatePeak && 
!beyondLimit);
 
     AllocationOutcome parentOutcome = AllocationOutcome.SUCCESS;
@@ -201,6 +290,7 @@ class Accountant implements AutoCloseable {
   /**
    * Close this Accountant. This will release any reservation bytes back to a 
parent Accountant.
    */
+  @Override
   public void close() {
     // return memory reservation to parent allocator.
     if (parent != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index e97493e..6662073 100644
--- 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -133,9 +133,7 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
   }
 
   @Override
-  public String getName() {
-    return name;
-  }
+  public String getName() { return name; }
 
   @Override
   public DrillBuf getEmpty() {
@@ -233,7 +231,7 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
     final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
         nextPowerOfTwo(initialRequestSize)
         : initialRequestSize;
-    AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
+    AllocationOutcome outcome = allocateBytes(actualRequestSize);
     if (!outcome.isOk()) {
       throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, 
initialRequestSize));
     }
@@ -828,6 +826,7 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
     }
   }
 
+  @Override
   public DrillBuf read(int length, InputStream in) throws IOException {
     DrillBuf buf = buffer(length);
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index bdf3073..d45c62f 100644
--- 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -102,6 +102,19 @@ public interface BufferAllocator extends AutoCloseable {
   public void setLimit(long newLimit);
 
   /**
+   * Request lenient enforcement of the allocation limits. Use for
+   * memory-managed operators to prevent minor math errors from killing
+   * queries. This is temporary until Drill manages memory better.
+   * By default, leniency is granted only in production mode (assertions
+   * disabled), not in debug mode; a system property can override this.
+   *
+   * @return true if leniency was granted, false if the allocator will
+   * enforce strict limits despite the request
+   */
+
+  public boolean setLenient();
+
+  /**
    * Return the current maximum limit this allocator imposes.
    *
    * @return Limit in number of bytes.

http://git-wip-us.apache.org/repos/asf/drill/blob/fe847131/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
index 8fcabb1..1e4e030 100644
--- 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
+++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -48,6 +48,4 @@ class ChildAllocator extends BaseAllocator {
       long maxAllocation) {
     super(parentAllocator, name, initReservation, maxAllocation);
   }
-
-
 }

Reply via email to