This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 392fd02 ARROW-2696: [JAVA] enhance AllocationListener with an
onFailedAllocation() call (#2133)
392fd02 is described below
commit 392fd02c63359a91b7834e55359e346f6814681c
Author: Vanco Buca <[email protected]>
AuthorDate: Fri Jun 15 10:28:01 2018 -0700
ARROW-2696: [JAVA] enhance AllocationListener with an onFailedAllocation()
call (#2133)
* Explicit listener added to newChildAllocator(), allowing each child to
have a separate listener instance
* AllocationListener.onFailedAllocation() method added, with default no-op
implementation
* Call to onFailedAllocation() implemented
* Review comments
* Review comment: missing space
* Consolidated BaseAllocator constructors
---
.../apache/arrow/memory/AllocationListener.java | 20 +++-
.../org/apache/arrow/memory/BaseAllocator.java | 54 ++++++----
.../org/apache/arrow/memory/BufferAllocator.java | 11 ++
.../org/apache/arrow/memory/ChildAllocator.java | 4 +-
.../org/apache/arrow/memory/RootAllocator.java | 2 +-
.../org/apache/arrow/memory/TestBaseAllocator.java | 117 ++++++++++++++++++++-
6 files changed, 180 insertions(+), 28 deletions(-)
diff --git
a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
index d36cb37..de181df 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -21,8 +21,8 @@ package org.apache.arrow.memory;
/**
* An allocation listener being notified for allocation/deallocation
* <p>
- * It is expected to be called from multiple threads and as such,
- * provider should take care of making the implementation thread-safe
+ * It might be called from multiple threads if the allocator hierarchy shares a listener, in which
+ * case, the provider should take care of making the implementation thread-safe.
*/
public interface AllocationListener {
@@ -30,6 +30,11 @@ public interface AllocationListener {
@Override
public void onAllocation(long size) {
}
+
+ @Override
+ public boolean onFailedAllocation(long size, Accountant.AllocationOutcome
outcome) {
+ return false;
+ }
};
/**
@@ -39,4 +44,15 @@ public interface AllocationListener {
*/
void onAllocation(long size);
+ /**
+ * Called whenever an allocation failed, giving the caller a chance to create some space in the allocator
+ * (either by freeing some resource, or by changing the limit), and, if successful, allowing the allocator
+ * to retry the allocation.
+ *
+ * @param size the buffer size that was being allocated
+ * @param outcome the outcome of the failed allocation. Carries information of what failed
+ * @return true, if the allocation can be retried; false if the allocation should fail
+ */
+ boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome);
+
}
diff --git
a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 4804b6b..6acbc1e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -56,28 +56,21 @@ public abstract class BaseAllocator extends Accountant
implements BufferAllocato
private final HistoricalLog historicalLog;
private volatile boolean isClosed = false; // the allocator has been closed
+ /**
+ * Initialize an allocator
+ * @param parentAllocator parent allocator. null if defining a root allocator
+ * @param listener listener callback. Must be non-null -- use {@link AllocationListener#NOOP} if no listener
+ * desired
+ * @param name name of this allocator
+ * @param initReservation initial reservation. Cannot be modified after construction
+ * @param maxAllocation limit. Allocations past the limit fail. Can be modified after construction
+ */
protected BaseAllocator(
- final AllocationListener listener,
- final String name,
- final long initReservation,
- final long maxAllocation) throws OutOfMemoryException {
- this(listener, null, name, initReservation, maxAllocation);
- }
-
- protected BaseAllocator(
- final BaseAllocator parentAllocator,
- final String name,
- final long initReservation,
- final long maxAllocation) throws OutOfMemoryException {
- this(parentAllocator.listener, parentAllocator, name, initReservation,
maxAllocation);
- }
-
- private BaseAllocator(
- final AllocationListener listener,
- final BaseAllocator parentAllocator,
- final String name,
- final long initReservation,
- final long maxAllocation) throws OutOfMemoryException {
+ final BaseAllocator parentAllocator,
+ final AllocationListener listener,
+ final String name,
+ final long initReservation,
+ final long maxAllocation) throws OutOfMemoryException {
super(parentAllocator, initReservation, maxAllocation);
this.listener = listener;
@@ -276,7 +269,13 @@ public abstract class BaseAllocator extends Accountant
implements BufferAllocato
: initialRequestSize;
AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
if (!outcome.isOk()) {
- throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize,
initialRequestSize));
+ if (listener.onFailedAllocation(actualRequestSize, outcome)) {
+ // Second try, in case the listener can do something about it
+ outcome = this.allocateBytes(actualRequestSize);
+ }
+ if (!outcome.isOk()) {
+ throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize,
initialRequestSize));
+ }
}
boolean success = false;
@@ -333,9 +332,18 @@ public abstract class BaseAllocator extends Accountant
implements BufferAllocato
final String name,
final long initReservation,
final long maxAllocation) {
+ return newChildAllocator(name, this.listener, initReservation,
maxAllocation);
+ }
+
+ @Override
+ public BufferAllocator newChildAllocator(
+ final String name,
+ final AllocationListener listener,
+ final long initReservation,
+ final long maxAllocation) {
assertOpen();
- final ChildAllocator childAllocator = new ChildAllocator(this, name,
initReservation,
+ final ChildAllocator childAllocator = new ChildAllocator(listener, this,
name, initReservation,
maxAllocation);
if (DEBUG) {
diff --git
a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index b23a6e4..94ea62e 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -69,6 +69,17 @@ public interface BufferAllocator extends AutoCloseable {
public BufferAllocator newChildAllocator(String name, long initReservation,
long maxAllocation);
/**
+ * Create a new child allocator.
+ *
+ * @param name the name of the allocator.
+ * @param listener allocation listener for the newly created child
+ * @param initReservation the initial space reservation (obtained from this allocator)
+ * @param maxAllocation maximum amount of space the new allocator can allocate
+ * @return the new allocator, or null if it can't be created
+ */
+ public BufferAllocator newChildAllocator(String name, AllocationListener
listener, long initReservation, long maxAllocation);
+
+ /**
* Close and release all buffers generated from this buffer pool.
*
* <p>When assertions are on, complains if there are any outstanding
buffers; to avoid
diff --git
a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
index f9a6dc7..03ec268 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -32,6 +32,7 @@ class ChildAllocator extends BaseAllocator {
/**
* Constructor.
*
+ * @param listener Allocation listener to be used in this child
* @param parentAllocator parent allocator -- the one creating this child
* @param name the name of this child allocator
* @param initReservation initial amount of space to reserve (obtained from
the parent)
@@ -41,11 +42,12 @@ class ChildAllocator extends BaseAllocator {
* allocation policy in force, even less memory may
be available
*/
ChildAllocator(
+ AllocationListener listener,
BaseAllocator parentAllocator,
String name,
long initReservation,
long maxAllocation) {
- super(parentAllocator, name, initReservation, maxAllocation);
+ super(parentAllocator, listener, name, initReservation, maxAllocation);
}
diff --git
a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 1dc6bf0..161b81a 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -31,7 +31,7 @@ public class RootAllocator extends BaseAllocator {
}
public RootAllocator(final AllocationListener listener, final long limit) {
- super(listener, "ROOT", 0, limit);
+ super(null, listener, "ROOT", 0, limit);
}
/**
diff --git
a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index 76f2c50..62b0046 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -222,7 +222,122 @@ public class TestBaseAllocator {
}
}
- private static void allocateAndFree(final BufferAllocator allocator) {
+ // Allocation listener
+ // It counts the number of times it has been invoked, and how much memory allocation it has seen
+ // When set to 'expand on fail', it attempts to expand the associated allocator's limit
+ private static final class TestAllocationListener implements
AllocationListener {
+ private int numCalls;
+ private long totalMem;
+ private boolean expandOnFail;
+ BufferAllocator expandAlloc;
+ long expandLimit;
+
+ TestAllocationListener() {
+ this.numCalls = 0;
+ this.totalMem = 0;
+ this.expandOnFail = false;
+ this.expandAlloc = null;
+ this.expandLimit = 0;
+ }
+
+ @Override
+ public void onAllocation(long size) {
+ numCalls++;
+ totalMem += size;
+ }
+
+ @Override
+ public boolean onFailedAllocation(long size, Accountant.AllocationOutcome
outcome) {
+ if (expandOnFail) {
+ expandAlloc.setLimit(expandLimit);
+ return true;
+ }
+ return false;
+ }
+
+ void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
+ this.expandOnFail = true;
+ this.expandAlloc = expandAlloc;
+ this.expandLimit = expandLimit;
+ }
+
+ int getNumCalls() {
+ return numCalls;
+ }
+
+ long getTotalMem() {
+ return totalMem;
+ }
+ }
+
+ @Test
+ public void testRootAllocator_listeners() throws Exception {
+ TestAllocationListener l1 = new TestAllocationListener();
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+ TestAllocationListener l2 = new TestAllocationListener();
+ assertEquals(0, l2.getNumCalls());
+ assertEquals(0, l2.getTotalMem());
+ // root and first-level child share the first listener
+ // second-level and third-level child share the second listener
+ try (final RootAllocator rootAllocator = new RootAllocator(l1,
MAX_ALLOCATION)) {
+ try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0,
MAX_ALLOCATION)) {
+ final ArrowBuf buf1 = c1.buffer(16);
+ assertNotNull("allocation failed", buf1);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ buf1.release();
+ try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0,
MAX_ALLOCATION)) {
+ final ArrowBuf buf2 = c2.buffer(32);
+ assertNotNull("allocation failed", buf2);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ assertEquals(1, l2.getNumCalls());
+ assertEquals(32, l2.getTotalMem());
+ buf2.release();
+ try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0,
MAX_ALLOCATION)) {
+ final ArrowBuf buf3 = c3.buffer(64);
+ assertNotNull("allocation failed", buf3);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ assertEquals(2, l2.getNumCalls());
+ assertEquals(32 + 64, l2.getTotalMem());
+ buf3.release();
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testRootAllocator_listenerAllocationFail() throws Exception {
+ TestAllocationListener l1 = new TestAllocationListener();
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+ // Test attempts to allocate too much from a child whose limit is set to half of the max allocation
+ // The listener's callback triggers, expanding the child allocator's limit, so then the allocation succeeds
+ try (final RootAllocator rootAllocator = new
RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1",
l1,0, MAX_ALLOCATION / 2)) {
+ try {
+ c1.buffer(MAX_ALLOCATION);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+
+ l1.setExpandOnFail(c1, MAX_ALLOCATION);
+ ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(MAX_ALLOCATION, l1.getTotalMem());
+ arrowBuf.release();
+ }
+ }
+ }
+
+ private static void allocateAndFree(final BufferAllocator allocator) {
final ArrowBuf arrowBuf = allocator.buffer(512);
assertNotNull("allocation failed", arrowBuf);
arrowBuf.release();
--
To stop receiving notification emails like this one, please contact
[email protected].