This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit dc1db98f613f041e40c2c704862051ee2a51734a Author: Salim Achouche <sachouc...@gmail.com> AuthorDate: Sun Apr 22 18:02:35 2018 -0700 DRILL-6348: Received batches are now owned by the receive operators instead of the parent closes #1237 --- .../org/apache/drill/exec/ops/FragmentContextImpl.java | 5 ++++- .../exec/physical/impl/MergingReceiverCreator.java | 2 +- .../physical/impl/mergereceiver/MergingRecordBatch.java | 3 +++ .../impl/unorderedreceiver/UnorderedReceiverBatch.java | 3 +++ .../unorderedreceiver/UnorderedReceiverCreator.java | 2 +- .../drill/exec/work/batch/AbstractDataCollector.java | 17 +++++++++++++++++ .../org/apache/drill/exec/work/batch/DataCollector.java | 14 +++++++++++++- .../apache/drill/exec/work/batch/IncomingBuffers.java | 10 +++++++--- 8 files changed, 49 insertions(+), 7 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index b192850..503ebdd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -449,13 +449,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor public void close() { waitForSendComplete(); + // Close the buffers before closing the operators; this is needed as buffer ownership + // is attached to the receive operators. + suppressingClose(buffers); + // close operator context for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } suppressingClose(bufferManager); - suppressingClose(buffers); suppressingClose(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 0ef84b9..66a0cc2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -42,7 +42,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers(); return new MergingRecordBatch(context, receiver, buffers); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 7e5ff21..9087757 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -136,6 +136,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> this.config = config; this.inputCounts = new long[config.getNumSenders()]; this.outputCounts = new long[config.getNumSenders()]; + + // Register this operator's buffer allocator so that incoming buffers are owned by this allocator + context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); } @SuppressWarnings("resource") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 433e0c8..424a733 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -86,6 +86,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { this.stats = oContext.getStats(); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); this.config = config; + + // Register this operator's buffer allocator so that incoming buffers are owned by this allocator + context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 01a4588..3dcdfc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -37,7 +37,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers(); assert buffers.length == 1; RawBatchBuffer buffer = buffers[0]; return new UnorderedReceiverBatch(context, buffer, receiver); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index b6b4183..bb3a5a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; import org.apache.drill.exec.record.RawFragmentBatch; @@ -37,6 +38,8 @@ public abstract class AbstractDataCollector implements DataCollector { private final int incomingStreams; protected final RawBatchBuffer[] buffers; protected final ArrayWrappedIntIntMap fragmentMap; + /** Allocator which owns incoming batches */ + protected BufferAllocator ownerAllocator; /** * @param parentAccounter @@ -53,6 +56,7 @@ public abstract class AbstractDataCollector implements DataCollector { this.parentAccounter = parentAccounter; this.remainders = new AtomicIntegerArray(incomingStreams); this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId(); + this.ownerAllocator = context.getAllocator(); // Create fragmentId to index that is within the range [0, incoming.size()-1] // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays. fragmentMap = new ArrayWrappedIntIntMap(); @@ -116,4 +120,17 @@ public abstract class AbstractDataCollector implements DataCollector { AutoCloseables.close(buffers); } + /** {@inheritDoc} */ + @Override + public BufferAllocator getAllocator() { + return this.ownerAllocator; + } + + /** {@inheritDoc} */ + @Override + public void setAllocator(BufferAllocator allocator) { + Preconditions.checkArgument(allocator != null, "buffer allocator cannot be null"); + this.ownerAllocator = allocator; + } + } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java index 026fc81..fa74677 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java @@ -19,13 +19,25 @@ package org.apache.drill.exec.work.batch; import java.io.IOException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.RawFragmentBatch; -interface DataCollector extends AutoCloseable { +public interface DataCollector extends AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataCollector.class); public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException ; public int getOppositeMajorFragmentId(); public RawBatchBuffer[] getBuffers(); public int getTotalIncomingFragments(); public void close() throws Exception; + /** + * Enables caller (e.g., receiver) to attach its buffer allocator to this Data Collector in order + * to claim ownership of incoming batches; by default, the fragment allocator owns these batches. + * + * @param allocator operator buffer allocator + */ + void setAllocator(BufferAllocator allocator); + /** + * @return allocator + */ + BufferAllocator getAllocator(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 876c8b5..2d1b4f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.exception.FragmentSetupException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -103,8 +104,11 @@ public class IncomingBuffers implements AutoCloseable { Arrays.toString(collectorMap.values().toArray()))); } + // Use the Data Collector's buffer allocator if set, otherwise the fragment's one + BufferAllocator ownerAllocator = collector.getAllocator(); + synchronized (collector) { - final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator()); + final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(ownerAllocator); boolean decrementedToZero = collector .batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch); newRawFragmentBatch.release(); @@ -125,8 +129,8 @@ public class IncomingBuffers implements AutoCloseable { return rem; } - public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) { - return collectorMap.get(senderMajorFragmentId).getBuffers(); + public DataCollector getCollector(int senderMajorFragmentId) { + return collectorMap.get(senderMajorFragmentId); } public boolean isDone() { -- To stop receiving notification emails like this one, please contact ar...@apache.org.