This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 392fd02  ARROW-2696: [JAVA] enhance AllocationListener with an onFailedAllocation() call (#2133)
392fd02 is described below

commit 392fd02c63359a91b7834e55359e346f6814681c
Author: Vanco Buca <[email protected]>
AuthorDate: Fri Jun 15 10:28:01 2018 -0700

    ARROW-2696: [JAVA] enhance AllocationListener with an onFailedAllocation() call (#2133)
    
    * Explicit listener added to newChildAllocator(), allowing each child to 
have a separate listener instance
    
    * AllocationListener.onFailedAllocation() method added, with default no-op implementation
    
    * Call to onFailedAllocation() implemented
    
    * Review comments
    
    * Review comment: missing space
    
    * Consolidated BaseAllocator constructors
---
 .../apache/arrow/memory/AllocationListener.java    |  20 +++-
 .../org/apache/arrow/memory/BaseAllocator.java     |  54 ++++++----
 .../org/apache/arrow/memory/BufferAllocator.java   |  11 ++
 .../org/apache/arrow/memory/ChildAllocator.java    |   4 +-
 .../org/apache/arrow/memory/RootAllocator.java     |   2 +-
 .../org/apache/arrow/memory/TestBaseAllocator.java | 117 ++++++++++++++++++++-
 6 files changed, 180 insertions(+), 28 deletions(-)

diff --git 
a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java 
b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
index d36cb37..de181df 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -21,8 +21,8 @@ package org.apache.arrow.memory;
 /**
  * An allocation listener being notified for allocation/deallocation
  * <p>
- * It is expected to be called from multiple threads and as such,
- * provider should take care of making the implementation thread-safe
+ * It might be called from multiple threads if the allocator hierarchy shares 
a listener, in which
+ * case, the provider should take care of making the implementation 
thread-safe.
  */
 public interface AllocationListener {
 
@@ -30,6 +30,11 @@ public interface AllocationListener {
     @Override
     public void onAllocation(long size) {
     }
+
+    @Override
+    public boolean onFailedAllocation(long size, Accountant.AllocationOutcome 
outcome) {
+      return false;
+    }
   };
 
   /**
@@ -39,4 +44,15 @@ public interface AllocationListener {
    */
   void onAllocation(long size);
 
+  /**
+   * Called whenever an allocation failed, giving the caller a chance to 
create some space in the allocator
+   * (either by freeing some resource, or by changing the limit), and, if 
successful, allowing the allocator
+   * to retry the allocation.
+   *
+   * @param size     the buffer size that was being allocated
+   * @param outcome  the outcome of the failed allocation. Carries information 
of what failed
+   * @return true, if the allocation can be retried; false if the allocation 
should fail
+   */
+  boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome);
+
 }
diff --git 
a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java 
b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 4804b6b..6acbc1e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -56,28 +56,21 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
   private final HistoricalLog historicalLog;
   private volatile boolean isClosed = false; // the allocator has been closed
 
+  /**
+   * Initialize an allocator
+   * @param parentAllocator   parent allocator. null if defining a root 
allocator
+   * @param listener          listener callback. Must be non-null -- use 
{@link AllocationListener#NOOP} if no listener
+   *                          desired
+   * @param name              name of this allocator
+   * @param initReservation   initial reservation. Cannot be modified after 
construction
+   * @param maxAllocation     limit. Allocations past the limit fail. Can be 
modified after construction
+   */
   protected BaseAllocator(
-      final AllocationListener listener,
-      final String name,
-      final long initReservation,
-      final long maxAllocation) throws OutOfMemoryException {
-    this(listener, null, name, initReservation, maxAllocation);
-  }
-
-  protected BaseAllocator(
-      final BaseAllocator parentAllocator,
-      final String name,
-      final long initReservation,
-      final long maxAllocation) throws OutOfMemoryException {
-    this(parentAllocator.listener, parentAllocator, name, initReservation, 
maxAllocation);
-  }
-
-  private BaseAllocator(
-      final AllocationListener listener,
-      final BaseAllocator parentAllocator,
-      final String name,
-      final long initReservation,
-      final long maxAllocation) throws OutOfMemoryException {
+          final BaseAllocator parentAllocator,
+          final AllocationListener listener,
+          final String name,
+          final long initReservation,
+          final long maxAllocation) throws OutOfMemoryException {
     super(parentAllocator, initReservation, maxAllocation);
 
     this.listener = listener;
@@ -276,7 +269,13 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
         : initialRequestSize;
     AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
     if (!outcome.isOk()) {
-      throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, 
initialRequestSize));
+      if (listener.onFailedAllocation(actualRequestSize, outcome)) {
+        // Second try, in case the listener can do something about it
+        outcome = this.allocateBytes(actualRequestSize);
+      }
+      if (!outcome.isOk()) {
+        throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, 
initialRequestSize));
+      }
     }
 
     boolean success = false;
@@ -333,9 +332,18 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
       final String name,
       final long initReservation,
       final long maxAllocation) {
+    return newChildAllocator(name, this.listener, initReservation, 
maxAllocation);
+  }
+
+  @Override
+  public BufferAllocator newChildAllocator(
+      final String name,
+      final AllocationListener listener,
+      final long initReservation,
+      final long maxAllocation) {
     assertOpen();
 
-    final ChildAllocator childAllocator = new ChildAllocator(this, name, 
initReservation,
+    final ChildAllocator childAllocator = new ChildAllocator(listener, this, 
name, initReservation,
         maxAllocation);
 
     if (DEBUG) {
diff --git 
a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java 
b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index b23a6e4..94ea62e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -69,6 +69,17 @@ public interface BufferAllocator extends AutoCloseable {
   public BufferAllocator newChildAllocator(String name, long initReservation, 
long maxAllocation);
 
   /**
+   * Create a new child allocator.
+   *
+   * @param name            the name of the allocator.
+   * @param listener        allocation listener for the newly created child
+   * @param initReservation the initial space reservation (obtained from this 
allocator)
+   * @param maxAllocation   maximum amount of space the new allocator can 
allocate
+   * @return the new allocator, or null if it can't be created
+   */
+  public BufferAllocator newChildAllocator(String name, AllocationListener 
listener, long initReservation, long maxAllocation);
+
+  /**
    * Close and release all buffers generated from this buffer pool.
    *
    * <p>When assertions are on, complains if there are any outstanding 
buffers; to avoid
diff --git 
a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java 
b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
index f9a6dc7..03ec268 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -32,6 +32,7 @@ class ChildAllocator extends BaseAllocator {
   /**
    * Constructor.
    *
+   * @param listener        Allocation listener to be used in this child
    * @param parentAllocator parent allocator -- the one creating this child
    * @param name            the name of this child allocator
    * @param initReservation initial amount of space to reserve (obtained from 
the parent)
@@ -41,11 +42,12 @@ class ChildAllocator extends BaseAllocator {
    *                        allocation policy in force, even less memory may 
be available
    */
   ChildAllocator(
+      AllocationListener listener,
       BaseAllocator parentAllocator,
       String name,
       long initReservation,
       long maxAllocation) {
-    super(parentAllocator, name, initReservation, maxAllocation);
+    super(parentAllocator, listener, name, initReservation, maxAllocation);
   }
 
 
diff --git 
a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java 
b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 1dc6bf0..161b81a 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -31,7 +31,7 @@ public class RootAllocator extends BaseAllocator {
   }
 
   public RootAllocator(final AllocationListener listener, final long limit) {
-    super(listener, "ROOT", 0, limit);
+    super(null, listener, "ROOT", 0, limit);
   }
 
   /**
diff --git 
a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java 
b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index 76f2c50..62b0046 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -222,7 +222,122 @@ public class TestBaseAllocator {
     }
   }
 
-  private static void allocateAndFree(final BufferAllocator allocator) {
+  // Allocation listener
+  // It counts the number of times it has been invoked, and how much memory 
allocation it has seen
+  // When set to 'expand on fail', it attempts to expand the associated 
allocator's limit
+  private static final class TestAllocationListener implements 
AllocationListener {
+    private int numCalls;
+    private long totalMem;
+    private boolean expandOnFail;
+    BufferAllocator expandAlloc;
+    long expandLimit;
+
+    TestAllocationListener() {
+      this.numCalls = 0;
+      this.totalMem = 0;
+      this.expandOnFail = false;
+      this.expandAlloc = null;
+      this.expandLimit = 0;
+    }
+
+    @Override
+    public void onAllocation(long size) {
+      numCalls++;
+      totalMem += size;
+    }
+
+    @Override
+    public boolean onFailedAllocation(long size,  Accountant.AllocationOutcome 
outcome) {
+      if (expandOnFail) {
+        expandAlloc.setLimit(expandLimit);
+        return true;
+      }
+      return false;
+    }
+
+    void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
+      this.expandOnFail = true;
+      this.expandAlloc = expandAlloc;
+      this.expandLimit = expandLimit;
+    }
+
+    int getNumCalls() {
+      return numCalls;
+    }
+
+    long getTotalMem() {
+      return totalMem;
+    }
+  }
+
+  @Test
+  public void testRootAllocator_listeners() throws Exception {
+    TestAllocationListener l1 = new TestAllocationListener();
+    assertEquals(0, l1.getNumCalls());
+    assertEquals(0, l1.getTotalMem());
+    TestAllocationListener l2 = new TestAllocationListener();
+    assertEquals(0, l2.getNumCalls());
+    assertEquals(0, l2.getTotalMem());
+    // root and first-level child share the first listener
+    // second-level and third-level child share the second listener
+    try (final RootAllocator rootAllocator = new RootAllocator(l1, 
MAX_ALLOCATION)) {
+      try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, 
MAX_ALLOCATION)) {
+        final ArrowBuf buf1 = c1.buffer(16);
+        assertNotNull("allocation failed", buf1);
+        assertEquals(1, l1.getNumCalls());
+        assertEquals(16, l1.getTotalMem());
+        buf1.release();
+        try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, 
MAX_ALLOCATION)) {
+          final ArrowBuf buf2 = c2.buffer(32);
+          assertNotNull("allocation failed", buf2);
+          assertEquals(1, l1.getNumCalls());
+          assertEquals(16, l1.getTotalMem());
+          assertEquals(1, l2.getNumCalls());
+          assertEquals(32, l2.getTotalMem());
+          buf2.release();
+          try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, 
MAX_ALLOCATION)) {
+            final ArrowBuf buf3 = c3.buffer(64);
+            assertNotNull("allocation failed", buf3);
+            assertEquals(1, l1.getNumCalls());
+            assertEquals(16, l1.getTotalMem());
+            assertEquals(2, l2.getNumCalls());
+            assertEquals(32 + 64, l2.getTotalMem());
+            buf3.release();
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testRootAllocator_listenerAllocationFail() throws Exception {
+    TestAllocationListener l1 = new TestAllocationListener();
+    assertEquals(0, l1.getNumCalls());
+    assertEquals(0, l1.getTotalMem());
+    // Test attempts to allocate too much from a child whose limit is set to 
half of the max allocation
+    // The listener's callback triggers, expanding the child allocator's 
limit, so then the allocation succeeds
+    try (final RootAllocator rootAllocator = new 
RootAllocator(MAX_ALLOCATION)) {
+      try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 
l1,0, MAX_ALLOCATION / 2)) {
+        try {
+          c1.buffer(MAX_ALLOCATION);
+          fail("allocated memory beyond max allowed");
+        } catch (OutOfMemoryException e) {
+          // expected
+        }
+        assertEquals(0, l1.getNumCalls());
+        assertEquals(0, l1.getTotalMem());
+
+        l1.setExpandOnFail(c1, MAX_ALLOCATION);
+        ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
+        assertNotNull("allocation failed", arrowBuf);
+        assertEquals(1, l1.getNumCalls());
+        assertEquals(MAX_ALLOCATION, l1.getTotalMem());
+        arrowBuf.release();
+      }
+    }
+  }
+
+    private static void allocateAndFree(final BufferAllocator allocator) {
     final ArrowBuf arrowBuf = allocator.buffer(512);
     assertNotNull("allocation failed", arrowBuf);
     arrowBuf.release();

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to