DRILL-965: Close underlying buffers/channels when FragmentContext is closed.
* Fixes unit test failures due race condition between closing the allocator and last message handling. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b328d7b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b328d7b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b328d7b7 Branch: refs/heads/master Commit: b328d7b718b59dc7aa98543f02163940d75100b6 Parents: 0879f83 Author: Aditya Kishore <[email protected]> Authored: Tue Jun 10 18:32:14 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 11 16:08:17 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/ops/FragmentContext.java | 4 ++ .../apache/drill/exec/rpc/RemoteConnection.java | 43 ++++++++++++-------- .../exec/work/batch/AbstractDataCollector.java | 10 +++++ .../drill/exec/work/batch/DataCollector.java | 6 +-- .../drill/exec/work/batch/IncomingBuffers.java | 22 ++++++---- 5 files changed, 56 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 7a82f1d..f72d672 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -251,6 +251,10 @@ public class FragmentContext implements Closeable { for(Thread thread: daemonThreads){ thread.interrupt(); } + if (buffers != null) { + buffers.close(); + } allocator.close(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java index a19f8d8..cc3ec69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.rpc; +import java.util.concurrent.ExecutionException; + import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -24,22 +26,22 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.user.ConnectionThrottle; -public abstract class RemoteConnection implements ConnectionThrottle{ +public abstract class RemoteConnection implements ConnectionThrottle, AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class); private final Channel channel; private final WriteManager writeManager; - + public boolean inEventLoop(){ return channel.eventLoop().inEventLoop(); } - + public RemoteConnection(Channel channel) { super(); this.channel = channel; this.writeManager = new WriteManager(); channel.pipeline().addLast(new BackPressureHandler()); } - + public abstract BufferAllocator getAllocator(); public final Channel getChannel() { @@ -57,41 +59,39 @@ public abstract class RemoteConnection implements ConnectionThrottle{ } public void setAutoRead(boolean enableAutoRead){ - channel.config().setAutoRead(enableAutoRead); + channel.config().setAutoRead(enableAutoRead); } - + public boolean isActive(){ return channel.isActive(); } - + /** * The write manager is responsible for controlling whether or not a write can be sent. It controls whether or not to block a sender if we have tcp backpressure on the receive side. */ private static class WriteManager{ private final ResettableBarrier barrier = new ResettableBarrier(); - + public WriteManager(){ barrier.openBarrier(); } - + public void waitForWritable() throws InterruptedException{ barrier.await(); } - + public void setWritable(boolean isWritable){ // logger.debug("Set writable: {}", isWritable); if(isWritable){ - barrier.openBarrier(); + barrier.openBarrier(); }else{ barrier.closeBarrier(); } - + } - + } - - private class BackPressureHandler extends ChannelInboundHandlerAdapter{ @Override @@ -100,7 +100,16 @@ public abstract class RemoteConnection implements ConnectionThrottle{ writeManager.setWritable(ctx.channel().isWritable()); ctx.fireChannelWritabilityChanged(); } - - + + } + + @Override + public void close() { + try { + channel.close().get(); + } catch (InterruptedException | ExecutionException e) { + logger.warn("Caught exception while closing channel.", e); + } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index 83e697d..a67f06b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -135,4 +135,14 @@ public abstract class AbstractDataCollector implements DataCollector, ReadContro } protected abstract RawBatchBuffer getBuffer(int minorFragmentId); + + @Override + public void close() { + for (int i = 0; i < connections.length(); i++) { + if (connections.get(i) != null) { + connections.get(i).close(); + }; + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java index 67d78ac..dc016be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java @@ -17,17 +17,15 @@ */ package org.apache.drill.exec.work.batch; - import java.io.IOException; import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.rpc.RemoteConnection; - -interface DataCollector { +interface DataCollector extends AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataCollector.class); public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException ; public int getOppositeMajorFragmentId(); public RawBatchBuffer[] getBuffers(); public int getTotalIncomingFragments(); + public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 9b3b870..3b97934 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -35,7 +35,7 @@ import com.google.common.collect.Maps; /** * Determines when a particular fragment has enough data for each of its receiving exchanges to commence execution. Also monitors whether we've collected all incoming data. */ -public class IncomingBuffers { +public class IncomingBuffers implements AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class); private final AtomicInteger streamsRemaining = new AtomicInteger(0); @@ -63,7 +63,7 @@ public class IncomingBuffers { public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException { // no need to do anything if we've already enabled running. -// logger.debug("New Batch Arrived {}", batch); + // logger.debug("New Batch Arrived {}", batch); if (batch.getHeader().getIsOutOfMemory()) { for (DataCollector fSet : fragCounts.values()) { try { @@ -99,13 +99,13 @@ public class IncomingBuffers { public RawBatchBuffer[] getBuffers(int senderMajorFragmentId){ return fragCounts.get(senderMajorFragmentId).getBuffers(); } - - + + /** * Designed to setup initial values for arriving fragment accounting. */ public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, DataCollector>, RuntimeException> { - + @Override public Void visitReceiver(Receiver receiver, Map<Integer, DataCollector> counts) throws RuntimeException { DataCollector set; @@ -114,13 +114,12 @@ public class IncomingBuffers { } else { set = new PartitionedCollector(remainingRequired, receiver, context); } - + counts.put(set.getOppositeMajorFragmentId(), set); remainingRequired.incrementAndGet(); return null; } - @Override public Void visitOp(PhysicalOperator op, Map<Integer, DataCollector> value) throws RuntimeException { for(PhysicalOperator o : op){ @@ -129,10 +128,17 @@ public class IncomingBuffers { return null; } - } public boolean isDone(){ return streamsRemaining.get() < 1; } + + @Override + public void close() { + for (DataCollector fragment : fragCounts.values()) { + fragment.close(); + } + } + }
