This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5dd8a6f60e006c2dc707f241b7619634e4e82bbd
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
AuthorDate: Tue May 8 13:06:20 2018 -0700

    DRILL-6255: Drillbit while sending control message to itself creates a 
connection instead of submitting locally
    
    closes #1253
---
 .../org/apache/drill/exec/rpc/BitRpcUtility.java   |  18 ++
 .../rpc/control/ConnectionManagerRegistry.java     |   6 +-
 .../exec/rpc/control/ControlConnectionConfig.java  |   9 +-
 .../exec/rpc/control/ControlConnectionManager.java |  16 +-
 .../drill/exec/rpc/control/ControlTunnel.java      | 162 ++++++++----
 .../rpc/control/LocalControlConnectionManager.java | 236 +++++++++++++++++
 ...er.java => RemoteControlConnectionManager.java} |  26 +-
 .../org/apache/drill/exec/rpc/data/DataTunnel.java |  25 +-
 .../exec/work/batch/ControlMessageHandler.java     |  51 ++--
 .../drill/exec/work/foreman/FragmentsRunner.java   | 105 ++------
 .../exec/work/fragment/FragmentStatusReporter.java |  15 +-
 .../rpc/control/ConnectionManagerRegistryTest.java | 149 +++++++++++
 .../control/TestLocalControlConnectionManager.java | 278 +++++++++++++++++++++
 .../rpc/user/security/TestUserBitKerberos.java     |  36 ++-
 .../work/fragment/FragmentStatusReporterTest.java  |  73 +-----
 .../apache/drill/exec/rpc/FutureBitCommand.java    |  20 +-
 .../apache/drill/exec/rpc/ListeningCommand.java    |  33 +--
 .../drill/exec/rpc/ReconnectingConnection.java     |  16 +-
 .../java/org/apache/drill/exec/rpc/RpcCommand.java |  13 +-
 19 files changed, 983 insertions(+), 304 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
index c398033..0bc01f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
@@ -103,6 +103,24 @@ public final class BitRpcUtility {
     }
   }
 
+  /**
+   * Verifies if local and remote Drillbit Endpoint has same control server by 
using address and control port
+   * information. This method is used instead of equals in {@link 
DrillbitEndpoint} because DrillbitEndpoint stores
+   * state information in it.
+   * For local Drillbit a reference is stored in {@link 
org.apache.drill.exec.server.DrillbitContext} as soon as
+   * Drillbit is started in {@link 
org.apache.drill.exec.service.ServiceEngine#start} with state as STARTUP, but
+   * while planning minor fragment the assignment list is used from active 
list of Drillbits in which state for local
+   * Drillbit will not be STARTUP
+   * @param local - DrillbitEndpoint instance for local bit
+   * @param remote - DrillbitEndpoint instance for remote bit
+   * @return true if address and control port for local and remote are same.
+   *         false - otherwise
+   */
+  public static boolean isLocalControlServer(DrillbitEndpoint local, 
DrillbitEndpoint remote) {
+    return local.hasAddress() && local.hasControlPort() && remote.hasAddress() 
&& remote.hasControlPort() &&
+      local.getAddress().equals(remote.getAddress()) && local.getControlPort() 
== remote.getControlPort();
+  }
+
   // Suppress default constructor
   private BitRpcUtility() {
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
index 800cf3c..e27729c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.rpc.BitRpcUtility;
 
 public class ConnectionManagerRegistry implements 
Iterable<ControlConnectionManager> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
@@ -40,9 +41,12 @@ public class ConnectionManagerRegistry implements 
Iterable<ControlConnectionMana
   public ControlConnectionManager getConnectionManager(DrillbitEndpoint 
remoteEndpoint) {
     assert localEndpoint != null :
         "DrillbitEndpoint must be set before a connection manager can be 
retrieved";
+
+    final boolean isLocalServer = 
BitRpcUtility.isLocalControlServer(localEndpoint, remoteEndpoint);
     ControlConnectionManager m = registry.get(remoteEndpoint);
     if (m == null) {
-      m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint);
+      m = (isLocalServer) ? new LocalControlConnectionManager(config, 
remoteEndpoint)
+                    : new RemoteControlConnectionManager(config, 
localEndpoint, remoteEndpoint);
       final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, 
m);
       if (m2 != null) {
         m = m2;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
index b19fb8b..f114af4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.BitConnectionConfig;
@@ -24,8 +25,8 @@ import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 
 // config for bit to bit connection
-// package private
-class ControlConnectionConfig extends BitConnectionConfig {
+@VisibleForTesting
+public class ControlConnectionConfig extends BitConnectionConfig {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class);
 
   private final ControlMessageHandler handler;
@@ -41,8 +42,8 @@ class ControlConnectionConfig extends BitConnectionConfig {
     return "control"; // unused
   }
 
-  ControlMessageHandler getMessageHandler() {
+  @VisibleForTesting
+  public ControlMessageHandler getMessageHandler() {
     return handler;
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
index 6bfcbd5..240421e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
@@ -25,14 +25,10 @@ import org.apache.drill.exec.rpc.ReconnectingConnection;
 /**
  * Maintains connection between two particular bits.
  */
-public class ControlConnectionManager extends 
ReconnectingConnection<ControlConnection, BitControlHandshake>{
+public abstract class ControlConnectionManager extends 
ReconnectingConnection<ControlConnection, BitControlHandshake>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class);
 
-  private final ControlConnectionConfig config;
-  private final DrillbitEndpoint remoteEndpoint;
-
-  public ControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint localEndpoint,
-                                  DrillbitEndpoint remoteEndpoint) {
+  public ControlConnectionManager(DrillbitEndpoint localEndpoint, 
DrillbitEndpoint remoteEndpoint) {
     super(
         BitControlHandshake.newBuilder()
             .setRpcVersion(ControlRpcConfig.RPC_VERSION)
@@ -40,14 +36,8 @@ public class ControlConnectionManager extends 
ReconnectingConnection<ControlConn
             .build(),
         remoteEndpoint.getAddress(),
         remoteEndpoint.getControlPort());
-
-    this.config = config;
-    this.remoteEndpoint = remoteEndpoint;
   }
 
   @Override
-  protected BasicClient<?, ControlConnection, BitControlHandshake, ?> 
getNewClient() {
-    return new ControlClient(config, remoteEndpoint, new 
CloseHandlerCreator());
-  }
-
+  protected abstract BasicClient<?, ControlConnection, BitControlHandshake, ?> 
getNewClient();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 1a4af90..492d4de 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,11 +17,20 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-
-import java.util.concurrent.TimeUnit;
-
 import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -38,19 +47,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.Controller.CustomSerDe;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
+import java.util.concurrent.TimeUnit;
 
 public class ControlTunnel {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
@@ -99,8 +96,7 @@ public class ControlTunnel {
     return b.getFuture();
   }
 
-
-  public static class SendFragmentStatus extends FutureBitCommand<Ack, 
ControlConnection> {
+  public static class SendFragmentStatus extends FutureBitCommand<Ack, 
ControlConnection, RpcType, FragmentStatus> {
     final FragmentStatus status;
 
     public SendFragmentStatus(FragmentStatus status) {
@@ -110,13 +106,22 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, 
status, Ack.class);
+      connection.sendUnsafe(outcomeListener, getRpcType(), status, Ack.class);
     }
 
-  }
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_FRAGMENT_STATUS;
+    }
 
+    @Override
+    public FragmentStatus getMessage() {
+      return status;
+    }
 
-  public static class ReceiverFinished extends ListeningCommand<Ack, 
ControlConnection> {
+  }
+
+  public static class ReceiverFinished extends ListeningCommand<Ack, 
ControlConnection, RpcType, FinishedReceiver> {
     final FinishedReceiver finishedReceiver;
 
     public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver 
finishedReceiver) {
@@ -126,11 +131,21 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, 
finishedReceiver, Ack.class);
+      connection.send(outcomeListener, getRpcType(), finishedReceiver, 
Ack.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_RECEIVER_FINISHED;
+    }
+
+    @Override
+    public FinishedReceiver getMessage() {
+      return finishedReceiver;
     }
   }
 
-  public static class SignalFragment extends ListeningCommand<Ack, 
ControlConnection> {
+  public static class SignalFragment extends ListeningCommand<Ack, 
ControlConnection, RpcType, FragmentHandle> {
     final FragmentHandle handle;
     final RpcType type;
 
@@ -145,9 +160,18 @@ public class ControlTunnel {
       connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
     }
 
+    @Override
+    public RpcType getRpcType() {
+      return type;
+    }
+
+    @Override
+    public FragmentHandle getMessage() {
+      return handle;
+    }
   }
 
-  public static class SendFragment extends ListeningCommand<Ack, 
ControlConnection> {
+  public static class SendFragment extends ListeningCommand<Ack, 
ControlConnection, RpcType, InitializeFragments> {
     final InitializeFragments fragments;
 
     public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments 
fragments) {
@@ -157,12 +181,21 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, 
fragments, Ack.class);
+      connection.send(outcomeListener, getRpcType(), fragments, Ack.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_INITIALIZE_FRAGMENTS;
     }
 
+    @Override
+    public InitializeFragments getMessage() {
+      return fragments;
+    }
   }
 
-  public static class RequestProfile extends FutureBitCommand<QueryProfile, 
ControlConnection> {
+  public static class RequestProfile extends FutureBitCommand<QueryProfile, 
ControlConnection, RpcType, QueryId> {
     final QueryId queryId;
 
     public RequestProfile(QueryId queryId) {
@@ -172,11 +205,21 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<QueryProfile> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, 
QueryProfile.class);
+      connection.send(outcomeListener, getRpcType(), queryId, 
QueryProfile.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_QUERY_STATUS;
+    }
+
+    @Override
+    public QueryId getMessage() {
+      return queryId;
     }
   }
 
-  public static class CancelQuery extends FutureBitCommand<Ack, 
ControlConnection> {
+  public static class CancelQuery extends FutureBitCommand<Ack, 
ControlConnection, RpcType, QueryId> {
     final QueryId queryId;
 
     public CancelQuery(QueryId queryId) {
@@ -186,7 +229,17 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, 
Ack.class);
+      connection.send(outcomeListener, getRpcType(), queryId, Ack.class);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_QUERY_CANCEL;
+    }
+
+    @Override
+    public QueryId getMessage() {
+      return queryId;
     }
   }
 
@@ -204,8 +257,8 @@ public class ControlTunnel {
     return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive);
   }
 
-
-  private static class CustomMessageSender extends 
ListeningCommand<CustomMessage, ControlConnection> {
+  public static class CustomMessageSender extends
+    ListeningCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> 
{
 
     private CustomMessage message;
     private ByteBuf[] dataBodies;
@@ -218,12 +271,27 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, 
CustomMessage.class, dataBodies);
+      connection.send(outcomeListener, getRpcType(), message, 
CustomMessage.class, dataBodies);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_CUSTOM;
+    }
+
+    @Override
+    public CustomMessage getMessage() {
+      return message;
+    }
+
+    public ByteBuf[] getDataBodies() {
+      return dataBodies;
     }
 
   }
 
-  private static class SyncCustomMessageSender extends 
FutureBitCommand<CustomMessage, ControlConnection> {
+  public static class SyncCustomMessageSender extends
+    FutureBitCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> 
{
 
     private CustomMessage message;
     private ByteBuf[] dataBodies;
@@ -236,7 +304,21 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, 
ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, 
CustomMessage.class, dataBodies);
+      connection.send(outcomeListener, getRpcType(), message, 
CustomMessage.class, dataBodies);
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_CUSTOM;
+    }
+
+    @Override
+    public CustomMessage getMessage() {
+      return message;
+    }
+
+    public ByteBuf[] getDataBodies() {
+      return dataBodies;
     }
   }
 
@@ -261,8 +343,7 @@ public class ControlTunnel {
       return serde.deserializeReceived(message.getMessage().toByteArray());
     }
 
-    public RECEIVE get(long timeout, TimeUnit unit) throws Exception,
-        InvalidProtocolBufferException {
+    public RECEIVE get(long timeout, TimeUnit unit) throws Exception, 
InvalidProtocolBufferException {
       CustomMessage message = future.checkedGet(timeout, unit);
       return serde.deserializeReceived(message.getMessage().toByteArray());
     }
@@ -270,7 +351,6 @@ public class ControlTunnel {
     public DrillBuf getBuffer() throws RpcException {
       return (DrillBuf) future.getBuffer();
     }
-
   }
 
 
@@ -351,21 +431,15 @@ public class ControlTunnel {
         } catch (Exception e) {
           innerListener.failed(new RpcException("Failure while parsing message 
locally.", e));
         }
-
       }
 
       @Override
       public void interrupted(InterruptedException e) {
         innerListener.interrupted(e);
       }
-
     }
-
   }
 
-
-
-
   public static class ProtoSerDe<MSG extends MessageLite> implements 
CustomSerDe<MSG> {
     private final Parser<MSG> parser;
 
@@ -420,7 +494,5 @@ public class ControlTunnel {
     public MSG deserializeReceived(byte[] bytes) throws Exception {
       return (MSG) reader.readValue(bytes);
     }
-
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
new file mode 100644
index 0000000..fa6a2e9
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcCommand;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+
+public class LocalControlConnectionManager extends ControlConnectionManager {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LocalControlConnectionManager.class);
+
+  private final ControlConnectionConfig config;
+
+  public LocalControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint localEndpoint) {
+    super(localEndpoint, localEndpoint);
+    this.config = config;
+  }
+
+  @Override
+  protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, 
?> getNewClient() {
+    throw new UnsupportedOperationException("LocalControlConnectionManager 
doesn't support creating a control client");
+  }
+
+  @Override
+  public void runCommand(RpcCommand cmd) {
+    final int rpcType = cmd.getRpcType().getNumber();
+    final ControlMessageHandler messageHandler = config.getMessageHandler();
+
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Received bit com message of type {} over local connection 
manager", rpcType);
+    }
+
+    switch (rpcType) {
+
+      case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+        final ControlTunnel.SignalFragment signalFragment = 
((ControlTunnel.SignalFragment) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
signalFragment.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.cancelFragment(signalFragment.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_CUSTOM_VALUE: {
+        final ByteBuf[] dataBodies;
+        final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener;
+
+        if (cmd instanceof ControlTunnel.CustomMessageSender) {
+          dataBodies = 
((ControlTunnel.CustomMessageSender)cmd).getDataBodies();
+          outcomeListener = 
((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener();
+        } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) {
+          dataBodies = 
((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies();
+          outcomeListener = 
((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener();
+        } else {
+          throw new UnsupportedOperationException("Unknown Custom Type control 
message received");
+        }
+
+        DrillBuf reqDrillBuff;
+        try {
+          reqDrillBuff = convertToByteBuf(dataBodies);
+        } catch (Exception ex) {
+          outcomeListener.failed(new RpcException("Failed to allocate memory 
while sending request in " +
+            "LocalControlConnectionManager#convertToByteBuff", ex));
+          return;
+        } finally {
+          releaseByteBuf(dataBodies);
+        }
+
+        try {
+          BitControl.CustomMessage message = (BitControl.CustomMessage) 
cmd.getMessage();
+          final Response response = 
messageHandler.getHandlerRegistry().handle(message, reqDrillBuff);
+          DrillBuf responseBuffer;
+          try {
+            responseBuffer = convertToByteBuf(response.dBodies);
+          } catch (Exception ex) {
+            outcomeListener.failed(new RpcException("Failed to allocate memory 
while sending response in " +
+              "LocalControlConnectionManager#convertToByteBuff", ex));
+            return;
+          } finally {
+            releaseByteBuf(response.dBodies);
+          }
+
+          // Passed responseBuffer will be owned by consumer
+          outcomeListener.success((BitControl.CustomMessage)response.pBody, 
responseBuffer);
+        } catch (RpcException ex) {
+          cmd.getOutcomeListener().failed(ex);
+        } finally {
+          // Release the reqDrillBuff passed into handler
+          releaseByteBuf(reqDrillBuff);
+        }
+        break;
+      }
+
+      case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+        final ControlTunnel.ReceiverFinished receiverFinished = 
((ControlTunnel.ReceiverFinished) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
receiverFinished.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.receivingFragmentFinished(receiverFinished.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: {
+        final ControlTunnel.SendFragmentStatus fragmentStatus = 
((ControlTunnel.SendFragmentStatus) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
fragmentStatus.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.requestFragmentStatus(fragmentStatus.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: {
+        final ControlTunnel.CancelQuery cancelQuery = 
((ControlTunnel.CancelQuery) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
cancelQuery.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.requestQueryCancel(cancelQuery.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+        final ControlTunnel.SendFragment sendFragment = 
((ControlTunnel.SendFragment) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
sendFragment.getOutcomeListener();
+
+        try {
+          final Ack ackResponse = 
messageHandler.initializeFragment(sendFragment.getMessage());
+          outcomeListener.success(ackResponse, null);
+        } catch (RpcException ex) {
+          outcomeListener.failed(ex);
+        }
+        break;
+      }
+
+      case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: {
+        final ControlTunnel.RequestProfile requestProfile = 
((ControlTunnel.RequestProfile) cmd);
+        final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = 
requestProfile.getOutcomeListener();
+
+        try {
+          final UserBitShared.QueryProfile profile = 
messageHandler.requestQueryStatus(requestProfile.getMessage());
+          outcomeListener.success(profile, null);
+        } catch (RpcException ex) {
+          outcomeListener.failed(ex);
+        }
+        break;
+      }
+
+      case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+        final ControlTunnel.SignalFragment signalFragment = 
((ControlTunnel.SignalFragment) cmd);
+        final RpcOutcomeListener<Ack> outcomeListener = 
signalFragment.getOutcomeListener();
+        final Ack ackResponse = 
messageHandler.resumeFragment(signalFragment.getMessage());
+        outcomeListener.success(ackResponse, null);
+        break;
+      }
+
+      default:
+        final RpcException rpcException = new 
RpcException(String.format("Unsupported control request type %s " +
+          "received on LocalControlConnectionManager", rpcType));
+        cmd.getOutcomeListener().failed(rpcException);
+    }
+  }
+
+  /**
+   * Copies ByteBuf in the input array into a single DrillBuf
+   * @param byteBuffArray - input array of ByteBuf's
+   * @return DrillBuf - output Drillbuf with all the input bytes.
+   * @throws OutOfMemoryException
+   */
+  private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws 
OutOfMemoryException {
+
+    if (byteBuffArray == null) {
+      return null;
+    }
+
+    int bytesToCopy = 0;
+    for (ByteBuf b : byteBuffArray) {
+      final int validBytes = b.readableBytes();
+      if (0 == validBytes) {
+        b.release();
+      } else {
+        bytesToCopy += validBytes;
+      }
+    }
+    final DrillBuf drillBuff = config.getAllocator().buffer(bytesToCopy);
+
+    for (ByteBuf b : byteBuffArray) {
+      final int validBytes = b.readableBytes();
+      drillBuff.writeBytes(b, 0, validBytes);
+    }
+
+    return drillBuff;
+  }
+
+  /**
+   * Releases all the ByteBuf inside the input ByteBuf array
+   * @param byteBuffArray - input array of ByteBuf's
+   */
+  private void releaseByteBuf(ByteBuf[] byteBuffArray) {
+    if (byteBuffArray != null) {
+      for (ByteBuf b : byteBuffArray) {
+        b.release();
+      }
+    }
+  }
+
+  /**
+   * Releases the input ByteBuf
+   * @param byteBuf - input ByteBuf
+   */
+  private void releaseByteBuf(ByteBuf byteBuf) {
+    if (byteBuf != null) {
+      byteBuf.release();
+    }
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
similarity index 57%
copy from 
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
copy to 
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
index 6bfcbd5..6a4dc21 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
@@ -17,37 +17,25 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
+import org.apache.drill.exec.proto.BitControl;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.BasicClient;
-import org.apache.drill.exec.rpc.ReconnectingConnection;
 
-/**
- * Maintains connection between two particular bits.
- */
-public class ControlConnectionManager extends 
ReconnectingConnection<ControlConnection, BitControlHandshake>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class);
+public class RemoteControlConnectionManager extends ControlConnectionManager {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemoteControlConnectionManager.class);
 
   private final ControlConnectionConfig config;
   private final DrillbitEndpoint remoteEndpoint;
 
-  public ControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint localEndpoint,
-                                  DrillbitEndpoint remoteEndpoint) {
-    super(
-        BitControlHandshake.newBuilder()
-            .setRpcVersion(ControlRpcConfig.RPC_VERSION)
-            .setEndpoint(localEndpoint)
-            .build(),
-        remoteEndpoint.getAddress(),
-        remoteEndpoint.getControlPort());
-
+  public RemoteControlConnectionManager(ControlConnectionConfig config, 
DrillbitEndpoint
+    localEndpoint, DrillbitEndpoint remoteEndpoint) {
+    super(localEndpoint, remoteEndpoint);
     this.config = config;
     this.remoteEndpoint = remoteEndpoint;
   }
 
   @Override
-  protected BasicClient<?, ControlConnection, BitControlHandshake, ?> 
getNewClient() {
+  protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, 
?> getNewClient() {
     return new ControlClient(config, remoteEndpoint, new 
CloseHandlerCreator());
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 7e286fa..7b05b81 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import com.google.protobuf.MessageLite;
 import io.netty.buffer.ByteBuf;
 
 import java.util.concurrent.Semaphore;
@@ -43,7 +44,6 @@ public class DataTunnel {
   private ExecutionControls testControls;
   private org.slf4j.Logger testLogger;
 
-
   public DataTunnel(DataConnectionManager manager) {
     this.manager = manager;
   }
@@ -69,7 +69,7 @@ public class DataTunnel {
 
   public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, 
FragmentWritableBatch batch) {
     SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
-    try{
+    try {
       if (isInjectionControlSet) {
         // Wait for interruption if set. Used to simulate the fragment 
interruption while the fragment is waiting for
         // semaphore acquire. We expect the
@@ -78,9 +78,9 @@ public class DataTunnel {
 
       sendingSemaphore.acquire();
       manager.runCommand(b);
-    }catch(final InterruptedException e){
+    } catch (final InterruptedException e) {
       // Release the buffers first before informing the listener about the 
interrupt.
-      for(ByteBuf buffer : batch.getBuffers()) {
+      for (ByteBuf buffer : batch.getBuffers()) {
         buffer.release();
       }
 
@@ -119,7 +119,7 @@ public class DataTunnel {
     }
   }
 
-  private class SendBatchAsyncListen extends ListeningCommand<Ack, 
DataClientConnection> {
+  private class SendBatchAsyncListen extends ListeningCommand<Ack, 
DataClientConnection, RpcType, MessageLite> {
     final FragmentWritableBatch batch;
 
     public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, 
FragmentWritableBatch batch) {
@@ -129,7 +129,18 @@ public class DataTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
DataClientConnection connection) {
-      connection.send(new ThrottlingOutcomeListener(outcomeListener), 
RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
+      connection.send(new ThrottlingOutcomeListener(outcomeListener), 
getRpcType(), batch.getHeader(),
+        Ack.class, batch.getBuffers());
+    }
+
+    @Override
+    public RpcType getRpcType() {
+      return RpcType.REQ_RECORD_BATCH;
+    }
+
+    @Override
+    public MessageLite getMessage() {
+      return batch.getHeader();
     }
 
     @Override
@@ -139,7 +150,7 @@ public class DataTunnel {
 
     @Override
     public void connectionFailed(FailureType type, Throwable t) {
-      for(ByteBuf buffer : batch.getBuffers()) {
+      for (ByteBuf buffer : batch.getBuffers()) {
         buffer.release();
       }
       super.connectionFailed(type, t);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index e562b16..963f53a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -91,14 +91,16 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
     }
 
     case RpcType.REQ_FRAGMENT_STATUS_VALUE:
-      bee.getContext().getWorkBus().statusUpdate( get(pBody, 
FragmentStatus.PARSER));
+      final FragmentStatus status = get(pBody, FragmentStatus.PARSER);
+      requestFragmentStatus(status);
       // TODO: Support a type of message that has no response.
       sender.send(ControlRpcConfig.OK);
       break;
 
     case RpcType.REQ_QUERY_CANCEL_VALUE: {
       final QueryId queryId = get(pBody, QueryId.PARSER);
-      if (bee.cancelForeman(queryId, null)) {
+      final Ack cancelStatus = requestQueryCancel(queryId);
+      if (cancelStatus.getOk()) {
         sender.send(ControlRpcConfig.OK);
       } else {
         sender.send(ControlRpcConfig.FAIL);
@@ -108,21 +110,14 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
 
     case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
       final InitializeFragments fragments = get(pBody, 
InitializeFragments.PARSER);
-      final DrillbitContext drillbitContext = bee.getContext();
-      for(int i = 0; i < fragments.getFragmentCount(); i++) {
-        startNewFragment(fragments.getFragment(i), drillbitContext);
-      }
+      initializeFragment(fragments);
       sender.send(ControlRpcConfig.OK);
       break;
     }
 
     case RpcType.REQ_QUERY_STATUS_VALUE: {
       final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman == null) {
-        throw new RpcException("Query not running on node.");
-      }
-      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+      final QueryProfile profile = requestQueryStatus(queryId);
       sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile));
       break;
     }
@@ -145,7 +140,7 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
    * @param fragment
    * @throws UserRpcException
    */
-  private void startNewFragment(final PlanFragment fragment, final 
DrillbitContext drillbitContext)
+  public void startNewFragment(final PlanFragment fragment, final 
DrillbitContext drillbitContext)
       throws UserRpcException {
     logger.debug("Received remote fragment start instruction", fragment);
 
@@ -182,7 +177,7 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
   /* (non-Javadoc)
    * @see 
org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
    */
-  private Ack cancelFragment(final FragmentHandle handle) {
+  public Ack cancelFragment(final FragmentHandle handle) {
     /**
      * For case 1, see {@link 
org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}.
      * In comments below, "active" refers to fragment states: SENDING, 
AWAITING_ALLOCATION, RUNNING and
@@ -213,7 +208,7 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
     return Acks.OK;
   }
 
-  private Ack resumeFragment(final FragmentHandle handle) {
+  public Ack resumeFragment(final FragmentHandle handle) {
     // resume a pending fragment
     final FragmentManager manager = 
bee.getContext().getWorkBus().getFragmentManager(handle);
     if (manager != null) {
@@ -233,7 +228,7 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
     return Acks.OK;
   }
 
-  private Ack receivingFragmentFinished(final FinishedReceiver 
finishedReceiver) {
+  public Ack receivingFragmentFinished(final FinishedReceiver 
finishedReceiver) {
 
     final FragmentManager manager = 
bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
 
@@ -254,6 +249,32 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
     return Acks.OK;
   }
 
+  public Ack requestFragmentStatus(FragmentStatus status) {
+    bee.getContext().getWorkBus().statusUpdate( status);
+    return Acks.OK;
+  }
+
+  public Ack requestQueryCancel(QueryId queryId) {
+    return bee.cancelForeman(queryId, null) ? Acks.OK : Acks.FAIL;
+  }
+
+  public Ack initializeFragment(InitializeFragments fragments) throws 
RpcException {
+    final DrillbitContext drillbitContext = bee.getContext();
+    for (int i = 0; i < fragments.getFragmentCount(); i++) {
+      startNewFragment(fragments.getFragment(i), drillbitContext);
+    }
+
+    return Acks.OK;
+  }
+
+  public QueryProfile requestQueryStatus(QueryId queryId) throws RpcException {
+    final Foreman foreman = bee.getForemanForQueryId(queryId);
+    if (foreman == null) {
+      throw new RpcException("Query not running on node.");
+    }
+    return foreman.getQueryManager().getQueryProfile();
+  }
+
   public CustomHandlerRegistry getHandlerRegistry() {
     return handlerRegistry;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
index 3d051bb..91b2a2a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
@@ -44,11 +44,8 @@ import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 
-
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -141,7 +138,6 @@ public class FragmentsRunner {
     }
   }
 
-
   /**
    * Set up the non-root fragments for execution. Some may be local, and some 
may be remote.
    * Messages are sent immediately, so they may start returning data even 
before we complete this.
@@ -158,17 +154,11 @@ public class FragmentsRunner {
      * executed there. We need to start up the intermediate fragments first so 
that they will be
      * ready once the leaf fragments start producing data. To satisfy both of 
these, we will
      * make a pass through the fragments and put them into the remote maps 
according to their
-     * leaf/intermediate state, as well as their target drillbit. Also filter 
the leaf/intermediate
-     * fragments which are assigned to run on local Drillbit node (or Foreman 
node) into separate lists.
-     *
-     * This will help to schedule local
+     * leaf/intermediate state, as well as their target drillbit.
      */
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = 
ArrayListMultimap.create();
-    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = 
ArrayListMultimap.create();
-    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = 
ArrayListMultimap.create();
+    final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = 
ArrayListMultimap.create();
 
-    final DrillbitEndpoint localDrillbitEndpoint = 
drillbitContext.getEndpoint();
     // record all fragments for status purposes.
     for (final PlanFragment planFragment : fragments) {
 
@@ -180,44 +170,38 @@ public class FragmentsRunner {
       foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
 
       if (planFragment.getLeafFragment()) {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, 
localLeafFragmentList, remoteLeafFragmentMap);
+        leafFragmentMap.put(planFragment.getAssignment(), planFragment);
       } else {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, 
localIntFragmentList, remoteIntFragmentMap);
+        intFragmentMap.put(planFragment.getAssignment(), planFragment);
       }
     }
 
     /*
      * We need to wait for the intermediates to be sent so that they'll be set 
up by the time
-     * the leaves start producing data. We'll use this latch to wait for the 
responses.
+     * the leaves start producing data. We'll use this latch to wait for the 
responses. All the local intermediate
+     * fragments are submitted locally without creating any actual control 
connection to itself.
      *
      * However, in order not to hang the process if any of the RPC requests 
fails, we always
      * count down (see FragmentSubmitFailures), but we count the number of 
failures so that we'll
      * know if any submissions did fail.
      */
-    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
-
-    // Setup local intermediate fragments
-    for (final PlanFragment fragment : localIntFragmentList) {
-      startLocalFragment(fragment);
-    }
+    scheduleIntermediateFragments(intFragmentMap);
 
     injector.injectChecked(foreman.getQueryContext().getExecutionControls(), 
"send-fragments", ForemanException.class);
     /*
      * Send the remote (leaf) fragments; we don't wait for these. Any problems 
will come in through
-     * the regular sendListener event delivery.
+     * the regular sendListener event delivery˚. All the local leaf fragments 
are submitted locally without creating
+     * any actual control connection to itself.
      */
-    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
-    }
-
-    // Setup local leaf fragments
-    for (final PlanFragment fragment : localLeafFragmentList) {
-      startLocalFragment(fragment);
+    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
     }
   }
 
   /**
-   * Send all the remote fragments belonging to a single target drillbit in 
one request.
+   * Send all the remote fragments belonging to a single target drillbit in 
one request. If the assignment
+   * DrillbitEndpoint is local Drillbit then {@link 
Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it
+   * locally without actually creating a Control Connection to itself.
    *
    * @param assignment the drillbit assigned to these fragments
    * @param fragments the set of fragments
@@ -241,41 +225,21 @@ public class FragmentsRunner {
   }
 
   /**
-   * Add planFragment into either of local fragment list or remote fragment 
map based on assigned Drillbit Endpoint node
-   * and the local Drillbit Endpoint.
+   * Send intermediate fragment to the assigned Drillbit node. If the 
assignment DrillbitEndpoint is local Drillbit
+   * then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of 
submitting it locally without actually creating
+   * a Control Connection to itself. Throws {@link UserException} in case of 
failure to send the fragment.
    *
-   * @param planFragment plan fragment
-   * @param localEndPoint local endpoint
-   * @param localFragmentList local fragment list
-   * @param remoteFragmentMap remote fragment map
+   * @param intermediateFragmentMap - Map of Drillbit Endpoint to list of 
intermediate PlanFragment's
    */
-  private void updateFragmentCollection(final PlanFragment planFragment, final 
DrillbitEndpoint localEndPoint,
-                                        final List<PlanFragment> 
localFragmentList,
-                                        final Multimap<DrillbitEndpoint, 
PlanFragment> remoteFragmentMap) {
-    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
+  private void scheduleIntermediateFragments(final Multimap<DrillbitEndpoint, 
PlanFragment> intermediateFragmentMap) {
 
-    if (assignedDrillbit.equals(localEndPoint)) {
-      localFragmentList.add(planFragment);
-    } else {
-      remoteFragmentMap.put(assignedDrillbit, planFragment);
-    }
-  }
-
-  /**
-   * Send remote intermediate fragment to the assigned Drillbit node.
-   * Throw exception in case of failure to send the fragment.
-   *
-   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of 
PlanFragment's
-   */
-  private void scheduleRemoteIntermediateFragments(final 
Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-
-    final int numIntFragments = remoteFragmentMap.keySet().size();
+    final int numIntFragments = intermediateFragmentMap.keySet().size();
     final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
     final FragmentSubmitFailures fragmentSubmitFailures = new 
FragmentSubmitFailures();
 
     // send remote intermediate fragments
-    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, 
fragmentSubmitFailures);
+    for (final DrillbitEndpoint ep : intermediateFragmentMap.keySet()) {
+      sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, 
fragmentSubmitFailures);
     }
 
     final long timeout = 
drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT)
 * numIntFragments;
@@ -313,29 +277,6 @@ public class FragmentsRunner {
     }
   }
 
-
-  /**
-   * Start the locally assigned leaf or intermediate fragment
-   *
-   * @param fragment fragment
-   */
-  private void startLocalFragment(final PlanFragment fragment) throws 
ExecutionSetupException {
-    logger.debug("Received local fragment start instruction", fragment);
-
-    final FragmentContextImpl fragmentContext = new 
FragmentContextImpl(drillbitContext, fragment, 
drillbitContext.getFunctionImplementationRegistry());
-    final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(fragmentContext);
-    final FragmentExecutor fragmentExecutor = new 
FragmentExecutor(fragmentContext, fragment, statusReporter);
-
-    // we either need to start the fragment if it is a leaf fragment, or set 
up a fragment manager if it is non leaf.
-    if (fragment.getLeafFragment()) {
-      bee.addFragmentRunner(fragmentExecutor);
-    } else {
-      // isIntermediate, store for incoming data.
-      final NonRootFragmentManager manager = new 
NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
-      drillbitContext.getWorkBus().addFragmentManager(manager);
-    }
-  }
-
   /**
    * Used by {@link FragmentSubmitListener} to track the number of submission 
failures.
    */
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index eb57658..c0c5b04 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -40,12 +40,9 @@ public class FragmentStatusReporter implements AutoCloseable 
{
 
   protected final AtomicReference<DrillbitEndpoint> foremanDrillbit;
 
-  protected final DrillbitEndpoint localDrillbit;
-
   public FragmentStatusReporter(final ExecutorFragmentContext context) {
     this.context = context;
     this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint());
-    this.localDrillbit = context.getEndpoint();
   }
 
   /**
@@ -119,14 +116,10 @@ public class FragmentStatusReporter implements 
AutoCloseable {
       return;
     }
 
-    if (localDrillbit.equals(foremanNode)) {
-      // Update the status locally
-      context.getWorkEventbus().statusUpdate(status);
-    } else {
-      // Send the status via Control Tunnel to remote foreman node
-      final ControlTunnel tunnel = 
context.getController().getTunnel(foremanNode);
-      tunnel.sendFragmentStatus(status);
-    }
+    // Send status for both local and remote foreman node via Tunnel. For 
local there won't be any network connection
+    // created and it will be submitted locally using 
LocalControlConnectionManager
+    final ControlTunnel tunnel = 
context.getController().getTunnel(foremanNode);
+    tunnel.sendFragmentStatus(status);
   }
 
   /**
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java
new file mode 100644
index 0000000..667e440
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class ConnectionManagerRegistryTest {
+
+  private static final DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder()
+    .setAddress("10.0.0.1")
+    .setControlPort(31012)
+    .setDataPort(31011)
+    .setUserPort(31010)
+    .setState(DrillbitEndpoint.State.STARTUP)
+    .build();
+
+  private static final DrillbitEndpoint foremanEndpoint = 
DrillbitEndpoint.newBuilder()
+    .setAddress("10.0.0.2")
+    .setControlPort(31012)
+    .setDataPort(31011)
+    .setUserPort(31010)
+    .setState(DrillbitEndpoint.State.STARTUP)
+    .build();
+
+  private static ControlConnectionConfig config;
+
+  @BeforeClass
+  public static void setup() {
+    config = mock(ControlConnectionConfig.class);
+  }
+
+  @Test
+  public void testLocalConnectionManager() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+    final ControlConnectionManager manager = 
registry.getConnectionManager(localEndpoint);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof LocalControlConnectionManager);
+  }
+
+  @Test
+  public void testLocalConnectionManager_differentState() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = 
localEndpoint.toBuilder().setState(DrillbitEndpoint.State.ONLINE).build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof LocalControlConnectionManager);
+  }
+
+  @Test
+  public void testLocalConnectionManager_differentUserDataPort() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setState(DrillbitEndpoint.State.ONLINE)
+      .setUserPort(10000)
+      .setDataPort(11000)
+      .build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof LocalControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteConnectionManager() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint);
+    assertTrue(registry.iterator().hasNext());
+    assertTrue(manager instanceof RemoteControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteConnectionManager_differentControlPort() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setControlPort(10000)
+      .build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof RemoteControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteConnectionManager_differentAddress() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setAddress("10.0.0.0")
+      .build();
+    final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2);
+    assertTrue(registry.iterator().hasNext());
+    assertEquals(manager, registry.iterator().next());
+    assertTrue(manager instanceof RemoteControlConnectionManager);
+  }
+
+  @Test
+  public void testRemoteAndLocalConnectionManager() {
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(config);
+    registry.setLocalEndpoint(localEndpoint);
+
+    final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder()
+      .setAddress("10.0.0.0")
+      .build();
+
+    final ControlConnectionManager remoteManager = 
registry.getConnectionManager(foremanEndpoint2);
+    final ControlConnectionManager localManager = 
registry.getConnectionManager(localEndpoint);
+
+    final ControlConnectionManager remoteManager_2 = 
registry.getConnectionManager(foremanEndpoint2);
+    final ControlConnectionManager localManager_2 = 
registry.getConnectionManager(localEndpoint);
+
+    assertTrue(remoteManager instanceof RemoteControlConnectionManager);
+    assertEquals(remoteManager, remoteManager_2);
+
+    assertTrue(localManager instanceof LocalControlConnectionManager);
+    assertEquals(localManager, localManager_2);
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java
new file mode 100644
index 0000000..c0cf09b
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestLocalControlConnectionManager {
+
+  private static final DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder()
+    .setAddress("10.0.0.1")
+    .setControlPort(31011)
+    .setState(DrillbitEndpoint.State.STARTUP)
+    .build();
+
+  private static ControlConnectionConfig mockConfig;
+
+  private static ControlMessageHandler mockHandler;
+
+  private static ControlTunnel controlTunnel;
+
+  private static CountDownLatch latch;
+
+  private static final String NEGATIVE_ACK_MESSAGE = "Negative Ack received";
+
+  private static final RpcOutcomeListener<GeneralRPCProtos.Ack> 
outcomeListener =
+    new RpcOutcomeListener<GeneralRPCProtos.Ack>() {
+    @Override
+    public void failed(RpcException ex) {
+      throw new IllegalStateException(ex);
+    }
+
+    @Override
+    public void success(GeneralRPCProtos.Ack value, ByteBuf buffer) {
+      if (value.getOk()) {
+        latch.countDown();
+      } else {
+        throw new IllegalStateException(NEGATIVE_ACK_MESSAGE);
+      }
+    }
+
+    @Override
+    public void interrupted(InterruptedException e) {
+      // Do nothing
+    }
+  };
+
+  @Rule
+  public ExpectedException exceptionThrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void setup() {
+    mockConfig = mock(ControlConnectionConfig.class);
+    final ConnectionManagerRegistry registry = new 
ConnectionManagerRegistry(mockConfig);
+    registry.setLocalEndpoint(localEndpoint);
+    ControlConnectionManager manager = 
registry.getConnectionManager(localEndpoint);
+    assertTrue(manager instanceof LocalControlConnectionManager);
+    controlTunnel = new ControlTunnel(manager);
+  }
+
+  @Before
+  public void setupForTest() {
+    mockHandler = mock(ControlMessageHandler.class);
+    when(mockConfig.getMessageHandler()).thenReturn(mockHandler);
+  }
+
+  /**
+   * Verify that SendFragmentStatus is handled correctly using ControlTunnel 
with LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragmentStatus_Success() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final UserBitShared.QueryProfile mockProfile = 
UserBitShared.QueryProfile.getDefaultInstance();
+    when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile);
+    final UserBitShared.QueryProfile returnedProfile = 
controlTunnel.requestQueryProfile(mockQueryId).checkedGet();
+    assertEquals(returnedProfile, mockProfile);
+  }
+
+  /**
+   * Verify that SendFragmentStatus failure scenario is handled correctly 
using ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragmentStatus_Failure() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final String exceptionMessage = "Testing failure case";
+    exceptionThrown.expect(RpcException.class);
+    exceptionThrown.expectMessage(exceptionMessage);
+    when(mockHandler.requestQueryStatus(mockQueryId)).thenThrow(new 
RpcException(exceptionMessage));
+    controlTunnel.requestQueryProfile(mockQueryId).checkedGet();
+  }
+
+  /**
+   * Verify that CancelFragment with positive ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalCancelFragment_PositiveAck() throws Exception {
+    final ExecProtos.FragmentHandle mockHandle = 
ExecProtos.FragmentHandle.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse);
+    controlTunnel.cancelFragment(outcomeListener, mockHandle);
+    latch.await();
+  }
+
+  /**
+   * Verify that CancelFragment with negative ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalCancelFragment_NegativeAck() throws Exception {
+    final ExecProtos.FragmentHandle mockHandle = 
ExecProtos.FragmentHandle.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    exceptionThrown.expect(IllegalStateException.class);
+    exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE);
+    final GeneralRPCProtos.Ack mockResponse = Acks.FAIL;
+    when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse);
+    controlTunnel.cancelFragment(outcomeListener, mockHandle);
+    latch.await();
+  }
+
+  /**
+   * Verify that InitializeFragments with positive ack is handled correctly 
using ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragments_PositiveAck() throws Exception {
+    final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    
when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse);
+    controlTunnel.sendFragments(outcomeListener, mockFragments);
+    latch.await();
+  }
+
+  /**
+   * Verify that InitializeFragments with negative ack is handled correctly 
using ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragments_NegativeAck() throws Exception {
+    final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    exceptionThrown.expect(IllegalStateException.class);
+    exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE);
+    final GeneralRPCProtos.Ack mockResponse = Acks.FAIL;
+    
when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse);
+    controlTunnel.sendFragments(outcomeListener, mockFragments);
+    latch.await();
+  }
+
+  /**
+   * Verify that InitializeFragments failure case is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testLocalSendFragments_Failure() throws Exception {
+    final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    exceptionThrown.expect(IllegalStateException.class);
+    exceptionThrown.expectCause(new 
TypeSafeMatcher<Throwable>(RpcException.class) {
+      @Override
+      protected boolean matchesSafely(Throwable throwable) {
+        return (throwable != null && throwable instanceof RpcException);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        // Do nothing
+      }
+    });
+    when(mockHandler.initializeFragment(mockFragments)).thenThrow(new 
RpcException("Failed to initialize"));
+    controlTunnel.sendFragments(outcomeListener, mockFragments);
+    latch.await();
+  }
+
+  /**
+   * Verify that UnpauseFragment is handled correctly using ControlTunnel with 
LocalControlConnectionManager
+   */
+  @Test
+  public void testUnpauseFragments() throws Exception {
+    final ExecProtos.FragmentHandle mockHandle = 
ExecProtos.FragmentHandle.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    when(mockHandler.resumeFragment(mockHandle)).thenReturn(mockResponse);
+    controlTunnel.unpauseFragment(outcomeListener, mockHandle);
+    latch.await();
+  }
+
+  /**
+   * Verify that RequestQueryStatus is handled correctly using ControlTunnel 
with LocalControlConnectionManager
+   */
+  @Test
+  public void testRequestQueryStatus() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final UserBitShared.QueryProfile mockProfile = 
UserBitShared.QueryProfile.getDefaultInstance();
+    when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile);
+    final UserBitShared.QueryProfile returnedProfile = 
controlTunnel.requestQueryProfile(mockQueryId).checkedGet();
+    assertEquals(returnedProfile, mockProfile);
+  }
+
+  /**
+   * Verify that CancelQuery with positive ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testCancelQuery_PositiveAck() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse);
+    GeneralRPCProtos.Ack response = 
controlTunnel.requestCancelQuery(mockQueryId).checkedGet();
+    assertEquals(response, mockResponse);
+  }
+
+  /**
+   * Verify that CancelQuery with negative ack is handled correctly using 
ControlTunnel with
+   * LocalControlConnectionManager
+   */
+  @Test
+  public void testCancelQuery_NegativeAck() throws Exception {
+    final UserBitShared.QueryId mockQueryId = 
UserBitShared.QueryId.getDefaultInstance();
+    final GeneralRPCProtos.Ack mockResponse = Acks.FAIL;
+    when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse);
+    GeneralRPCProtos.Ack response = 
controlTunnel.requestCancelQuery(mockQueryId).checkedGet();
+    assertEquals(response, mockResponse);
+  }
+
+  /**
+   * Verify that FinishedReceiver is handled correctly using ControlTunnel 
with LocalControlConnectionManager
+   */
+  @Test
+  public void testInformReceiverFinished_success() throws Exception {
+    final BitControl.FinishedReceiver finishedReceiver = 
BitControl.FinishedReceiver.getDefaultInstance();
+    latch = new CountDownLatch(1);
+    final GeneralRPCProtos.Ack mockResponse = Acks.OK;
+    
when(mockHandler.receivingFragmentFinished(finishedReceiver)).thenReturn(mockResponse);
+    controlTunnel.informReceiverFinished(outcomeListener, finishedReceiver);
+    latch.await();
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
index aaff154..04d54d5 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
@@ -19,16 +19,16 @@ package org.apache.drill.exec.rpc.user.security;
 
 import com.google.common.collect.Lists;
 import com.typesafe.config.ConfigValueFactory;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.SecurityTest;
-import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.rpc.control.ControlRpcMetrics;
 import org.apache.drill.exec.rpc.data.DataRpcMetrics;
 import org.apache.drill.exec.rpc.security.KerberosHelper;
 import org.apache.drill.exec.rpc.user.UserRpcMetrics;
 import 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
+import org.apache.drill.test.BaseTestQuery;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
@@ -178,6 +178,38 @@ public class TestUserBitKerberos extends BaseTestQuery {
     assertTrue(0 == 
DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
   }
 
+  @Test
+  public void testUnecryptedConnectionCounter_LocalControlMessage() throws 
Exception {
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, 
krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
+    final Subject clientSubject = 
JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile());
+
+    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        updateClient(connectionProps);
+        return null;
+      }
+    });
+
+    // Run query on memory system table this sends remote fragments to all 
Drillbit and Drillbits then send data
+    // using data channel. In this test we have only 1 Drillbit so there 
should not be any control connection but a
+    // local data connections
+    testSql("SELECT * FROM sys.memory");
+
+    // Check encrypted counters value
+    assertTrue(0 == 
UserRpcMetrics.getInstance().getEncryptedConnectionCount());
+    assertTrue(0 == 
ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
+    assertTrue(0 == 
DataRpcMetrics.getInstance().getEncryptedConnectionCount());
+
+    // Check unencrypted counters value
+    assertTrue(1 == 
UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+    assertTrue(0 == 
ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+    assertTrue(2 == 
DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+  }
+
   @AfterClass
   public static void cleanTest() throws Exception {
     krbHelper.stopKdc();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
index 83c1eca..a119a8c 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java
@@ -17,54 +17,44 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.control.Controller;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.ops.FragmentStats;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.junit.Before;
+import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import static 
org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED;
+import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import static 
org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED;
-import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED;
-import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING;
-
 public class FragmentStatusReporterTest {
 
   private FragmentStatusReporter statusReporter;
 
   private ControlTunnel foremanTunnel;
 
-  private FragmentContextImpl context;
-
   @Before
   public void setUp() throws Exception {
-    context = mock(FragmentContextImpl.class);
+    final FragmentContextImpl context = mock(FragmentContextImpl.class);
     Controller controller = mock(Controller.class);
 
-    // Create 2 different endpoint such that foremanEndpoint is different than
-    // localEndpoint
-    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
+    // Create Foreman Endpoint and it's tunnel
     DrillbitEndpoint foremanEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.2").build();
     foremanTunnel = mock(ControlTunnel.class);
 
-    when(context.getEndpoint()).thenReturn(localEndpoint);
     when(context.getController()).thenReturn(controller);
     when(controller.getTunnel(foremanEndpoint)).thenReturn(foremanTunnel);
 
@@ -120,49 +110,4 @@ public class FragmentStatusReporterTest {
     statusReporter.stateChanged(CANCELLATION_REQUESTED);
     verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class));
   }
-
-
-  /**
-   * With LocalEndpoint and Foreman Endpoint being same node, test that status 
change
-   * message doesn't happen via Control Tunnel instead it happens locally via 
WorkEventBus
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testStateChangedLocalForeman() throws Exception {
-
-    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
-
-    when(context.getEndpoint()).thenReturn(localEndpoint);
-    when(context.getForemanEndpoint()).thenReturn(localEndpoint);
-    when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class));
-
-    statusReporter = new FragmentStatusReporter(context);
-    statusReporter.stateChanged(RUNNING);
-    verifyZeroInteractions(foremanTunnel);
-    verify(context.getWorkEventbus()).statusUpdate(any(FragmentStatus.class));
-  }
-
-  /**
-   * With LocalEndpoint and Foreman Endpoint being same node, test that after 
close of
-   * FragmentStatusReporter, status update doesn't happen either through 
Control Tunnel
-   * or through WorkEventBus.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testCloseLocalForeman() throws Exception {
-    DrillbitEndpoint localEndpoint = 
DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build();
-
-    when(context.getEndpoint()).thenReturn(localEndpoint);
-    when(context.getForemanEndpoint()).thenReturn(localEndpoint);
-    when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class));
-    statusReporter = new FragmentStatusReporter(context);
-
-    statusReporter.close();
-    assertTrue(statusReporter.foremanDrillbit.get() == null);
-    statusReporter.stateChanged(RUNNING);
-    verifyZeroInteractions(foremanTunnel);
-    verify(context.getWorkEventbus(), 
never()).statusUpdate(any(FragmentStatus.class));
-  }
 }
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
index 90b2d19..2437ddb 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
@@ -17,20 +17,24 @@
  */
 package org.apache.drill.exec.rpc;
 
-import io.netty.buffer.ByteBuf;
-
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
 
-public abstract class FutureBitCommand<T extends MessageLite, C extends 
RemoteConnection> implements RpcCommand<T,C> {
+public abstract class FutureBitCommand<T extends MessageLite, C extends 
RemoteConnection,
+  E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
 
   protected final SettableFuture<T> settableFuture;
   private final RpcCheckedFuture<T> parentFuture;
 
+  private final RpcOutcomeListener<T> outcomeListener;
+
   public FutureBitCommand() {
     this.settableFuture = SettableFuture.create();
     this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+    outcomeListener = new DeferredRpcOutcome();
   }
 
   public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C 
connection);
@@ -38,7 +42,7 @@ public abstract class FutureBitCommand<T extends MessageLite, 
C extends RemoteCo
   @Override
   public void connectionAvailable(C connection) {
 
-    doRpcCall(new DeferredRpcOutcome(), connection);
+    doRpcCall(outcomeListener, connection);
   }
 
   @Override
@@ -72,9 +76,13 @@ public abstract class FutureBitCommand<T extends 
MessageLite, C extends RemoteCo
   }
 
   @Override
+  public RpcOutcomeListener<T> getOutcomeListener() {
+    return outcomeListener;
+  }
+
+  @Override
   public void connectionFailed(FailureType type, Throwable t) {
     settableFuture.setException(RpcException.mapException(
         String.format("Command failed while establishing connection.  Failure 
type %s.", type), t));
   }
-
-}
\ No newline at end of file
+}
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
index c9b8b3a..964abf4 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.rpc;
 
-import io.netty.buffer.ByteBuf;
-
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
-public abstract class ListeningCommand<T extends MessageLite, C extends 
RemoteConnection> implements RpcCommand<T, C> {
+public abstract class ListeningCommand<T extends MessageLite, C extends 
RemoteConnection,
+  E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ListeningCommand.class);
 
   private final RpcOutcomeListener<T> listener;
@@ -35,7 +35,7 @@ public abstract class ListeningCommand<T extends MessageLite, 
C extends RemoteCo
   @Override
   public void connectionAvailable(C connection) {
 
-    doRpcCall(new DeferredRpcOutcome(), connection);
+    doRpcCall(listener, connection);
   }
 
   @Override
@@ -43,28 +43,15 @@ public abstract class ListeningCommand<T extends 
MessageLite, C extends RemoteCo
     connectionAvailable(connection);
   }
 
-  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
-
-    @Override
-    public void failed(RpcException ex) {
-      listener.failed(ex);
-    }
-
-    @Override
-    public void success(T value, ByteBuf buf) {
-      listener.success(value, buf);
-    }
-
-    @Override
-    public void interrupted(final InterruptedException e) {
-      listener.interrupted(e);
-    }
-  }
-
   @Override
   public void connectionFailed(FailureType type, Throwable t) {
     listener.failed(RpcException.mapException(
         String.format("Command failed while establishing connection.  Failure 
type %s.", type), t));
   }
 
-}
\ No newline at end of file
+  @Override
+  public RpcOutcomeListener<T> getOutcomeListener() {
+    return listener;
+  }
+
+}
diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index fec3962..c648a16 100644
--- 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc;
 
+import com.google.protobuf.Internal.EnumLite;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -52,7 +53,8 @@ public abstract class ReconnectingConnection<C extends 
ClientConnection, HS exte
 
   protected abstract BasicClient<?, C, HS, ?> getNewClient();
 
-  public <T extends MessageLite, R extends RpcCommand<T, C>> void runCommand(R 
cmd) {
+  public <T extends MessageLite, E extends EnumLite, M extends MessageLite,
+    R extends RpcCommand<T, C, E, M>> void runCommand(R cmd) {
 //    if(logger.isDebugEnabled()) logger.debug(String.format("Running command 
%s sending to host %s:%d", cmd, host, port));
     C connection = connectionHolder.get();
     if (connection != null) {
@@ -78,7 +80,7 @@ public abstract class ReconnectingConnection<C extends 
ClientConnection, HS exte
       } else {
 //        logger.debug("No connection active, opening client connection.");
         BasicClient<?, C, HS, ?> client = getNewClient();
-        ConnectionListeningFuture<T> future = new 
ConnectionListeningFuture<>(cmd);
+        ConnectionListeningFuture<T,E,M> future = new 
ConnectionListeningFuture<>(cmd);
         client.connectAsClient(future, handshake, host, port);
         future.waitAndRun();
 //        logger.debug("Connection available and active, command now being run 
inline.");
@@ -88,13 +90,13 @@ public abstract class ReconnectingConnection<C extends 
ClientConnection, HS exte
     }
   }
 
-  public class ConnectionListeningFuture<R extends MessageLite>
+  public class ConnectionListeningFuture<R extends MessageLite, E extends 
EnumLite, M extends MessageLite>
       extends AbstractFuture<C>
       implements RpcConnectionHandler<C> {
 
-    private RpcCommand<R, C> cmd;
+    private RpcCommand<R, C, E, M> cmd;
 
-    public ConnectionListeningFuture(RpcCommand<R, C> cmd) {
+    public ConnectionListeningFuture(RpcCommand<R, C, E, M> cmd) {
       super();
       this.cmd = cmd;
     }
@@ -173,9 +175,7 @@ public abstract class ReconnectingConnection<C extends 
ClientConnection, HS exte
         incoming.getChannel().close();
       }
       set(connection);
-
     }
-
   }
 
   /** Factory for close handlers **/
@@ -204,7 +204,6 @@ public abstract class ReconnectingConnection<C extends 
ClientConnection, HS exte
       connectionHolder.compareAndSet(connection, null);
       parent.operationComplete(future);
     }
-
   }
 
   public CloseHandlerCreator getCloseHandlerCreator() {
@@ -223,5 +222,4 @@ public abstract class ReconnectingConnection<C extends 
ClientConnection, HS exte
       c.getChannel().close();
     }
   }
-
 }
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
index 1bb39ac..821b8fb 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
@@ -17,10 +17,17 @@
  */
 package org.apache.drill.exec.rpc;
 
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
-public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> 
extends RpcConnectionHandler<C>{
+public interface RpcCommand<T extends MessageLite, C extends RemoteConnection,
+  E extends EnumLite, M extends MessageLite> extends RpcConnectionHandler<C> {
 
-  public abstract void connectionAvailable(C connection);
+  void connectionAvailable(C connection);
 
-}
\ No newline at end of file
+  E getRpcType();
+
+  M getMessage();
+
+  RpcOutcomeListener<T> getOutcomeListener();
+}

-- 
To stop receiving notification emails like this one, please contact
ar...@apache.org.

Reply via email to