Update ControlConnection so that sending control messages does not block when the connection is backed up. Control messages should be allowed in band.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c8db3462 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c8db3462 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c8db3462 Branch: refs/heads/master Commit: c8db3462d7403094f2ab1578d9bc5e4bec8a55d1 Parents: 6f54ab0 Author: Jacques Nadeau <[email protected]> Authored: Mon Jun 16 08:46:03 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 16 12:44:24 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/rpc/control/ControlConnection.java | 5 +++++ .../org/apache/drill/exec/rpc/control/ControlTunnel.java | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c8db3462/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java index af0368a..c03b7c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java @@ -63,6 +63,11 @@ public class ControlConnection extends RemoteConnection { bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies); } + public <SEND extends MessageLite, RECEIVE extends MessageLite> void sendUnsafe(RpcOutcomeListener<RECEIVE> outcomeListener, + RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... 
dataBodies) { + bus.send(outcomeListener, this, rpcType, protobufBody, clazz, true, dataBodies); + } + public void disable() { active = false; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c8db3462/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index d0f4bde..432acab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -41,7 +41,7 @@ public class ControlTunnel { this.manager = manager; this.endpoint = endpoint; } - + public DrillbitEndpoint getEndpoint(){ return manager.getEndpoint(); } @@ -50,13 +50,13 @@ public class ControlTunnel { SendFragment b = new SendFragment(outcomeListener, fragment); manager.runCommand(b); } - + public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){ CancelFragment b = new CancelFragment(handle); manager.runCommand(b); return b.getFuture(); } - + public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){ SendFragmentStatus b = new SendFragmentStatus(status); manager.runCommand(b); @@ -80,7 +80,7 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class); + connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class); } }
