Update ControlConnection so that we don't block when the connection is backed
up. Control messages should be allowed in band.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c8db3462
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c8db3462
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c8db3462

Branch: refs/heads/master
Commit: c8db3462d7403094f2ab1578d9bc5e4bec8a55d1
Parents: 6f54ab0
Author: Jacques Nadeau <[email protected]>
Authored: Mon Jun 16 08:46:03 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Jun 16 12:44:24 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/rpc/control/ControlConnection.java | 5 +++++
 .../org/apache/drill/exec/rpc/control/ControlTunnel.java     | 8 ++++----
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c8db3462/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index af0368a..c03b7c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -63,6 +63,11 @@ public class ControlConnection extends RemoteConnection {
     bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
   }
 
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void sendUnsafe(RpcOutcomeListener<RECEIVE> outcomeListener,
+      RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+    bus.send(outcomeListener, this, rpcType, protobufBody, clazz, true, dataBodies);
+  }
+
   public void disable() {
     active = false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c8db3462/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index d0f4bde..432acab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -41,7 +41,7 @@ public class ControlTunnel {
     this.manager = manager;
     this.endpoint = endpoint;
   }
-  
+
   public DrillbitEndpoint getEndpoint(){
     return manager.getEndpoint();
   }
@@ -50,13 +50,13 @@ public class ControlTunnel {
     SendFragment b = new SendFragment(outcomeListener, fragment);
     manager.runCommand(b);
   }
-  
+
   public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
     CancelFragment b = new CancelFragment(handle);
     manager.runCommand(b);
     return b.getFuture();
   }
-  
+
   public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
     SendFragmentStatus b = new SendFragmentStatus(status);
     manager.runCommand(b);
@@ -80,7 +80,7 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
+      connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
     }
 
   }

Reply via email to