DRILL-965: Close underlying buffers/channels when FragmentContext is closed.

* Fixes unit test failures due to a race condition between closing the
allocator and handling the last message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b328d7b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b328d7b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b328d7b7

Branch: refs/heads/master
Commit: b328d7b718b59dc7aa98543f02163940d75100b6
Parents: 0879f83
Author: Aditya Kishore <[email protected]>
Authored: Tue Jun 10 18:32:14 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Jun 11 16:08:17 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentContext.java  |  4 ++
 .../apache/drill/exec/rpc/RemoteConnection.java | 43 ++++++++++++--------
 .../exec/work/batch/AbstractDataCollector.java  | 10 +++++
 .../drill/exec/work/batch/DataCollector.java    |  6 +--
 .../drill/exec/work/batch/IncomingBuffers.java  | 22 ++++++----
 5 files changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 7a82f1d..f72d672 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -251,6 +251,10 @@ public class FragmentContext implements Closeable {
     for(Thread thread: daemonThreads){
      thread.interrupt();
     }
+    if (buffers != null) {
+      buffers.close();
+    }
     allocator.close();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index a19f8d8..cc3ec69 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.rpc;
 
+import java.util.concurrent.ExecutionException;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -24,22 +26,22 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 
-public abstract class RemoteConnection implements ConnectionThrottle{
+public abstract class RemoteConnection implements ConnectionThrottle, 
AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   private final WriteManager writeManager;
-  
+
   public boolean inEventLoop(){
     return channel.eventLoop().inEventLoop();
   }
-  
+
   public RemoteConnection(Channel channel) {
     super();
     this.channel = channel;
     this.writeManager = new WriteManager();
     channel.pipeline().addLast(new BackPressureHandler());
   }
-  
+
   public abstract BufferAllocator getAllocator();
 
   public final Channel getChannel() {
@@ -57,41 +59,39 @@ public abstract class RemoteConnection implements 
ConnectionThrottle{
   }
 
   public void setAutoRead(boolean enableAutoRead){
-    channel.config().setAutoRead(enableAutoRead); 
+    channel.config().setAutoRead(enableAutoRead);
   }
-  
+
   public boolean isActive(){
     return channel.isActive();
   }
-  
+
   /**
    * The write manager is responsible for controlling whether or not a write 
can be sent.  It controls whether or not to block a sender if we have tcp 
backpressure on the receive side.
    */
   private static class WriteManager{
     private final ResettableBarrier barrier = new ResettableBarrier();
-    
+
     public WriteManager(){
       barrier.openBarrier();
     }
-    
+
     public void waitForWritable() throws InterruptedException{
       barrier.await();
     }
-    
+
     public void setWritable(boolean isWritable){
 //      logger.debug("Set writable: {}", isWritable);
       if(isWritable){
-        barrier.openBarrier();  
+        barrier.openBarrier();
       }else{
         barrier.closeBarrier();
       }
-      
+
     }
-    
+
   }
 
-  
-  
   private class BackPressureHandler extends ChannelInboundHandlerAdapter{
 
     @Override
@@ -100,7 +100,16 @@ public abstract class RemoteConnection implements 
ConnectionThrottle{
       writeManager.setWritable(ctx.channel().isWritable());
       ctx.fireChannelWritabilityChanged();
     }
-    
-    
+
+  }
+
+  @Override
+  public void close() {
+    try {
+      channel.close().get();
+    } catch (InterruptedException | ExecutionException e) {
+      logger.warn("Caught exception while closing channel.", e);
+    }
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 83e697d..a67f06b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -135,4 +135,14 @@ public abstract class AbstractDataCollector implements 
DataCollector, ReadContro
   }
 
   protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
+
+  @Override
+  public void close() {
+    for (int i = 0; i < connections.length(); i++) {
+      if (connections.get(i) != null) {
+        connections.get(i).close();
+      };
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
index 67d78ac..dc016be 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
@@ -17,17 +17,15 @@
  */
 package org.apache.drill.exec.work.batch;
 
-
 import java.io.IOException;
 
 import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.RemoteConnection;
-
 
-interface DataCollector {
+interface DataCollector extends AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DataCollector.class);
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) 
throws IOException ;
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
   public int getTotalIncomingFragments();
+  public void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b328d7b7/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 9b3b870..3b97934 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -35,7 +35,7 @@ import com.google.common.collect.Maps;
 /**
  * Determines when a particular fragment has enough data for each of its 
receiving exchanges to commence execution.  Also monitors whether we've 
collected all incoming data.
  */
-public class IncomingBuffers {
+public class IncomingBuffers implements AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
 
   private final AtomicInteger streamsRemaining = new AtomicInteger(0);
@@ -63,7 +63,7 @@ public class IncomingBuffers {
 
   public boolean batchArrived(RawFragmentBatch batch) throws 
FragmentSetupException {
     // no need to do anything if we've already enabled running.
-//    logger.debug("New Batch Arrived {}", batch);
+    // logger.debug("New Batch Arrived {}", batch);
     if (batch.getHeader().getIsOutOfMemory()) {
       for (DataCollector fSet : fragCounts.values()) {
         try {
@@ -99,13 +99,13 @@ public class IncomingBuffers {
   public RawBatchBuffer[] getBuffers(int senderMajorFragmentId){
     return fragCounts.get(senderMajorFragmentId).getBuffers();
   }
-  
-  
+
+
   /**
    * Designed to setup initial values for arriving fragment accounting.
    */
   public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, 
Map<Integer, DataCollector>, RuntimeException> {
-    
+
     @Override
     public Void visitReceiver(Receiver receiver, Map<Integer, DataCollector> 
counts) throws RuntimeException {
       DataCollector set;
@@ -114,13 +114,12 @@ public class IncomingBuffers {
       } else {
         set = new PartitionedCollector(remainingRequired, receiver, context);
       }
-      
+
       counts.put(set.getOppositeMajorFragmentId(), set);
       remainingRequired.incrementAndGet();
       return null;
     }
 
-    
     @Override
     public Void visitOp(PhysicalOperator op, Map<Integer, DataCollector> 
value) throws RuntimeException {
       for(PhysicalOperator o : op){
@@ -129,10 +128,17 @@ public class IncomingBuffers {
       return null;
     }
 
-
   }
 
   public boolean isDone(){
     return streamsRemaining.get() < 1;
   }
+
+  @Override
+  public void close() {
+    for (DataCollector fragment : fragCounts.values()) {
+      fragment.close();
+    }
+  }
+
 }

Reply via email to