This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit dc1db98f613f041e40c2c704862051ee2a51734a
Author: Salim Achouche <sachouc...@gmail.com>
AuthorDate: Sun Apr 22 18:02:35 2018 -0700

    DRILL-6348: Received batches are now owned by the receive operators instead of the parent
    
    closes #1237
---
 .../org/apache/drill/exec/ops/FragmentContextImpl.java  |  5 ++++-
 .../exec/physical/impl/MergingReceiverCreator.java      |  2 +-
 .../physical/impl/mergereceiver/MergingRecordBatch.java |  3 +++
 .../impl/unorderedreceiver/UnorderedReceiverBatch.java  |  3 +++
 .../unorderedreceiver/UnorderedReceiverCreator.java     |  2 +-
 .../drill/exec/work/batch/AbstractDataCollector.java    | 17 +++++++++++++++++
 .../org/apache/drill/exec/work/batch/DataCollector.java | 14 +++++++++++++-
 .../apache/drill/exec/work/batch/IncomingBuffers.java   | 10 +++++++---
 8 files changed, 49 insertions(+), 7 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index b192850..503ebdd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -449,13 +449,16 @@ public class FragmentContextImpl extends 
BaseFragmentContext implements Executor
   public void close() {
     waitForSendComplete();
 
+    // Close the buffers before closing the operators; this is needed as buffer ownership
+    // is attached to the receive operators.
+    suppressingClose(buffers);
+
     // close operator context
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
 
     suppressingClose(bufferManager);
-    suppressingClose(buffers);
     suppressingClose(allocator);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 0ef84b9..66a0cc2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -42,7 +42,7 @@ public class MergingReceiverCreator implements 
BatchCreator<MergingReceiverPOP>
     IncomingBuffers bufHolder = context.getBuffers();
 
     assert bufHolder != null : "IncomingBuffers must be defined for any place 
a receiver is declared.";
-    RawBatchBuffer[] buffers = 
bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    RawBatchBuffer[] buffers = 
bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
 
     return new MergingRecordBatch(context, receiver, buffers);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 7e5ff21..9087757 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -136,6 +136,9 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
     this.config = config;
     this.inputCounts = new long[config.getNumSenders()];
     this.outputCounts = new long[config.getNumSenders()];
+
+    // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
+    
context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
   @SuppressWarnings("resource")
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 433e0c8..424a733 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -86,6 +86,9 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
     this.stats = oContext.getStats();
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
+
+    // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
+    
context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 01a4588..3dcdfc4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -37,7 +37,7 @@ public class UnorderedReceiverCreator implements 
BatchCreator<UnorderedReceiver>
     IncomingBuffers bufHolder = context.getBuffers();
     assert bufHolder != null : "IncomingBuffers must be defined for any place 
a receiver is declared.";
 
-    RawBatchBuffer[] buffers = 
bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    RawBatchBuffer[] buffers = 
bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
     assert buffers.length == 1;
     RawBatchBuffer buffer = buffers[0];
     return new UnorderedReceiverBatch(context, buffer, receiver);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index b6b4183..bb3a5a2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -37,6 +38,8 @@ public abstract class AbstractDataCollector implements 
DataCollector {
   private final int incomingStreams;
   protected final RawBatchBuffer[] buffers;
   protected final ArrayWrappedIntIntMap fragmentMap;
+  /** Allocator which owns incoming batches */
+  protected BufferAllocator ownerAllocator;
 
   /**
    * @param parentAccounter
@@ -53,6 +56,7 @@ public abstract class AbstractDataCollector implements 
DataCollector {
     this.parentAccounter = parentAccounter;
     this.remainders = new AtomicIntegerArray(incomingStreams);
     this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
+    this.ownerAllocator = context.getAllocator();
     // Create fragmentId to index that is within the range [0, incoming.size()-1]
     // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
     fragmentMap = new ArrayWrappedIntIntMap();
@@ -116,4 +120,17 @@ public abstract class AbstractDataCollector implements 
DataCollector {
     AutoCloseables.close(buffers);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public BufferAllocator getAllocator() {
+    return this.ownerAllocator;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setAllocator(BufferAllocator allocator) {
+    Preconditions.checkArgument(allocator != null, "buffer allocator cannot be 
null");
+    this.ownerAllocator = allocator;
+  }
+
 }
\ No newline at end of file
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
index 026fc81..fa74677 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
@@ -19,13 +19,25 @@ package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
-interface DataCollector extends AutoCloseable {
+public interface DataCollector extends AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DataCollector.class);
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) 
throws IOException ;
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
   public int getTotalIncomingFragments();
   public void close() throws Exception;
+  /**
+   * Enables caller (e.g., receiver) to attach its buffer allocator to this Data Collector in order
+   * to claim ownership of incoming batches; by default, the fragment allocator owns these batches.
+   *
+   * @param allocator operator buffer allocator
+   */
+  void setAllocator(BufferAllocator allocator);
+  /**
+   * @return allocator
+   */
+  BufferAllocator getAllocator();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 876c8b5..2d1b4f2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -103,8 +104,11 @@ public class IncomingBuffers implements AutoCloseable {
             Arrays.toString(collectorMap.values().toArray())));
       }
 
+      // Use the Data Collector's buffer allocator if set, otherwise the fragment's one
+      BufferAllocator ownerAllocator = collector.getAllocator();
+
       synchronized (collector) {
-        final RawFragmentBatch newRawFragmentBatch = 
incomingBatch.newRawFragmentBatch(context.getAllocator());
+        final RawFragmentBatch newRawFragmentBatch = 
incomingBatch.newRawFragmentBatch(ownerAllocator);
         boolean decrementedToZero = collector
             
.batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), 
newRawFragmentBatch);
         newRawFragmentBatch.release();
@@ -125,8 +129,8 @@ public class IncomingBuffers implements AutoCloseable {
     return rem;
   }
 
-  public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
-    return collectorMap.get(senderMajorFragmentId).getBuffers();
+  public DataCollector getCollector(int senderMajorFragmentId) {
+    return collectorMap.get(senderMajorFragmentId);
   }
 
   public boolean isDone() {

-- 
To stop receiving notification emails like this one, please contact
ar...@apache.org.

Reply via email to