[ 
https://issues.apache.org/jira/browse/DRILL-6255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481026#comment-16481026
 ] 

ASF GitHub Bot commented on DRILL-6255:
---------------------------------------

asfgit closed pull request #1253: DRILL-6255: Drillbit while sending control 
message to itself creates …
URL: https://github.com/apache/drill/pull/1253
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not display its diff
once it has been merged, so the full diff is reproduced below:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
index c3980336f7..0bc01f5367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
@@ -103,6 +103,24 @@ void prepareSaslHandshake(final RpcConnectionHandler<CC> 
connectionHandler, List
     }
   }
 
+  /**
+   * Verifies if local and remote Drillbit Endpoint has same control server by 
using address and control port
+   * information. This method is used instead of equals in {@link 
DrillbitEndpoint} because DrillbitEndpoint stores
+   * state information in it.
+   * For local Drillbit a reference is stored in {@link 
org.apache.drill.exec.server.DrillbitContext} as soon as
+   * Drillbit is started in {@link 
org.apache.drill.exec.service.ServiceEngine#start} with state as STARTUP, but
+   * while planning minor fragment the assignment list is used from active 
list of Drillbits in which state for local
+   * Drillbit will not be STARTUP
+   * @param local - DrillbitEndpoint instance for local bit
+   * @param remote - DrillbitEndpoint instance for remote bit
+   * @return true if address and control port for local and remote are same.
+   *         false - otherwise
+   */
+  public static boolean isLocalControlServer(DrillbitEndpoint local, 
DrillbitEndpoint remote) {
+    return local.hasAddress() && local.hasControlPort() && remote.hasAddress() 
&& remote.hasControlPort() &&
+      local.getAddress().equals(remote.getAddress()) && local.getControlPort() 
== remote.getControlPort();
+  }
+
   // Suppress default constructor
   private BitRpcUtility() {
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
index 800cf3cc85..e27729c1f4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.rpc.BitRpcUtility;
 
 public class ConnectionManagerRegistry implements 
Iterable<ControlConnectionManager> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
@@ -40,9 +41,12 @@ public ConnectionManagerRegistry(ControlConnectionConfig 
config) {
   public ControlConnectionManager getConnectionManager(DrillbitEndpoint 
remoteEndpoint) {
     assert localEndpoint != null :
         "DrillbitEndpoint must be set before a connection manager can be 
retrieved";
+
+    final boolean isLocalServer = 
BitRpcUtility.isLocalControlServer(localEndpoint, remoteEndpoint);
     ControlConnectionManager m = registry.get(remoteEndpoint);
     if (m == null) {
-      m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint);
+      m = (isLocalServer) ? new LocalControlConnectionManager(config, 
remoteEndpoint)
+                    : new RemoteControlConnectionManager(config, 
localEndpoint, remoteEndpoint);
       final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, 
m);
       if (m2 != null) {
         m = m2;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
index b19fb8bb75..f114af44de 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.BitConnectionConfig;
@@ -24,8 +25,8 @@
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 
 // config for bit to bit connection
-// package private
-class ControlConnectionConfig extends BitConnectionConfig {
+@VisibleForTesting
+public class ControlConnectionConfig extends BitConnectionConfig {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class);
 
   private final ControlMessageHandler handler;
@@ -41,8 +42,8 @@ public String getName() {
     return "control"; // unused
   }
 
-  ControlMessageHandler getMessageHandler() {
+  @VisibleForTesting
+  public ControlMessageHandler getMessageHandler() {
     return handler;
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
index 6bfcbd5d6e..240421ef70 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
@@ -25,14 +25,10 @@
 /**
  * Maintains connection between two particular bits.
  */
-public class ControlConnectionManager extends 
ReconnectingConnection<ControlConnection, BitControlHandshake>{
+public abstract class ControlConnectionManager extends 
ReconnectingConnection<ControlConnection, BitControlHandshake>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class);
 
-  private final ControlConnectionConfig config;
-  private final DrillbitEndpoint remoteEndpoint;
-
-  public ControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint localEndpoint,
-                                  DrillbitEndpoint remoteEndpoint) {
+  public ControlConnectionManager(DrillbitEndpoint localEndpoint, 
DrillbitEndpoint remoteEndpoint) {
     super(
         BitControlHandshake.newBuilder()
             .setRpcVersion(ControlRpcConfig.RPC_VERSION)
@@ -40,14 +36,8 @@ public ControlConnectionManager(ControlConnectionConfig 
config, DrillbitEndpoint
             .build(),
         remoteEndpoint.getAddress(),
         remoteEndpoint.getControlPort());
-
-    this.config = config;
-    this.remoteEndpoint = remoteEndpoint;
   }
 
   @Override
-  protected BasicClient<?, ControlConnection, BitControlHandshake, ?> 
getNewClient() {
-    return new ControlClient(config, remoteEndpoint, new 
CloseHandlerCreator());
-  }
-
+  protected abstract BasicClient<?, ControlConnection, BitControlHandshake, ?> 
getNewClient();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 1a4af9054f..492d4de625 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,11 +17,20 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-
-import java.util.concurrent.TimeUnit;
-
 import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -38,19 +47,7 @@
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.Controller.CustomSerDe;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
+import java.util.concurrent.TimeUnit;
 
 public class ControlTunnel {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
@@ -99,8 +96,7 @@ public void informReceiverFinished(RpcOutcomeListener<Ack> 
outcomeListener, Fini
     return b.getFuture();
   }
 
-
-  public static class SendFragmentStatus extends FutureBitCommand<Ack, 
ControlConnection> {
+  public static class SendFragmentStatus extends FutureBitCommand<Ack, 
ControlConnection, RpcType, FragmentStatus> {
     final FragmentStatus status;
 
     public SendFragmentStatus(FragmentStatus status) {
@@ -110,13 +106,22 @@ public SendFragmentStatus(FragmentStatus status) {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, 
status, Ack.class);
+      connection.sendUnsafe(outcomeListener, getRpcType(), status, Ack.class);
     }
 
-  }
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_FRAGMENT_STATUS;
+    }
 
+    @Override
+    public FragmentStatus getMessage() {
+      return status;
+    }
 
-  public static class ReceiverFinished extends ListeningCommand<Ack, 
ControlConnection> {
+  }
+
+  public static class ReceiverFinished extends ListeningCommand<Ack, 
ControlConnection, RpcType, FinishedReceiver> {
     final FinishedReceiver finishedReceiver;
 
     public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver 
finishedReceiver) {
@@ -126,11 +131,21 @@ public ReceiverFinished(RpcOutcomeListener<Ack> listener, 
FinishedReceiver finis
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, 
finishedReceiver, Ack.class);
+      connection.send(outcomeListener, getRpcType(), finishedReceiver, 
Ack.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_RECEIVER_FINISHED;
+    }
+
+    @Override
+    public FinishedReceiver getMessage() {
+      return finishedReceiver;
     }
   }
 
-  public static class SignalFragment extends ListeningCommand<Ack, 
ControlConnection> {
+  public static class SignalFragment extends ListeningCommand<Ack, 
ControlConnection, RpcType, FragmentHandle> {
     final FragmentHandle handle;
     final RpcType type;
 
@@ -145,9 +160,18 @@ public void doRpcCall(RpcOutcomeListener<Ack> 
outcomeListener, ControlConnection
       connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
     }
 
+    @Override
+    public RpcType getRpcType() {
+      return type;
+    }
+
+    @Override
+    public FragmentHandle getMessage() {
+      return handle;
+    }
   }
 
-  public static class SendFragment extends ListeningCommand<Ack, 
ControlConnection> {
+  public static class SendFragment extends ListeningCommand<Ack, 
ControlConnection, RpcType, InitializeFragments> {
     final InitializeFragments fragments;
 
     public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments 
fragments) {
@@ -157,12 +181,21 @@ public SendFragment(RpcOutcomeListener<Ack> listener, 
InitializeFragments fragme
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, 
fragments, Ack.class);
+      connection.send(outcomeListener, getRpcType(), fragments, Ack.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_INITIALIZE_FRAGMENTS;
     }
 
+    @Override
+    public InitializeFragments getMessage() {
+      return fragments;
+    }
   }
 
-  public static class RequestProfile extends FutureBitCommand<QueryProfile, 
ControlConnection> {
+  public static class RequestProfile extends FutureBitCommand<QueryProfile, 
ControlConnection, RpcType, QueryId> {
     final QueryId queryId;
 
     public RequestProfile(QueryId queryId) {
@@ -172,11 +205,21 @@ public RequestProfile(QueryId queryId) {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<QueryProfile> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, 
QueryProfile.class);
+      connection.send(outcomeListener, getRpcType(), queryId, 
QueryProfile.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_QUERY_STATUS;
+    }
+
+    @Override
+    public QueryId getMessage() {
+      return queryId;
     }
   }
 
-  public static class CancelQuery extends FutureBitCommand<Ack, 
ControlConnection> {
+  public static class CancelQuery extends FutureBitCommand<Ack, 
ControlConnection, RpcType, QueryId> {
     final QueryId queryId;
 
     public CancelQuery(QueryId queryId) {
@@ -186,7 +229,17 @@ public CancelQuery(QueryId queryId) {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, 
Ack.class);
+      connection.send(outcomeListener, getRpcType(), queryId, Ack.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_QUERY_CANCEL;
+    }
+
+    @Override
+    public QueryId getMessage() {
+      return queryId;
     }
   }
 
@@ -204,8 +257,8 @@ public void doRpcCall(RpcOutcomeListener<Ack> 
outcomeListener, ControlConnection
     return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive);
   }
 
-
-  private static class CustomMessageSender extends 
ListeningCommand<CustomMessage, ControlConnection> {
+  public static class CustomMessageSender extends
+    ListeningCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> 
{
 
     private CustomMessage message;
     private ByteBuf[] dataBodies;
@@ -218,12 +271,27 @@ public 
CustomMessageSender(RpcOutcomeListener<CustomMessage> listener, CustomMes
 
     @Override
     public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, 
CustomMessage.class, dataBodies);
+      connection.send(outcomeListener, getRpcType(), message, 
CustomMessage.class, dataBodies);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_CUSTOM;
+    }
+
+    @Override
+    public CustomMessage getMessage() {
+      return message;
+    }
+
+    public ByteBuf[] getDataBodies() {
+      return dataBodies;
     }
 
   }
 
-  private static class SyncCustomMessageSender extends 
FutureBitCommand<CustomMessage, ControlConnection> {
+  public static class SyncCustomMessageSender extends
+    FutureBitCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> 
{
 
     private CustomMessage message;
     private ByteBuf[] dataBodies;
@@ -236,7 +304,21 @@ public SyncCustomMessageSender(CustomMessage message, 
ByteBuf[] dataBodies) {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, 
CustomMessage.class, dataBodies);
+      connection.send(outcomeListener, getRpcType(), message, 
CustomMessage.class, dataBodies);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_CUSTOM;
+    }
+
+    @Override
+    public CustomMessage getMessage() {
+      return message;
+    }
+
+    public ByteBuf[] getDataBodies() {
+      return dataBodies;
     }
   }
 
@@ -261,8 +343,7 @@ public RECEIVE get() throws Exception {
       return serde.deserializeReceived(message.getMessage().toByteArray());
     }
 
-    public RECEIVE get(long timeout, TimeUnit unit) throws Exception,
-        InvalidProtocolBufferException {
+    public RECEIVE get(long timeout, TimeUnit unit) throws Exception, 
InvalidProtocolBufferException {
       CustomMessage message = future.checkedGet(timeout, unit);
       return serde.deserializeReceived(message.getMessage().toByteArray());
     }
@@ -270,7 +351,6 @@ public RECEIVE get(long timeout, TimeUnit unit) throws 
Exception,
     public DrillBuf getBuffer() throws RpcException {
       return (DrillBuf) future.getBuffer();
     }
-
   }
 
 
@@ -351,21 +431,15 @@ public void success(CustomMessage value, ByteBuf buffer) {
         } catch (Exception e) {
           innerListener.failed(new RpcException("Failure while parsing message 
locally.", e));
         }
-
       }
 
       @Override
       public void interrupted(InterruptedException e) {
         innerListener.interrupted(e);
       }
-
     }
-
   }
 
-
-
-
   public static class ProtoSerDe<MSG extends MessageLite> implements 
CustomSerDe<MSG> {
     private final Parser<MSG> parser;
 
@@ -420,7 +494,5 @@ public JacksonSerDe(Class<MSG> clazz, JsonSerializer<MSG> 
serializer, JsonDeseri
     public MSG deserializeReceived(byte[] bytes) throws Exception {
       return (MSG) reader.readValue(bytes);
     }
-
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
new file mode 100644
index 0000000000..fa6a2e9a55
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcCommand;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+
+public class LocalControlConnectionManager extends ControlConnectionManager {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LocalControlConnectionManager.class);
+
+  private final ControlConnectionConfig config;
+
+  public LocalControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint localEndpoint) {
+    super(localEndpoint, localEndpoint);
+    this.config = config;
+  }
+
+  @Override
+  protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, 
?> getNewClient() {
+    throw new UnsupportedOperationException("LocalControlConnectionManager 
doesn't support creating a control client");
+  }
+
+  @Override
+  public void runCommand(RpcCommand cmd) {
+    final int rpcType = cmd.getRpcType().getNumber();
+    final ControlMessageHandler messageHandler = config.getMessageHandler();
+
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Received bit com message of type {} over local connection 
manager", rpcType);
+    }
+
+    switch (rpcType) {
+
+      case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+        final ControlTunnel.SignalFragment signalFragment = 
((ControlTunnel.SignalFragment) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
signalFragment.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.cancelFragment(signalFragment.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_CUSTOM_VALUE: {
+        final ByteBuf[] dataBodies;
+        final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener;
+
+        if (cmd instanceof ControlTunnel.CustomMessageSender) {
+          dataBodies = 
((ControlTunnel.CustomMessageSender)cmd).getDataBodies();
+          outcomeListener = 
((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener();
+        } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) {
+          dataBodies = 
((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies();
+          outcomeListener = 
((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener();
+        } else {
+          throw new UnsupportedOperationException("Unknown Custom Type control 
message received");
+        }
+
+        DrillBuf reqDrillBuff;
+        try {
+          reqDrillBuff = convertToByteBuf(dataBodies);
+        } catch (Exception ex) {
+          outcomeListener.failed(new RpcException("Failed to allocate memory 
while sending request in " +
+            "LocalControlConnectionManager#convertToByteBuff", ex));
+          return;
+        } finally {
+          releaseByteBuf(dataBodies);
+        }
+
+        try {
+          BitControl.CustomMessage message = (BitControl.CustomMessage) 
cmd.getMessage();
+          final Response response = 
messageHandler.getHandlerRegistry().handle(message, reqDrillBuff);
+          DrillBuf responseBuffer;
+          try {
+            responseBuffer = convertToByteBuf(response.dBodies);
+          } catch (Exception ex) {
+            outcomeListener.failed(new RpcException("Failed to allocate memory 
while sending response in " +
+              "LocalControlConnectionManager#convertToByteBuff", ex));
+            return;
+          } finally {
+            releaseByteBuf(response.dBodies);
+          }
+
+          // Passed responseBuffer will be owned by consumer
+          outcomeListener.success((BitControl.CustomMessage)response.pBody, 
responseBuffer);
+        } catch (RpcException ex) {
+          cmd.getOutcomeListener().failed(ex);
+        } finally {
+          // Release the reqDrillBuff passed into handler
+          releaseByteBuf(reqDrillBuff);
+        }
+        break;
+      }
+
+      case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+        final ControlTunnel.ReceiverFinished receiverFinished = 
((ControlTunnel.ReceiverFinished) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
receiverFinished.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.receivingFragmentFinished(receiverFinished.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: {
+        final ControlTunnel.SendFragmentStatus fragmentStatus = 
((ControlTunnel.SendFragmentStatus) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
fragmentStatus.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.requestFragmentStatus(fragmentStatus.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: {
+        final ControlTunnel.CancelQuery cancelQuery = 
((ControlTunnel.CancelQuery) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
cancelQuery.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.requestQueryCancel(cancelQuery.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+        final ControlTunnel.SendFragment sendFragment = 
((ControlTunnel.SendFragment) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
sendFragment.getOutcomeListener();
+
+        try {
+          final Ack ackResponse = 
messageHandler.initializeFragment(sendFragment.getMessage());
+          outcomeListener.success(ackResponse, null);
+        } catch (RpcException ex) {
+          outcomeListener.failed(ex);
+        }
+        break;
+      }
+
+      case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: {
+        final ControlTunnel.RequestProfile requestProfile = 
((ControlTunnel.RequestProfile) cmd);
+        final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = 
requestProfile.getOutcomeListener();
+
+        try {
+          final UserBitShared.QueryProfile profile = 
messageHandler.requestQueryStatus(requestProfile.getMessage());
+          outcomeListener.success(profile, null);
+        } catch (RpcException ex) {
+          outcomeListener.failed(ex);
+        }
+        break;
+      }
+
+      case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+        final ControlTunnel.SignalFragment signalFragment = 
((ControlTunnel.SignalFragment) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
signalFragment.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.resumeFragment(signalFragment.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      default:
+        final RpcException rpcException = new 
RpcException(String.format("Unsupported control request type %s " +
+          "received on LocalControlConnectionManager", rpcType));
+        cmd.getOutcomeListener().failed(rpcException);
+    }
+  }
+
+  /**
+   * Copies ByteBuf in the input array into a single DrillBuf
+   * @param byteBuffArray - input array of ByteBuf's
+   * @return DrillBuf - output Drillbuf with all the input bytes.
+   * @throws OutOfMemoryException
+   */
+  private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws 
OutOfMemoryException {
+
+    if (byteBuffArray == null) {
+      return null;
+    }
+
+    int bytesToCopy = 0;
+    for (ByteBuf b : byteBuffArray) {
+      final int validBytes = b.readableBytes();
+      if (0 == validBytes) {
+        b.release();
+      } else {
+        bytesToCopy += validBytes;
+      }
+    }
+    final DrillBuf drillBuff = config.getAllocator().buffer(bytesToCopy);
+
+    for (ByteBuf b : byteBuffArray) {
+      final int validBytes = b.readableBytes();
+      drillBuff.writeBytes(b, 0, validBytes);
+    }
+
+    return drillBuff;
+  }
+
+  /**
+   * Releases all the ByteBuf inside the input ByteBuf array
+   * @param byteBuffArray - input array of ByteBuf's
+   */
+  private void releaseByteBuf(ByteBuf[] byteBuffArray) {
+    if (byteBuffArray != null) {
+      for (ByteBuf b : byteBuffArray) {
+        b.release();
+      }
+    }
+  }
+
+  /**
+   * Releases the input ByteBuf
+   * @param byteBuf - input ByteBuf
+   */
+  private void releaseByteBuf(ByteBuf byteBuf) {
+    if (byteBuf != null) {
+      byteBuf.release();
+    }
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
new file mode 100644
index 0000000000..6a4dc21b90
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.BasicClient;
+
+public class RemoteControlConnectionManager extends ControlConnectionManager {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemoteControlConnectionManager.class);
+
+  private final ControlConnectionConfig config;
+  private final DrillbitEndpoint remoteEndpoint;
+
+  public RemoteControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint
+    localEndpoint, DrillbitEndpoint remoteEndpoint) {
+    super(localEndpoint, remoteEndpoint);
+    this.config = config;
+    this.remoteEndpoint = remoteEndpoint;
+  }
+
+  @Override
+  protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, 
?> getNewClient() {
+    return new ControlClient(config, remoteEndpoint, new 
CloseHandlerCreator());
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 7e286fa320..7b05b81e3b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import com.google.protobuf.MessageLite;
 import io.netty.buffer.ByteBuf;
 
 import java.util.concurrent.Semaphore;
@@ -43,7 +44,6 @@
   private ExecutionControls testControls;
   private org.slf4j.Logger testLogger;
 
-
   public DataTunnel(DataConnectionManager manager) {
     this.manager = manager;
   }
@@ -69,7 +69,7 @@ public void setTestInjectionControls(final ControlsInjector 
testInjector,
 
   public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, 
FragmentWritableBatch batch) {
     SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
-    try{
+    try {
       if (isInjectionControlSet) {
         // Wait for interruption if set. Used to simulate the fragment 
interruption while the fragment is waiting for
         // semaphore acquire. We expect the
@@ -78,9 +78,9 @@ public void sendRecordBatch(RpcOutcomeListener<Ack> 
outcomeListener, FragmentWri
 
       sendingSemaphore.acquire();
       manager.runCommand(b);
-    }catch(final InterruptedException e){
+    } catch (final InterruptedException e) {
       // Release the buffers first before informing the listener about the 
interrupt.
-      for(ByteBuf buffer : batch.getBuffers()) {
+      for (ByteBuf buffer : batch.getBuffers()) {
         buffer.release();
       }
 
@@ -119,7 +119,7 @@ public void interrupted(InterruptedException e) {
     }
   }
 
-  private class SendBatchAsyncListen extends ListeningCommand<Ack, 
DataClientConnection> {
+  private class SendBatchAsyncListen extends ListeningCommand<Ack, 
DataClientConnection, RpcType, MessageLite> {
     final FragmentWritableBatch batch;
 
     public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, 
FragmentWritableBatch batch) {
@@ -129,7 +129,18 @@ public SendBatchAsyncListen(RpcOutcomeListener<Ack> 
listener, FragmentWritableBa
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
DataClientConnection connection) {
-      connection.send(new ThrottlingOutcomeListener(outcomeListener), 
RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
+      connection.send(new ThrottlingOutcomeListener(outcomeListener), 
getRpcType(), batch.getHeader(),
+        Ack.class, batch.getBuffers());
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_RECORD_BATCH;
+    }
+
+    @Override
+    public MessageLite getMessage() {
+      return batch.getHeader();
     }
 
     @Override
@@ -139,7 +150,7 @@ public String toString() {
 
     @Override
     public void connectionFailed(FailureType type, Throwable t) {
-      for(ByteBuf buffer : batch.getBuffers()) {
+      for (ByteBuf buffer : batch.getBuffers()) {
         buffer.release();
       }
       super.connectionFailed(type, t);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index e562b167a2..963f53a9d5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -91,14 +91,16 @@ public void handle(ControlConnection connection, int 
rpcType, ByteBuf pBody, Byt
     }
 
     case RpcType.REQ_FRAGMENT_STATUS_VALUE:
-      bee.getContext().getWorkBus().statusUpdate( get(pBody, 
FragmentStatus.PARSER));
+      final FragmentStatus status = get(pBody, FragmentStatus.PARSER);
+      requestFragmentStatus(status);
       // TODO: Support a type of message that has no response.
       sender.send(ControlRpcConfig.OK);
       break;
 
     case RpcType.REQ_QUERY_CANCEL_VALUE: {
       final QueryId queryId = get(pBody, QueryId.PARSER);
-      if (bee.cancelForeman(queryId, null)) {
+      final Ack cancelStatus = requestQueryCancel(queryId);
+      if (cancelStatus.getOk()) {
         sender.send(ControlRpcConfig.OK);
       } else {
         sender.send(ControlRpcConfig.FAIL);
@@ -108,21 +110,14 @@ public void handle(ControlConnection connection, int 
rpcType, ByteBuf pBody, Byt
 
     case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
       final InitializeFragments fragments = get(pBody, 
InitializeFragments.PARSER);
-      final DrillbitContext drillbitContext = bee.getContext();
-      for(int i = 0; i < fragments.getFragmentCount(); i++) {
-        startNewFragment(fragments.getFragment(i), drillbitContext);
-      }
+      initializeFragment(fragments);
       sender.send(ControlRpcConfig.OK);
       break;
     }
 
     case RpcType.REQ_QUERY_STATUS_VALUE: {
       final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman == null) {
-        throw new RpcException("Query not running on node.");
-      }
-      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+      final QueryProfile profile = requestQueryStatus(queryId);
       sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile));
       break;
     }
@@ -145,7 +140,7 @@ public void handle(ControlConnection connection, int 
rpcType, ByteBuf pBody, Byt
    * @param fragment
    * @throws UserRpcException
    */
-  private void startNewFragment(final PlanFragment fragment, final 
DrillbitContext drillbitContext)
+  public void startNewFragment(final PlanFragment fragment, final 
DrillbitContext drillbitContext)
       throws UserRpcException {
     logger.debug("Received remote fragment start instruction", fragment);
 
@@ -182,7 +177,7 @@ private void startNewFragment(final PlanFragment fragment, 
final DrillbitContext
   /* (non-Javadoc)
    * @see 
org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
    */
-  private Ack cancelFragment(final FragmentHandle handle) {
+  public Ack cancelFragment(final FragmentHandle handle) {
     /**
      * For case 1, see {@link 
org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}.
      * In comments below, "active" refers to fragment states: SENDING, 
AWAITING_ALLOCATION, RUNNING and
@@ -213,7 +208,7 @@ private Ack cancelFragment(final FragmentHandle handle) {
     return Acks.OK;
   }
 
-  private Ack resumeFragment(final FragmentHandle handle) {
+  public Ack resumeFragment(final FragmentHandle handle) {
     // resume a pending fragment
     final FragmentManager manager = 
bee.getContext().getWorkBus().getFragmentManager(handle);
     if (manager != null) {
@@ -233,7 +228,7 @@ private Ack resumeFragment(final FragmentHandle handle) {
     return Acks.OK;
   }
 
-  private Ack receivingFragmentFinished(final FinishedReceiver 
finishedReceiver) {
+  public Ack receivingFragmentFinished(final FinishedReceiver 
finishedReceiver) {
 
     final FragmentManager manager = 
bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
 
@@ -254,6 +249,32 @@ private Ack receivingFragmentFinished(final 
FinishedReceiver finishedReceiver) {
     return Acks.OK;
   }
 
+  public Ack requestFragmentStatus(FragmentStatus status) {
+    bee.getContext().getWorkBus().statusUpdate( status);
+    return Acks.OK;
+  }
+
+  public Ack requestQueryCancel(QueryId queryId) {
+    return bee.cancelForeman(queryId, null) ? Acks.OK : Acks.FAIL;
+  }
+
+  public Ack initializeFragment(InitializeFragments fragments) throws 
RpcException {
+    final DrillbitContext drillbitContext = bee.getContext();
+    for (int i = 0; i < fragments.getFragmentCount(); i++) {
+      startNewFragment(fragments.getFragment(i), drillbitContext);
+    }
+
+    return Acks.OK;
+  }
+
+  public QueryProfile requestQueryStatus(QueryId queryId) throws RpcException {
+    final Foreman foreman = bee.getForemanForQueryId(queryId);
+    if (foreman == null) {
+      throw new RpcException("Query not running on node.");
+    }
+    return foreman.getQueryManager().getQueryProfile();
+  }
+
   public CustomHandlerRegistry getHandlerRegistry() {
     return handlerRegistry;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
index 3d051bbdc3..91b2a2a41f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
@@ -44,11 +44,8 @@
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 
-
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -141,7 +138,6 @@ private void setupRootFragment(final PlanFragment 
rootFragment, final FragmentRo
     }
   }
 
-
   /**
    * Set up the non-root fragments for execution. Some may be local, and some 
may be remote.
    * Messages are sent immediately, so they may start returning data even 
before we complete this.
@@ -158,17 +154,11 @@ private void setupNonRootFragments(final 
Collection<PlanFragment> fragments) thr
      * executed there. We need to start up the intermediate fragments first so 
that they will be
      * ready once the leaf fragments start producing data. To satisfy both of 
these, we will
      * make a pass through the fragments and put them into the remote maps 
according to their
-     * leaf/intermediate state, as well as their target drillbit. Also filter 
the leaf/intermediate
-     * fragments which are assigned to run on local Drillbit node (or Foreman 
node) into separate lists.
-     *
-     * This will help to schedule local
+     * leaf/intermediate state, as well as their target drillbit.
      */
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = 
ArrayListMultimap.create();
-    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = 
ArrayListMultimap.create();
-    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = 
ArrayListMultimap.create();
+    final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = 
ArrayListMultimap.create();
 
-    final DrillbitEndpoint localDrillbitEndpoint = 
drillbitContext.getEndpoint();
     // record all fragments for status purposes.
     for (final PlanFragment planFragment : fragments) {
 
@@ -180,44 +170,38 @@ private void setupNonRootFragments(final 
Collection<PlanFragment> fragments) thr
       foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
 
       if (planFragment.getLeafFragment()) {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, 
localLeafFragmentList, remoteLeafFragmentMap);
+        leafFragmentMap.put(planFragment.getAssignment(), planFragment);
       } else {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, 
localIntFragmentList, remoteIntFragmentMap);
+        intFragmentMap.put(planFragment.getAssignment(), planFragment);
       }
     }
 
     /*
      * We need to wait for the intermediates to be sent so that they'll be set 
up by the time
-     * the leaves start producing data. We'll use this latch to wait for the 
responses.
+     * the leaves start producing data. We'll use this latch to wait for the 
responses. All the local intermediate
+     * fragments are submitted locally without creating any actual control 
connection to itself.
      *
      * However, in order not to hang the process if any of the RPC requests 
fails, we always
      * count down (see FragmentSubmitFailures), but we count the number of 
failures so that we'll
      * know if any submissions did fail.
      */
-    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
-
-    // Setup local intermediate fragments
-    for (final PlanFragment fragment : localIntFragmentList) {
-      startLocalFragment(fragment);
-    }
+    scheduleIntermediateFragments(intFragmentMap);
 
     injector.injectChecked(foreman.getQueryContext().getExecutionControls(), 
"send-fragments", ForemanException.class);
     /*
      * Send the remote (leaf) fragments; we don't wait for these. Any problems 
will come in through
-     * the regular sendListener event delivery.
+     * the regular sendListener event delivery. All the local leaf fragments 
are submitted locally without creating
+     * any actual control connection to itself.
      */
-    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
-    }
-
-    // Setup local leaf fragments
-    for (final PlanFragment fragment : localLeafFragmentList) {
-      startLocalFragment(fragment);
+    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
     }
   }
 
   /**
-   * Send all the remote fragments belonging to a single target drillbit in 
one request.
+   * Send all the remote fragments belonging to a single target drillbit in 
one request. If the assignment
+   * DrillbitEndpoint is local Drillbit then {@link 
Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it
+   * locally without actually creating a Control Connection to itself.
    *
    * @param assignment the drillbit assigned to these fragments
    * @param fragments the set of fragments
@@ -241,41 +225,21 @@ private void sendRemoteFragments(final DrillbitEndpoint 
assignment, final Collec
   }
 
   /**
-   * Add planFragment into either of local fragment list or remote fragment 
map based on assigned Drillbit Endpoint node
-   * and the local Drillbit Endpoint.
+   * Send intermediate fragment to the assigned Drillbit node. If the 
assignment DrillbitEndpoint is local Drillbit
+   * then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of 
submitting it locally without actually creating
+   * a Control Connection to itself. Throws {@link UserException} in case of 
failure to send the fragment.
    *
-   * @param planFragment plan fragment
-   * @param localEndPoint local endpoint
-   * @param localFragmentList local fragment list
-   * @param remoteFragmentMap remote fragment map
+   * @param intermediateFragmentMap - Map of Drillbit Endpoint to list of 
intermediate PlanFragment's
    */
-  private void updateFragmentCollection(final PlanFragment planFragment, final 
DrillbitEndpoint localEndPoint,
-                                        final List<PlanFragment> 
localFragmentList,
-                                        final Multimap<DrillbitEndpoint, 
PlanFragment> remoteFragmentMap) {
-    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
+  private void scheduleIntermediateFragments(final Multimap<DrillbitEndpoint, 
PlanFragment> intermediateFragmentMap) {
 
-    if (assignedDrillbit.equals(localEndPoint)) {
-      localFragmentList.add(planFragment);
-    } else {
-      remoteFragmentMap.put(assignedDrillbit, planFragment);
-    }
-  }
-
-  /**
-   * Send remote intermediate fragment to the assigned Drillbit node.
-   * Throw exception in case of failure to send the fragment.
-   *
-   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of 
PlanFragment's
-   */
-  private void scheduleRemoteIntermediateFragments(final 
Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-
-    final int numIntFragments = remoteFragmentMap.keySet().size();
+    final int numIntFragments = intermediateFragmentMap.keySet().size();
     final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
     final FragmentSubmitFailures fragmentSubmitFailures = new 
FragmentSubmitFailures();
 
     // send remote intermediate fragments
-    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, 
fragmentSubmitFailures);
+    for (final DrillbitEndpoint ep : intermediateFragmentMap.keySet()) {
+      sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, 
fragmentSubmitFailures);
     }
 
     final long timeout = 
drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT)
 * numIntFragments;
@@ -313,29 +277,6 @@ private void scheduleRemoteIntermediateFragments(final 
Multimap<DrillbitEndpoint
     }
   }
 
-
-  /**
-   * Start the locally assigned leaf or intermediate fragment
-   *
-   * @param fragment fragment
-   */
-  private void startLocalFragment(final PlanFragment fragment) throws 
ExecutionSetupException {
-    logger.debug("Received local fragment start instruction", fragment);
-
-    final FragmentContextImpl fragmentContext = new 
FragmentContextImpl(drillbitContext, fragment, 
drillbitContext.getFunctionImplementationRegistry());
-    final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(fragmentContext);
-    final FragmentExecutor fragmentExecutor = new 
FragmentExecutor(fragmentContext, fragment, statusReporter);
-
-    // we either need to start the fragment if it is a leaf fragment, or set 
up a fragment manager if it is non leaf.
-    if (fragment.getLeafFragment()) {
-      bee.addFragmentRunner(fragmentExecutor);
-    } else {
-      // isIntermediate, store for incoming data.
-      final NonRootFragmentManager manager = new 
NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
-      drillbitContext.getWorkBus().addFragmentManager(manager);
-    }
-  }
-
   /**
    * Used by {@link FragmentSubmitListener} to track the number of submission 
failures.
    */
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index eb57658689..c0c5b04303 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -40,12 +40,9 @@
 
   protected final AtomicReference<DrillbitEndpoint> foremanDrillbit;
 
-  protected final DrillbitEndpoint localDrillbit;
-
   public FragmentStatusReporter(final ExecutorFragmentContext context) {
     this.context = context;
     this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint());
-    this.localDrillbit = context.getEndpoint();
   }
 
   /**
@@ -119,14 +116,10 @@ void sendStatus(final FragmentStatus status) {
       return;
     }
 
-    if (localDrillbit.equals(foremanNode)) {
-      // Update the status locally
-      context.getWorkEventbus().statusUpdate(status);
-    } else {
-      // Send the status via Control Tunnel to remote foreman node
-      final ControlTunnel tunnel = 
context.getController().getTunnel(foremanNode);
-      tunnel.sendFragmentStatus(status);
-    }
+    // Send status for both local and remote foreman node via Tunnel. For 
local there won't be any network connection
+    // created and it will be submitted locally using 
LocalControlConnectionManager
+    final ControlTunnel tunnel = 
context.getController().getTunnel(foremanNode);
+    tunnel.sendFragmentStatus(status);
   }
 
   /**
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java
new file mode 100644
index 0000000000..667e440618
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class ConnectionManagerRegistryTest {
+
+  private static final DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder()
+    .setAddress("10.0.0.1")
+    .setControlPort(31012)
+    .setDataPort(31011)
+    .setUserPort(31010)
+    .setState(DrillbitEndpoint.State.STARTUP)
+    .build();
+
+  private static final DrillbitEndpoint foremanEndpoint = 
DrillbitEndpoint.newBuilder()
+    .setAddress("10.0.0.2")
+    .setControlPort(31012)
+    .setDataPort(31011)
+    .setUserPort(31010)
+    .setState(DrillbitEndpoint.State.STARTUP)
+    .build();
+
+  private static ControlConnectionConfig config;
+
+  @BeforeClass
+  public static void setup() {
+    config = mock(ControlConnectionConfig.class);
+  }
+
+  @Test
+  public void testLocalConnectionManager() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+    final ControlConnectionManager manager = 
registry.getConnectionManager(localEndpoint);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof LocalControlConnectionManager);
+  }
+
+  @Test
+  public void testLocalConnectionManager_differentState() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = 
localEndpoint.toBuilder().setState(DrillbitEndpoint.State.ONLINE).build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof LocalControlConnectionManager);
+  }
+
+  @Test
+  public void testLocalConnectionManager_differentUserDataPort() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setState(DrillbitEndpoint.State.ONLINE)
+      .setUserPort(10000)
+      .setDataPort(11000)
+      .build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof LocalControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteConnectionManager() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint);
+    assertTrue(registry.iterator().hasNext());
+    assertTrue(manager instanceof RemoteControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteConnectionManager_differentControlPort() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setControlPort(10000)
+      .build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof RemoteControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteConnectionManager_differentAddress() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setAddress("10.0.0.0")
+      .build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof RemoteControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteAndLocalConnectionManager() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setAddress("10.0.0.0")
+      .build();
+
+    final ControlConnectionManager remoteManager = 
registry.getConnectionManager(foremanEndpoint2);
+    final ControlConnectionManager localManager = 
registry.getConnectionManager(localEndpoint);
+
+    final ControlConnectionManager remoteManager_2 = 
registry.getConnectionManager(foremanEndpoint2);
+    final ControlConnectionManager localManager_2 = 
registry.getConnectionManager(localEndpoint);
+
+    assertTrue(remoteManager instanceof RemoteControlConnectionManager);
+    assertEquals(remoteManager, remoteManager_2);
+
+    assertTrue(localManager instanceof LocalControlConnectionManager);
+    assertEquals(localManager, localManager_2);
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java
new file mode 100644
index 0000000000..c0cf09be6b
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestLocalControlConnectionManager {
+
+  private static final DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder()
+    .setAddress("10.0.0.1")
+    .setControlPort(31011)
+    .setState(DrillbitEndpoint.State.STARTUP)
+    .build();
+
+  private static ControlConnectionConfig mockConfig;
+
+  private static ControlMessageHandler mockHandler;
+
+  private static ControlTunnel controlTunnel;
+
+  private static CountDownLatch latch;
+
+  private static final String NEGATIVE_ACK_MESSAGE = "Negative Ack received";
+
+  private static final RpcOutcomeListener<GeneralRPCProtos.Ack> 
outcomeListener =
+    new RpcOutcomeListener<GeneralRPCProtos.Ack>() {
+    @Override
+    public void failed(RpcException ex) {
+      throw new IllegalStateException(ex);
+    }
+
+    @Override
+    public void success(GeneralRPCProtos.Ack value, ByteBuf buffer) {
+      if (value.getOk()) {
+        latch.countDown();
+      } else {
+        throw new IllegalStateException(NEGATIVE_ACK_MESSAGE);
+      }
+    }
+
+    @Override
+    public void interrupted(InterruptedException e) {
+      // Do nothing
+    }
+  };
+
+  @Rule
+  public ExpectedException exceptionThrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void setup() {
+    mockConfig = mock(ControlConnectionConfig.class);
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(mockConfig);
+    registry.setLocalEndpoint(localEndpoint);
+    ControlConnectionManager manager = 
registry.getConnectionManager(localEndpoint);
+    assertTrue(manager instanceof LocalControlConnectionManager);
+    controlTunnel = new ControlTunnel(manager);
+  }
+
+  @Before
+  public void setupForTest() {
+    mockHandler = mock(ControlMessageHandler.class);
+    when(mockConfig.getMessageHandler()).thenReturn(mockHandler);
+  }
+
+  /**
+   * Verify that a query profile request is handled correctly using ControlTunnel 
with LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragmentStatus_Success() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final UserBitShared.QueryProfile mockProfile = 
UserBitShared.QueryProfile.getDefaultInstance();
+    when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile);
+    final UserBitShared.QueryProfile returnedProfile = 
controlTunnel.requestQueryProfile(mockQueryId).checkedGet();
+    assertEquals(returnedProfile, mockProfile);
+  }
+
+  /**
+   * Verify that the query profile request failure scenario is handled correctly 
using ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragmentStatus_Failure() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final String exceptionMessage = "Testing failure case";
+    exceptionThrown.expect(RpcException.class);
+    exceptionThrown.expectMessage(exceptionMessage);
+    when(mockHandler.requestQueryStatus(mockQueryId)).thenThrow(new 
RpcException(exceptionMessage));
+    controlTunnel.requestQueryProfile(mockQueryId).checkedGet();
+  }
+
+  /**
+   * Verify that CancelFragment with positive ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalCancelFragment_PositiveAck() throws Exception {
+    final ExecProtos.FragmentHandle mockHandle = 
ExecProtos.FragmentHandle.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse);
+    controlTunnel.cancelFragment(outcomeListener, mockHandle);
+    latch.await();
+  }
+
+  /**
+   * Verify that CancelFragment with negative ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalCancelFragment_NegativeAck() throws Exception {
+    final ExecProtos.FragmentHandle mockHandle = 
ExecProtos.FragmentHandle.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    exceptionThrown.expect(IllegalStateException.class);
+    exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE);
+    final GeneralRPCProtos.Ack mockResponse = Acks.FAIL;
+    when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse);
+    controlTunnel.cancelFragment(outcomeListener, mockHandle);
+    latch.await();
+  }
+
+  /**
+   * Verify that InitializeFragments with positive ack is handled correctly 
using ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragments_PositiveAck() throws Exception {
+    final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    
when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse);
+    controlTunnel.sendFragments(outcomeListener, mockFragments);
+    latch.await();
+  }
+
+  /**
+   * Verify that InitializeFragments with negative ack is handled correctly 
using ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragments_NegativeAck() throws Exception {
+    final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    exceptionThrown.expect(IllegalStateException.class);
+    exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE);
+    final GeneralRPCProtos.Ack mockResponse = Acks.FAIL;
+    
when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse);
+    controlTunnel.sendFragments(outcomeListener, mockFragments);
+    latch.await();
+  }
+
+  /**
+   * Verify that InitializeFragments failure case is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragments_Failure() throws Exception {
+    final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    exceptionThrown.expect(IllegalStateException.class);
+    exceptionThrown.expectCause(new 
TypeSafeMatcher<Throwable>(RpcException.class) {
+      @Override
+      protected boolean matchesSafely(Throwable throwable) {
+        return (throwable != null && throwable instanceof RpcException);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        // Do nothing
+      }
+    });
+    when(mockHandler.initializeFragment(mockFragments)).thenThrow(new 
RpcException("Failed to initialize"));
+    controlTunnel.sendFragments(outcomeListener, mockFragments);
+    latch.await();
+  }
+
+  /**
+   * Verify that UnpauseFragment is handled correctly using ControlTunnel with 
LocalControlConnectionManager
+   */
+  @Test
+  public void testUnpauseFragments() throws Exception {
+    final ExecProtos.FragmentHandle mockHandle = 
ExecProtos.FragmentHandle.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    when(mockHandler.resumeFragment(mockHandle)).thenReturn(mockResponse);
+    controlTunnel.unpauseFragment(outcomeListener, mockHandle);
+    latch.await();
+  }
+
+  /**
+   * Verify that RequestQueryStatus is handled correctly using ControlTunnel 
with LocalControlConnectionManager
+   */
+  @Test
+  public void testRequestQueryStatus() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final UserBitShared.QueryProfile mockProfile = 
UserBitShared.QueryProfile.getDefaultInstance();
+    when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile);
+    final UserBitShared.QueryProfile returnedProfile = 
controlTunnel.requestQueryProfile(mockQueryId).checkedGet();
+    assertEquals(returnedProfile, mockProfile);
+  }
+
+  /**
+   * Verify that CancelQuery with positive ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testCancelQuery_PositiveAck() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse);
+    GeneralRPCProtos.Ack response = 
controlTunnel.requestCancelQuery(mockQueryId).checkedGet();
+    assertEquals(response, mockResponse);
+  }
+
+  /**
+   * Verify that CancelQuery with negative ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testCancelQuery_NegativeAck() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final GeneralRPCProtos.Ack mockResponse = Acks.FAIL;
+    when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse);
+    GeneralRPCProtos.Ack response = 
controlTunnel.requestCancelQuery(mockQueryId).checkedGet();
+    assertEquals(response, mockResponse);
+  }
+
+  /**
+   * Verify that FinishedReceiver is handled correctly using ControlTunnel 
with LocalControlConnectionManager
+   */
+  @Test
+  public void testInformReceiverFinished_success() throws Exception {
+    final BitControl.FinishedReceiver finishedReceiver = 
BitControl.FinishedReceiver.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    
when(mockHandler.receivingFragmentFinished(finishedReceiver)).thenReturn(mockResponse);
+    controlTunnel.informReceiverFinished(outcomeListener, finishedReceiver);
+    latch.await();
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
index aaff1544c5..04d54d54c9 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
@@ -19,16 +19,16 @@
 
 import com.google.common.collect.Lists;
 import com.typesafe.config.ConfigValueFactory;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.SecurityTest;
-import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.rpc.control.ControlRpcMetrics;
 import org.apache.drill.exec.rpc.data.DataRpcMetrics;
 import org.apache.drill.exec.rpc.security.KerberosHelper;
 import org.apache.drill.exec.rpc.user.UserRpcMetrics;
 import 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.test.BaseTestQuery;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
@@ -178,6 +178,38 @@ public Void run() throws Exception {
     assertTrue(0 == 
DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
   }
 
+  @Test
+  public void testUnecryptedConnectionCounter_LocalControlMessage() throws 
Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, 
krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
+    final Subject clientSubject = 
JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile());
+
+    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        updateClient(connectionProps);
+        return null;
+      }
+    });
+
+    // Run query on memory system table this sends remote fragments to all 
Drillbit and Drillbits then send data
+    // using data channel. In this test we have only 1 Drillbit so there 
should not be any control connection but a
+    // local data connections
+    testSql("SELECT * FROM sys.memory");
+
+    // Check encrypted counters value
+    assertTrue(0 == 
UserRpcMetrics.getInstance().getEncryptedConnectionCount());
+    assertTrue(0 == 
ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
+    assertTrue(0 == 
DataRpcMetrics.getInstance().getEncryptedConnectionCount());
+
+    // Check unencrypted counters value
+    assertTrue(1 == 
UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+    assertTrue(0 == 
ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+    assertTrue(2 == 
DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+  }
+
   @AfterClass
   public static void cleanTest() throws Exception {
     krbHelper.stopKdc();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
index 83c1eca382..a119a8cf9a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
@@ -17,54 +17,44 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.control.Controller;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.ops.FragmentStats;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.junit.Before;
+import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import static 
org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import static 
org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED;
-import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED;
-import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
-
 public class FragmentStatusReporterTest {
 
   private FragmentStatusReporter statusReporter;
 
   private ControlTunnel foremanTunnel;
 
-  private FragmentContextImpl context;
-
   @Before
   public void setUp() throws Exception {
-    context = mock(FragmentContextImpl.class);
+    final FragmentContextImpl context = mock(FragmentContextImpl.class);
     Controller controller = mock(Controller.class);
 
-    // Create 2 different endpoint such that foremanEndpoint is different than
-    // localEndpoint
-    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
+    // Create Foreman Endpoint and its tunnel
     DrillbitEndpoint foremanEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.2").build();
     foremanTunnel = mock(ControlTunnel.class);
 
-    when(context.getEndpoint()).thenReturn(localEndpoint);
     when(context.getController()).thenReturn(controller);
     when(controller.getTunnel(foremanEndpoint)).thenReturn(foremanTunnel);
 
@@ -120,49 +110,4 @@ public void testStateChangedAfterClose() throws Exception {
     statusReporter.stateChanged(CANCELLATION_REQUESTED);
     verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class));
   }
-
-
-  /**
-   * With LocalEndpoint and Foreman Endpoint being same node, test that status 
change
-   * message doesn't happen via Control Tunnel instead it happens locally via 
WorkEventBus
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testStateChangedLocalForeman() throws Exception {
-
-    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
-
-    when(context.getEndpoint()).thenReturn(localEndpoint);
-    when(context.getForemanEndpoint()).thenReturn(localEndpoint);
-    when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class));
-
-    statusReporter = new FragmentStatusReporter(context);
-    statusReporter.stateChanged(RUNNING);
-    verifyZeroInteractions(foremanTunnel);
-    verify(context.getWorkEventbus()).statusUpdate(any(FragmentStatus.class));
-  }
-
-  /**
-   * With LocalEndpoint and Foreman Endpoint being same node, test that after 
close of
-   * FragmentStatusReporter, status update doesn't happen either through 
Control Tunnel
-   * or through WorkEventBus.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testCloseLocalForeman() throws Exception {
-    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
-
-    when(context.getEndpoint()).thenReturn(localEndpoint);
-    when(context.getForemanEndpoint()).thenReturn(localEndpoint);
-    when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class));
-    statusReporter = new FragmentStatusReporter(context);
-
-    statusReporter.close();
-    assertTrue(statusReporter.foremanDrillbit.get() == null);
-    statusReporter.stateChanged(RUNNING);
-    verifyZeroInteractions(foremanTunnel);
-    verify(context.getWorkEventbus(), 
never()).statusUpdate(any(FragmentStatus.class));
-  }
 }
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
index 90b2d19f86..2437ddb7a3 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
@@ -17,20 +17,24 @@
  */
 package org.apache.drill.exec.rpc;
 
-import io.netty.buffer.ByteBuf;
-
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
 
-public abstract class FutureBitCommand<T extends MessageLite, C extends 
RemoteConnection> implements RpcCommand<T,C> {
+public abstract class FutureBitCommand<T extends MessageLite, C extends 
RemoteConnection,
+  E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
 
   protected final SettableFuture<T> settableFuture;
   private final RpcCheckedFuture<T> parentFuture;
 
+  private final RpcOutcomeListener<T> outcomeListener;
+
   public FutureBitCommand() {
     this.settableFuture = SettableFuture.create();
     this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+    outcomeListener = new DeferredRpcOutcome();
   }
 
   public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C 
connection);
@@ -38,7 +42,7 @@ public FutureBitCommand() {
   @Override
   public void connectionAvailable(C connection) {
 
-    doRpcCall(new DeferredRpcOutcome(), connection);
+    doRpcCall(outcomeListener, connection);
   }
 
   @Override
@@ -71,10 +75,14 @@ public void interrupted(final InterruptedException e) {
     return parentFuture;
   }
 
+  @Override
+  public RpcOutcomeListener<T> getOutcomeListener() {
+    return outcomeListener;
+  }
+
   @Override
   public void connectionFailed(FailureType type, Throwable t) {
     settableFuture.setException(RpcException.mapException(
         String.format("Command failed while establishing connection.  Failure 
type %s.", type), t));
   }
-
-}
\ No newline at end of file
+}
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
index c9b8b3a527..964abf44ed 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.rpc;
 
-import io.netty.buffer.ByteBuf;
-
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
-public abstract class ListeningCommand<T extends MessageLite, C extends 
RemoteConnection> implements RpcCommand<T, C> {
+public abstract class ListeningCommand<T extends MessageLite, C extends 
RemoteConnection,
+  E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ListeningCommand.class);
 
   private final RpcOutcomeListener<T> listener;
@@ -35,7 +35,7 @@ public ListeningCommand(RpcOutcomeListener<T> listener) {
   @Override
   public void connectionAvailable(C connection) {
 
-    doRpcCall(new DeferredRpcOutcome(), connection);
+    doRpcCall(listener, connection);
   }
 
   @Override
@@ -43,28 +43,15 @@ public void connectionSucceeded(C connection) {
     connectionAvailable(connection);
   }
 
-  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
-
-    @Override
-    public void failed(RpcException ex) {
-      listener.failed(ex);
-    }
-
-    @Override
-    public void success(T value, ByteBuf buf) {
-      listener.success(value, buf);
-    }
-
-    @Override
-    public void interrupted(final InterruptedException e) {
-      listener.interrupted(e);
-    }
-  }
-
   @Override
   public void connectionFailed(FailureType type, Throwable t) {
     listener.failed(RpcException.mapException(
         String.format("Command failed while establishing connection.  Failure 
type %s.", type), t));
   }
 
-}
\ No newline at end of file
+  @Override
+  public RpcOutcomeListener<T> getOutcomeListener() {
+    return listener;
+  }
+
+}
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index fec3962b90..c648a169e2 100644
--- 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc;
 
+import com.google.protobuf.Internal.EnumLite;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -52,7 +53,8 @@ public ReconnectingConnection(HS handshake, String host, int 
port) {
 
   protected abstract BasicClient<?, C, HS, ?> getNewClient();
 
-  public <T extends MessageLite, R extends RpcCommand<T, C>> void runCommand(R 
cmd) {
+  public <T extends MessageLite, E extends EnumLite, M extends MessageLite,
+    R extends RpcCommand<T, C, E, M>> void runCommand(R cmd) {
 //    if(logger.isDebugEnabled()) logger.debug(String.format("Running command 
%s sending to host %s:%d", cmd, host, port));
     C connection = connectionHolder.get();
     if (connection != null) {
@@ -78,7 +80,7 @@ public ReconnectingConnection(HS handshake, String host, int 
port) {
       } else {
 //        logger.debug("No connection active, opening client connection.");
         BasicClient<?, C, HS, ?> client = getNewClient();
-        ConnectionListeningFuture<T> future = new 
ConnectionListeningFuture<>(cmd);
+        ConnectionListeningFuture<T,E,M> future = new 
ConnectionListeningFuture<>(cmd);
         client.connectAsClient(future, handshake, host, port);
         future.waitAndRun();
 //        logger.debug("Connection available and active, command now being run 
inline.");
@@ -88,13 +90,13 @@ public ReconnectingConnection(HS handshake, String host, 
int port) {
     }
   }
 
-  public class ConnectionListeningFuture<R extends MessageLite>
+  public class ConnectionListeningFuture<R extends MessageLite, E extends 
EnumLite, M extends MessageLite>
       extends AbstractFuture<C>
       implements RpcConnectionHandler<C> {
 
-    private RpcCommand<R, C> cmd;
+    private RpcCommand<R, C, E, M> cmd;
 
-    public ConnectionListeningFuture(RpcCommand<R, C> cmd) {
+    public ConnectionListeningFuture(RpcCommand<R, C, E, M> cmd) {
       super();
       this.cmd = cmd;
     }
@@ -173,9 +175,7 @@ public void connectionSucceeded(C incoming) {
         incoming.getChannel().close();
       }
       set(connection);
-
     }
-
   }
 
   /** Factory for close handlers **/
@@ -204,7 +204,6 @@ public void operationComplete(ChannelFuture future) throws 
Exception {
       connectionHolder.compareAndSet(connection, null);
       parent.operationComplete(future);
     }
-
   }
 
   public CloseHandlerCreator getCloseHandlerCreator() {
@@ -223,5 +222,4 @@ public void close() {
       c.getChannel().close();
     }
   }
-
 }
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
index 1bb39ac505..821b8fbf85 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
@@ -17,10 +17,17 @@
  */
 package org.apache.drill.exec.rpc;
 
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
-public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> 
extends RpcConnectionHandler<C>{
+public interface RpcCommand<T extends MessageLite, C extends RemoteConnection,
+  E extends EnumLite, M extends MessageLite> extends RpcConnectionHandler<C> {
 
-  public abstract void connectionAvailable(C connection);
+  void connectionAvailable(C connection);
 
-}
\ No newline at end of file
+  E getRpcType();
+
+  M getMessage();
+
+  RpcOutcomeListener<T> getOutcomeListener();
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drillbit while sending control message to itself creates a connection instead 
> of submitting locally
> ---------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6255
>                 URL: https://issues.apache.org/jira/browse/DRILL-6255
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 1.12.0
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> With the new shutdown feature introduced in 1.12, a state was added 
> to DrillbitEndpoint. Because of this, the equality check performed 
> [here|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java#L256]
>  evaluates to false, so the fragments that are supposed to be scheduled on 
> the Foreman are treated as remote fragments and a connection is created 
> to schedule them. The equality check is false because the localEndpoint state 
> is STARTUP whereas the state in the assigned Drillbit is ONLINE.
> I guess we should now update the equality check to verify only that the address 
> and control port are the same between the assigned and local Drillbit endpoints. 
> A test can be added for this based on the _sys.memory_ table, since that 
> guarantees scheduling minor fragments on each Drillbit node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to