Repository: drill
Updated Branches:
  refs/heads/master d77ab3183 -> 6cb626d78


DRILL-5721: Query with only a root fragment and no non-root fragments hangs when
the Drillbit-to-Drillbit Control Connection has network issues
            Note: 1) To resolve the issue, all fragments assigned to execute on the
                     Foreman node, including the root fragment, are scheduled locally
                     and are not sent over the Control Tunnel. FragmentStatusReporter
                     is also updated so that fragments running on the Foreman node
                     send their status updates locally.
                  2) Refactor FragmentManager, setupRootFragment and startNewFragment.
                  3) Update the test added for DRILL-5701 since the behavior has changed.
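
The behavioral core of both the Foreman and FragmentStatusReporter changes below is a
single comparison: if a fragment's assigned endpoint equals the Foreman's own (local)
endpoint, the fragment is started and its status is reported locally; otherwise the
existing Control Tunnel path is used. A minimal, self-contained sketch of that decision
follows (the class name, enum, and endpoint strings are illustrative only, not Drill
source):

    import java.util.Objects;

    /**
     * Sketch of the routing decision introduced by DRILL-5721. In Drill the
     * endpoints are DrillbitEndpoint protobuf objects compared with equals(),
     * and the two branches correspond to startLocalFragment() /
     * WorkEventBus.statusUpdate() versus sendRemoteFragments() /
     * ControlTunnel.sendFragmentStatus().
     */
    public class Drill5721RoutingSketch {

      enum Route { LOCAL, CONTROL_TUNNEL }

      /** Fragments assigned to the Foreman's own Drillbit never touch the Control Tunnel. */
      static Route routeFor(String assignedEndpoint, String foremanEndpoint) {
        return Objects.equals(assignedEndpoint, foremanEndpoint)
            ? Route.LOCAL
            : Route.CONTROL_TUNNEL;
      }

      public static void main(String[] args) {
        String foreman = "drillbit-1:31011";
        // Root fragment (or any fragment assigned to the Foreman node): handled locally,
        // so a broken bit-to-bit control connection cannot hang the query.
        System.out.println(routeFor("drillbit-1:31011", foreman)); // LOCAL
        // Fragment assigned to another Drillbit: still goes over the Control Tunnel.
        System.out.println(routeFor("drillbit-2:31011", foreman)); // CONTROL_TUNNEL
      }
    }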


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b06a7bde
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b06a7bde
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b06a7bde

Branch: refs/heads/master
Commit: b06a7bdecef8bdd52eebf4a2821a09c64b296886
Parents: d77ab31
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Thu Aug 17 16:48:37 2017 -0700
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Sun Sep 24 21:34:14 2017 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentContext.java  |  30 ++-
 .../org/apache/drill/exec/work/WorkManager.java |  51 ++--
 .../exec/work/batch/ControlMessageHandler.java  |  35 ++-
 .../apache/drill/exec/work/foreman/Foreman.java | 238 ++++++++++++-------
 .../work/fragment/AbstractFragmentManager.java  |  99 ++++++++
 .../exec/work/fragment/FragmentExecutor.java    |   5 +-
 .../work/fragment/FragmentStatusReporter.java   |  77 +++---
 .../work/fragment/NonRootFragmentManager.java   |  78 +-----
 .../exec/work/fragment/RootFragmentManager.java |  67 +-----
 .../drill/exec/rpc/data/TestBitBitKerberos.java |  97 ++++++++
 .../security/TestUserBitKerberosEncryption.java |  14 +-
 11 files changed, 501 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index badf70c..19ffca2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,14 +17,12 @@
  */
 package org.apache.drill.exec.ops;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -47,8 +45,9 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;
@@ -60,10 +59,10 @@ import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Contextual objects required for execution of a particular fragment.
@@ -486,6 +485,15 @@ public class FragmentContext implements AutoCloseable, 
UdfUtilities, FragmentExe
     sendingAccountor.waitForSendComplete();
   }
 
+  public WorkEventBus getWorkEventbus() {
+    return context.getWorkBus();
+  }
+
+  public boolean isBuffersDone() {
+    Preconditions.checkState(this.buffers != null, "Incoming Buffers is not 
set in this fragment context");
+    return buffers.isDone();
+  }
+
   public interface ExecutorState {
     /**
      * Whether execution should continue.

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 2d37b8c..800d3a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -17,13 +17,10 @@
  */
 package org.apache.drill.exec.work;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-
-import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.drill.common.SelfCleaningRunnable;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -49,10 +46,11 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.user.UserWorker;
 
-import com.codahale.metrics.Gauge;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 
 /**
  * Manages the running fragments in a Drillbit. Periodically requests run-time 
stats updates from fragments
@@ -294,6 +292,9 @@ public class WorkManager implements AutoCloseable {
    * about RUNNING queries, such as current memory consumption, number of rows 
processed, and so on.
    * The FragmentStatusListener only tracks changes to state, so the 
statistics kept there will be
    * stale; this thread probes for current values.
+   *
+   * For each running fragment if the Foreman is the local Drillbit then 
status is updated locally bypassing the Control
+   * Tunnel, whereas for remote Foreman it is sent over the Control Tunnel.
    */
   private class StatusThread extends Thread {
     public StatusThread() {
@@ -303,30 +304,42 @@ public class WorkManager implements AutoCloseable {
 
     @Override
     public void run() {
-      while(true) {
-        final Controller controller = dContext.getController();
+
+      // Get the controller and localBitEndPoint outside the loop since these 
will not change once a Drillbit and
+      // StatusThread is started
+      final Controller controller = dContext.getController();
+      final DrillbitEndpoint localBitEndPoint = dContext.getEndpoint();
+
+      while (true) {
         final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
-        for(final FragmentExecutor fragmentExecutor : 
runningFragments.values()) {
+        for (final FragmentExecutor fragmentExecutor : 
runningFragments.values()) {
           final FragmentStatus status = fragmentExecutor.getStatus();
           if (status == null) {
             continue;
           }
 
-          final DrillbitEndpoint ep = 
fragmentExecutor.getContext().getForemanEndpoint();
-          futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
+          final DrillbitEndpoint foremanEndpoint = 
fragmentExecutor.getContext().getForemanEndpoint();
+
+          // If local endpoint is the Foreman for this running fragment, then 
submit the status locally bypassing the
+          // Control Tunnel
+          if (localBitEndPoint.equals(foremanEndpoint)) {
+            workBus.statusUpdate(status);
+          } else { // else send the status to remote Foreman over Control 
Tunnel
+            
futures.add(controller.getTunnel(foremanEndpoint).sendFragmentStatus(status));
+          }
         }
 
-        for(final DrillRpcFuture<Ack> future : futures) {
+        for (final DrillRpcFuture<Ack> future : futures) {
           try {
             future.checkedGet();
-          } catch(final RpcException ex) {
+          } catch (final RpcException ex) {
             logger.info("Failure while sending intermediate fragment status to 
Foreman", ex);
           }
         }
 
         try {
           Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
-        } catch(final InterruptedException e) {
+        } catch (final InterruptedException e) {
           // Preserve evidence that the interruption occurred so that code 
higher up on the call stack can learn of the
           // interruption and respond to it if it wants to.
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 58c1df5..2bbaf1b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -17,10 +17,9 @@
  */
 package org.apache.drill.exec.work.batch;
 
-import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -42,7 +41,6 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
 import org.apache.drill.exec.rpc.control.ControlRpcConfig;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.CustomHandlerRegistry;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
@@ -52,6 +50,8 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
 import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 
+import static org.apache.drill.exec.rpc.RpcBus.get;
+
 public class ControlMessageHandler implements 
RequestHandler<ControlConnection> {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
   private final WorkerBee bee;
@@ -110,8 +110,9 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
 
     case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
       final InitializeFragments fragments = get(pBody, 
InitializeFragments.PARSER);
+      final DrillbitContext drillbitContext = bee.getContext();
       for(int i = 0; i < fragments.getFragmentCount(); i++) {
-        startNewRemoteFragment(fragments.getFragment(i));
+        startNewFragment(fragments.getFragment(i), drillbitContext);
       }
       sender.send(ControlRpcConfig.OK);
       break;
@@ -140,25 +141,33 @@ public class ControlMessageHandler implements 
RequestHandler<ControlConnection>
     }
   }
 
-  private void startNewRemoteFragment(final PlanFragment fragment) throws 
UserRpcException {
+  /**
+   * Start a new fragment on this node. These fragments can be leaf or 
intermediate fragments
+   * which are scheduled by remote or local Foreman node.
+   * @param fragment
+   * @throws UserRpcException
+   */
+  private void startNewFragment(final PlanFragment fragment, final 
DrillbitContext drillbitContext)
+      throws UserRpcException {
     logger.debug("Received remote fragment start instruction", fragment);
 
-    final DrillbitContext drillbitContext = bee.getContext();
     try {
+      final FragmentContext fragmentContext = new 
FragmentContext(drillbitContext, fragment,
+          drillbitContext.getFunctionImplementationRegistry());
+      final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(fragmentContext);
+      final FragmentExecutor fragmentExecutor = new 
FragmentExecutor(fragmentContext, fragment, statusReporter);
+
       // we either need to start the fragment if it is a leaf fragment, or set 
up a fragment manager if it is non leaf.
       if (fragment.getLeafFragment()) {
-        final FragmentContext context = new FragmentContext(drillbitContext, 
fragment,
-            drillbitContext.getFunctionImplementationRegistry());
-        final ControlTunnel tunnel = 
drillbitContext.getController().getTunnel(fragment.getForeman());
-        final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(context, tunnel);
-        final FragmentExecutor fr = new FragmentExecutor(context, fragment, 
statusReporter);
-        bee.addFragmentRunner(fr);
+        bee.addFragmentRunner(fragmentExecutor);
       } else {
         // isIntermediate, store for incoming data.
-        final NonRootFragmentManager manager = new 
NonRootFragmentManager(fragment, drillbitContext);
+        final NonRootFragmentManager manager = new 
NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
         drillbitContext.getWorkBus().addFragmentManager(manager);
       }
 
+    } catch (final ExecutionSetupException ex) {
+      throw new UserRpcException(drillbitContext.getEndpoint(), "Failed to 
create fragment context", ex);
     } catch (final Exception e) {
         throw new UserRpcException(drillbitContext.getEndpoint(),
             "Failure while trying to start remote fragment", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8144da1..d01d8fd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -18,20 +18,16 @@
 package org.apache.drill.exec.work.foreman;
 
 import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.drill.common.CatastrophicFailure;
 import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -71,9 +67,8 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -83,18 +78,21 @@ import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query 
where this
@@ -1073,24 +1071,18 @@ public class Foreman implements Runnable {
    */
   private void setupRootFragment(final PlanFragment rootFragment, final 
FragmentRoot rootOperator)
       throws ExecutionSetupException {
-    @SuppressWarnings("resource")
     final FragmentContext rootContext = new FragmentContext(drillbitContext, 
rootFragment, queryContext,
         initiatingClient, drillbitContext.getFunctionImplementationRegistry());
-    @SuppressWarnings("resource")
-    final IncomingBuffers buffers = new IncomingBuffers(rootFragment, 
rootContext);
-    rootContext.setBuffers(buffers);
+    final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(rootContext);
+    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, 
rootFragment, statusReporter, rootOperator);
+    final RootFragmentManager fragmentManager = new 
RootFragmentManager(rootFragment, rootRunner, statusReporter);
 
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    final ControlTunnel tunnel = 
drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
-    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, 
rootFragment,
-        new FragmentStatusReporter(rootContext, tunnel),
-        rootOperator);
-    final RootFragmentManager fragmentManager = new 
RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
-
-    if (buffers.isDone()) {
+    // FragmentManager is setting buffer for FragmentContext
+    if (rootContext.isBuffersDone()) {
       // if we don't have to wait for any incoming data, start the fragment 
runner.
-      bee.addFragmentRunner(fragmentManager.getRunnable());
+      bee.addFragmentRunner(rootRunner);
     } else {
       // if we do, record the fragment manager in the workBus.
       drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
@@ -1098,69 +1090,54 @@ public class Foreman implements Runnable {
   }
 
   /**
-   * Set up the non-root fragments for execution. Some may be local, and some 
may be remote.
-   * Messages are sent immediately, so they may start returning data even 
before we complete this.
-   *
-   * @param fragments the fragments
-   * @throws ForemanException
+   * Add planFragment into either of local fragment list or remote fragment 
map based on assigned Drillbit Endpoint node
+   * and the local Drillbit Endpoint.
+   * @param planFragment
+   * @param localEndPoint
+   * @param localFragmentList
+   * @param remoteFragmentMap
    */
-  private void setupNonRootFragments(final Collection<PlanFragment> fragments) 
throws ForemanException {
-    if (fragments.isEmpty()) {
-      // nothing to do here
-      return;
-    }
-    /*
-     * We will send a single message to each endpoint, regardless of how many 
fragments will be
-     * executed there. We need to start up the intermediate fragments first so 
that they will be
-     * ready once the leaf fragments start producing data. To satisfy both of 
these, we will
-     * make a pass through the fragments and put them into these two maps 
according to their
-     * leaf/intermediate state, as well as their target drillbit.
-     */
-    final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = 
ArrayListMultimap.create();
-    final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = 
ArrayListMultimap.create();
+  private void updateFragmentCollection(final PlanFragment planFragment, final 
DrillbitEndpoint localEndPoint,
+                                        final List<PlanFragment> 
localFragmentList,
+                                        final Multimap<DrillbitEndpoint, 
PlanFragment> remoteFragmentMap) {
+    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
 
-    // record all fragments for status purposes.
-    for (final PlanFragment planFragment : fragments) {
-      logger.trace("Tracking intermediate remote node {} with data {}",
-                   planFragment.getAssignment(), 
planFragment.getFragmentJson());
-      queryManager.addFragmentStatusTracker(planFragment, false);
-      if (planFragment.getLeafFragment()) {
-        leafFragmentMap.put(planFragment.getAssignment(), planFragment);
-      } else {
-        intFragmentMap.put(planFragment.getAssignment(), planFragment);
-      }
+    if (assignedDrillbit.equals(localEndPoint)) {
+      localFragmentList.add(planFragment);
+    } else {
+      remoteFragmentMap.put(assignedDrillbit, planFragment);
     }
+  }
 
-    /*
-     * We need to wait for the intermediates to be sent so that they'll be set 
up by the time
-     * the leaves start producing data. We'll use this latch to wait for the 
responses.
-     *
-     * However, in order not to hang the process if any of the RPC requests 
fails, we always
-     * count down (see FragmentSubmitFailures), but we count the number of 
failures so that we'll
-     * know if any submissions did fail.
-     */
-    final int numIntFragments = intFragmentMap.keySet().size();
+  /**
+   * Send remote intermediate fragment to the assigned Drillbit node. Throw 
exception in case of failure to send the
+   * fragment.
+   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of 
PlanFragment's
+   */
+  private void scheduleRemoteIntermediateFragments(final 
Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+
+    final int numIntFragments = remoteFragmentMap.keySet().size();
     final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
     final FragmentSubmitFailures fragmentSubmitFailures = new 
FragmentSubmitFailures();
 
     // send remote intermediate fragments
-    for (final DrillbitEndpoint ep : intFragmentMap.keySet()) {
-      sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, 
fragmentSubmitFailures);
+    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, 
fragmentSubmitFailures);
     }
 
     final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
-    if(numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)){
+    if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
       long numberRemaining = endpointLatch.getCount();
       throw UserException.connectionError()
-          .message(
-              "Exceeded timeout (%d) while waiting send intermediate work 
fragments to remote nodes. " +
-                  "Sent %d and only heard response back from %d nodes.",
-              timeout, numIntFragments, numIntFragments - numberRemaining)
-          .build(logger);
+          .message("Exceeded timeout (%d) while waiting send intermediate work 
fragments to remote nodes. " +
+              "Sent %d and only heard response back from %d nodes.",
+              timeout, numIntFragments, numIntFragments - 
numberRemaining).build(logger);
     }
 
     // if any of the intermediate fragment submissions failed, fail the query
-    final List<FragmentSubmitFailures.SubmissionException> 
submissionExceptions = fragmentSubmitFailures.submissionExceptions;
+    final List<FragmentSubmitFailures.SubmissionException> 
submissionExceptions =
+        fragmentSubmitFailures.submissionExceptions;
+
     if (submissionExceptions.size() > 0) {
       Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
       StringBuilder sb = new StringBuilder();
@@ -1179,8 +1156,100 @@ public class Foreman implements Runnable {
       }
       throw 
UserException.connectionError(submissionExceptions.get(0).rpcException)
           .message("Error setting up remote intermediate fragment execution")
-          .addContext("Nodes with failures", sb.toString())
-          .build(logger);
+          .addContext("Nodes with failures", sb.toString()).build(logger);
+    }
+  }
+
+
+  /**
+   * Start the locally assigned leaf or intermediate fragment
+   * @param fragment
+   * @throws ForemanException
+   */
+  private void startLocalFragment(final PlanFragment fragment) throws 
ForemanException {
+
+    logger.debug("Received local fragment start instruction", fragment);
+
+    try {
+      final FragmentContext fragmentContext = new 
FragmentContext(drillbitContext, fragment,
+          drillbitContext.getFunctionImplementationRegistry());
+      final FragmentStatusReporter statusReporter = new 
FragmentStatusReporter(fragmentContext);
+      final FragmentExecutor fragmentExecutor = new 
FragmentExecutor(fragmentContext, fragment, statusReporter);
+
+      // we either need to start the fragment if it is a leaf fragment, or set 
up a fragment manager if it is non leaf.
+      if (fragment.getLeafFragment()) {
+        bee.addFragmentRunner(fragmentExecutor);
+      } else {
+        // isIntermediate, store for incoming data.
+        final NonRootFragmentManager manager = new 
NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
+        drillbitContext.getWorkBus().addFragmentManager(manager);
+      }
+
+    } catch (final ExecutionSetupException ex) {
+      throw new ForemanException("Failed to create fragment context", ex);
+    } catch (final Exception ex) {
+      throw new ForemanException("Failed while trying to start local 
fragment", ex);
+    }
+  }
+
+  /**
+   * Set up the non-root fragments for execution. Some may be local, and some 
may be remote.
+   * Messages are sent immediately, so they may start returning data even 
before we complete this.
+   *
+   * @param fragments the fragments
+   * @throws ForemanException
+   */
+  private void setupNonRootFragments(final Collection<PlanFragment> fragments) 
throws ForemanException {
+    if (fragments.isEmpty()) {
+      // nothing to do here
+      return;
+    }
+    /*
+     * We will send a single message to each endpoint, regardless of how many 
fragments will be
+     * executed there. We need to start up the intermediate fragments first so 
that they will be
+     * ready once the leaf fragments start producing data. To satisfy both of 
these, we will
+     * make a pass through the fragments and put them into the remote maps 
according to their
+     * leaf/intermediate state, as well as their target drillbit. Also filter 
the leaf/intermediate
+     * fragments which are assigned to run on local Drillbit node (or Foreman 
node) into separate lists.
+     *
+     * This will help to schedule local
+     */
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = 
ArrayListMultimap.create();
+    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = 
ArrayListMultimap.create();
+    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+
+    final DrillbitEndpoint localDrillbitEndpoint = 
drillbitContext.getEndpoint();
+    // record all fragments for status purposes.
+    for (final PlanFragment planFragment : fragments) {
+
+      if (logger.isTraceEnabled()) {
+        logger.trace("Tracking intermediate remote node {} with data {}", 
planFragment.getAssignment(),
+            planFragment.getFragmentJson());
+      }
+
+      queryManager.addFragmentStatusTracker(planFragment, false);
+
+      if (planFragment.getLeafFragment()) {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, 
localLeafFragmentList, remoteLeafFragmentMap);
+      } else {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, 
localIntFragmentList, remoteIntFragmentMap);
+      }
+    }
+
+    /*
+     * We need to wait for the intermediates to be sent so that they'll be set 
up by the time
+     * the leaves start producing data. We'll use this latch to wait for the 
responses.
+     *
+     * However, in order not to hang the process if any of the RPC requests 
fails, we always
+     * count down (see FragmentSubmitFailures), but we count the number of 
failures so that we'll
+     * know if any submissions did fail.
+     */
+    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
+
+    // Setup local intermediate fragments
+    for (final PlanFragment fragment : localIntFragmentList) {
+      startLocalFragment(fragment);
     }
 
     injector.injectChecked(queryContext.getExecutionControls(), 
"send-fragments", ForemanException.class);
@@ -1188,8 +1257,13 @@ public class Foreman implements Runnable {
      * Send the remote (leaf) fragments; we don't wait for these. Any problems 
will come in through
      * the regular sendListener event delivery.
      */
-    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
+    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
+    }
+
+    // Setup local leaf fragments
+    for (final PlanFragment fragment : localLeafFragmentList) {
+      startLocalFragment(fragment);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
new file mode 100644
index 0000000..f427a84
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+import java.io.IOException;
+
+public abstract class AbstractFragmentManager implements FragmentManager {
+  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractFragmentManager.class);
+
+
+  protected final IncomingBuffers buffers;
+
+  protected final FragmentExecutor fragmentExecutor;
+
+  protected final FragmentHandle fragmentHandle;
+
+  protected final FragmentContext fragmentContext;
+
+  protected volatile boolean cancel = false;
+
+
+  public AbstractFragmentManager(final PlanFragment fragment, final 
FragmentExecutor executor, final FragmentStatusReporter statusReporter, final 
FragmentRoot rootOperator) {
+    this.fragmentHandle = fragment.getHandle();
+    this.fragmentContext = executor.getContext();
+    this.buffers = new IncomingBuffers(fragment, fragmentContext);
+    this.fragmentContext.setBuffers(buffers);
+    this.fragmentExecutor = executor;
+  }
+
+  public AbstractFragmentManager(final PlanFragment fragment, final 
FragmentExecutor executor, final FragmentStatusReporter statusReporter) {
+    this(fragment, executor, statusReporter, null);
+  }
+
+  @Override
+  public boolean handle(final IncomingDataBatch batch) throws 
FragmentSetupException, IOException {
+    return buffers.batchArrived(batch);
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return cancel;
+  }
+
+  @Override
+  public void unpause() {
+    fragmentExecutor.unpause();
+  }
+
+  @Override
+  public FragmentHandle getHandle() {
+    return fragmentHandle;
+  }
+
+  @Override
+  public boolean isWaiting() {
+    return !buffers.isDone() && !cancel;
+  }
+
+  @Override
+  public FragmentContext getFragmentContext() {
+    return fragmentContext;
+  }
+
+  @Override
+  public FragmentExecutor getRunnable() {
+    return fragmentExecutor;
+  }
+
+  public abstract void receivingFragmentFinished(final FragmentHandle handle);
+
+  @Override
+  public void cancel() {
+    cancel = true;
+    fragmentExecutor.cancel();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 258e485..e97a382 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -92,7 +92,7 @@ public class FragmentExecutor implements Runnable {
    * @param rootOperator
    */
   public FragmentExecutor(final FragmentContext context, final PlanFragment 
fragment,
-      final FragmentStatusReporter statusReporter, final FragmentRoot 
rootOperator) {
+                          final FragmentStatusReporter statusReporter, final 
FragmentRoot rootOperator) {
     this.fragmentContext = context;
     this.statusReporter = statusReporter;
     this.fragment = fragment;
@@ -261,6 +261,9 @@ public class FragmentExecutor implements Runnable {
       eventProcessor.start();
 
       // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
+      // FAILED state will be because of any Exception in execution loop 
root.next()
+      // CANCELLATION_REQUESTED because of a CANCEL request received by 
Foreman.
+      // ELSE will be in FINISHED state.
       cleanup(FragmentState.FINISHED);
 
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index e37435c..c0ecb07 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -21,27 +21,32 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.server.Drillbit;
 
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * The status reporter is responsible for receiving changes in fragment state 
and propagating the status back to the
- * Foreman through a control tunnel.
+ * Foreman either through a control tunnel or locally.
  */
 public class FragmentStatusReporter implements AutoCloseable {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
 
-  private final FragmentContext context;
-  private final AtomicReference<ControlTunnel> tunnel;
+  protected final FragmentContext context;
 
-  public FragmentStatusReporter(final FragmentContext context, final 
ControlTunnel tunnel) {
+  protected final AtomicReference<DrillbitEndpoint> foremanDrillbit;
+
+  protected final DrillbitEndpoint localDrillbit;
+
+  public FragmentStatusReporter(final FragmentContext context) {
     this.context = context;
-    this.tunnel = new AtomicReference<>(tunnel);
+    this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint());
+    this.localDrillbit = context.getIdentity();
   }
 
   /**
@@ -82,29 +87,47 @@ public class FragmentStatusReporter implements 
AutoCloseable {
     final FragmentStatus status = getStatus(newState, null);
     logger.info("{}: State to report: {}", 
QueryIdHelper.getQueryIdentifier(context.getHandle()), newState);
     switch (newState) {
-    case AWAITING_ALLOCATION:
-    case CANCELLATION_REQUESTED:
-    case CANCELLED:
-    case FINISHED:
-    case RUNNING:
-      sendStatus(status);
-      break;
-    case SENDING:
-      // no op.
-      break;
-    case FAILED:
-      // shouldn't get here since fail() should be called.
-    default:
-      throw new IllegalStateException(String.format("Received state changed 
event for unexpected state of %s.", newState));
+      case AWAITING_ALLOCATION:
+      case CANCELLATION_REQUESTED:
+      case CANCELLED:
+      case FINISHED:
+      case RUNNING:
+        sendStatus(status);
+        break;
+      case SENDING:
+        // no op.
+        break;
+      case FAILED:
+        // shouldn't get here since fail() should be called.
+      default:
+        throw new IllegalStateException(String.format("Received state changed 
event for unexpected state of %s.",
+            newState));
     }
   }
 
-  private void sendStatus(final FragmentStatus status) {
-    final ControlTunnel tunnel = this.tunnel.get();
-    if (tunnel != null) {
-      tunnel.sendFragmentStatus(status);
+
+  /**
+   * Sends status to remote Foreman node using Control Tunnel or to Local 
Foreman bypassing
+   * Control Tunnel and using WorkEventBus.
+   * @param status
+   */
+  void sendStatus(final FragmentStatus status) {
+
+    DrillbitEndpoint foremanNode = foremanDrillbit.get();
+
+    if (foremanNode == null) {
+      logger.warn("{}: State {} is not reported as {} is closed", 
QueryIdHelper.getQueryIdentifier(context.getHandle()),
+          status.getProfile().getState(), this);
+      return;
+    }
+
+    if (localDrillbit.equals(foremanNode)) {
+      // Update the status locally
+      context.getWorkEventbus().statusUpdate(status);
     } else {
-      logger.warn("{}: State {} is not reported as {} is closed", 
QueryIdHelper.getQueryIdentifier(context.getHandle()), 
status.getProfile().getState(), this);
+      // Send the status via Control Tunnel to remote foreman node
+      final ControlTunnel tunnel = context.getControlTunnel(foremanNode);
+      tunnel.sendFragmentStatus(status);
     }
   }
 
@@ -123,8 +146,8 @@ public class FragmentStatusReporter implements 
AutoCloseable {
   @Override
   public void close()
   {
-    final ControlTunnel tunnel = this.tunnel.getAndSet(null);
-    if (tunnel != null) {
+    final DrillbitEndpoint foremanNode = foremanDrillbit.getAndSet(null);
+    if (foremanNode != null) {
       logger.debug("Closing {}", this);
     } else {
       logger.warn("{} was already closed", this);

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 7cffa0a..7d1585b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -17,56 +17,21 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.data.IncomingDataBatch;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.foreman.ForemanException;
-
-import com.google.common.base.Preconditions;
 
 /**
  * This managers determines when to run a non-root fragment node.
  */
-// TODO a lot of this is the same as RootFragmentManager
-public class NonRootFragmentManager implements FragmentManager {
+public class NonRootFragmentManager extends AbstractFragmentManager {
   //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
 
-  private final IncomingBuffers buffers;
-  private final FragmentExecutor runner;
-  private final FragmentHandle handle;
-  private volatile boolean cancel = false;
-  private final FragmentContext context;
   private volatile boolean runnerRetrieved = false;
 
-  public NonRootFragmentManager(final PlanFragment fragment, final 
DrillbitContext context)
-      throws ExecutionSetupException {
-    try {
-      this.handle = fragment.getHandle();
-      this.context = new FragmentContext(context, fragment, 
context.getFunctionImplementationRegistry());
-      this.buffers = new IncomingBuffers(fragment, this.context);
-      final FragmentStatusReporter reporter = new 
FragmentStatusReporter(this.context,
-          context.getController().getTunnel(fragment.getForeman()));
-      this.runner = new FragmentExecutor(this.context, fragment, reporter);
-      this.context.setBuffers(buffers);
-
-    } catch (ForemanException e) {
-      throw new FragmentSetupException("Failure while decoding fragment.", e);
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see 
org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.AbstractRemoteConnection.ConnectionThrottle,
 org.apache.drill.exec.record.RawFragmentBatch)
-   */
-  @Override
-  public boolean handle(final IncomingDataBatch batch) throws 
FragmentSetupException, IOException {
-    return buffers.batchArrived(batch);
+  public NonRootFragmentManager(final PlanFragment fragment, final 
FragmentExecutor fragmentExecutor,
+                                final FragmentStatusReporter statusReporter) {
+    super(fragment, fragmentExecutor, statusReporter);
   }
 
   /* (non-Javadoc)
@@ -84,44 +49,17 @@ public class NonRootFragmentManager implements 
FragmentManager {
         return null;
       }
       runnerRetrieved = true;
-      return runner;
+      return super.getRunnable();
     }
   }
 
   @Override
   public void receivingFragmentFinished(final FragmentHandle handle) {
-    runner.receivingFragmentFinished(handle);
+    fragmentExecutor.receivingFragmentFinished(handle);
   }
 
   @Override
   public synchronized void cancel() {
-    cancel = true;
-    runner.cancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return cancel;
+    super.cancel();
   }
-
-  @Override
-  public void unpause() {
-    runner.unpause();
-  }
-
-  @Override
-  public FragmentHandle getHandle() {
-    return handle;
-  }
-
-  @Override
-  public boolean isWaiting() {
-    return !buffers.isDone() && !cancel;
-  }
-
-  @Override
-  public FragmentContext getFragmentContext() {
-    return context;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index af81d17..4cbadc2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -17,73 +17,22 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import java.io.IOException;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.data.IncomingDataBatch;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
 
-// TODO a lot of this is the same as NonRootFragmentManager
-public class RootFragmentManager implements FragmentManager {
+/**
+ * This managers determines when to run a root fragment node.
+ */
+public class RootFragmentManager extends AbstractFragmentManager {
   // private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 
-  private final IncomingBuffers buffers;
-  private final FragmentExecutor runner;
-  private final FragmentHandle handle;
-  private volatile boolean cancel = false;
-
-  public RootFragmentManager(final FragmentHandle handle, final 
IncomingBuffers buffers, final FragmentExecutor runner) {
-    super();
-    this.handle = handle;
-    this.buffers = buffers;
-    this.runner = runner;
-  }
-
-  @Override
-  public boolean handle(final IncomingDataBatch batch) throws 
FragmentSetupException, IOException {
-    return buffers.batchArrived(batch);
+  public RootFragmentManager(final PlanFragment fragment, final 
FragmentExecutor fragmentExecutor,
+                             final FragmentStatusReporter statusReporter) {
+    super(fragment, fragmentExecutor, statusReporter);
   }
 
   @Override
   public void receivingFragmentFinished(final FragmentHandle handle) {
     throw new IllegalStateException("The root fragment should not be sending 
any messages to receiver.");
   }
-
-  @Override
-  public FragmentExecutor getRunnable() {
-    return runner;
-  }
-
-  public FragmentHandle getHandle() {
-    return handle;
-  }
-
-  @Override
-  public void cancel() {
-    cancel = true;
-    runner.cancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return cancel;
-  }
-
-  @Override
-  public void unpause() {
-    runner.unpause();
-  }
-
-  @Override
-  public boolean isWaiting() {
-    return !buffers.isDone() && !cancel;
-  }
-
-  @Override
-  public FragmentContext getFragmentContext() {
-    return runner.getContext();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index 8165b0d..3d6f507 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -28,6 +28,8 @@ import mockit.MockUp;
 import mockit.NonStrictExpectations;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -41,6 +43,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.MaterializedField;
@@ -50,6 +53,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.security.KerberosHelper;
+import 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.vector.Float8Vector;
@@ -66,6 +70,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -364,6 +369,98 @@ public class TestBitBitKerberos extends BaseTestQuery {
     }
   }
 
+  /**
+   * Test to validate that a query which is running only on local Foreman node 
runs fine even if the Bit-Bit
+   * Auth config is wrong. With DRILL-5721, all the local fragment setup and 
status update
+   * doesn't happen over Control tunnel but instead happens locally. Without 
the fix in DRILL-5721 these queries will
+   * hang.
+   *
+   * This test only starts up 1 Drillbit so that all fragments are scheduled 
on Foreman Drillbit node
+   * @throws Exception
+   */
+  @Test
+  public void localQuerySuccessWithWrongBitAuthConfig() throws Exception {
+
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, 
krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, 
krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, 
krbHelper.clientKeytab.getAbsolutePath());
+
+    newConfig = new 
DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+            ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("plain", 
"kerberos")))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+            ConfigValueFactory.fromAnyRef("kerberos"))
+        .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(false))
+        ,false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+
+    // Run a query using the new client
+    final String query = getFile("queries/tpch/01.sql");
+    test(query);
+  }
+
+  /**
+   * Test to validate that query setup fails while scheduling remote fragments 
when multiple Drillbits are running with
+   * wrong Bit-to-Bit Authentication configuration.
+   *
+   * This test starts up 2 Drillbit so that there are combination of local and 
remote fragments for query
+   * execution. Note: When test runs with wrong config then for control 
connection Drillbit's uses wrong
+   * service principal to talk to another Drillbit, and due to this Kerby 
server also fails with NullPointerException.
+   * But for unit testing this should be fine.
+   * @throws Exception
+   */
+  @Test
+  public void queryFailureWithWrongBitAuthConfig() throws Exception {
+    try{
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, 
krbHelper.SERVER_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.USER, 
krbHelper.CLIENT_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.KEYTAB, 
krbHelper.clientKeytab.getAbsolutePath());
+
+      newConfig = new 
DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+          .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+              ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+          .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+          .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+              ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+          .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+              ConfigValueFactory.fromIterable(Lists.newArrayList("plain", 
"kerberos")))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+              ConfigValueFactory.fromAnyRef("kerberos"))
+          .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(false))
+          ,false);
+
+      updateTestCluster(2, newConfig, connectionProps);
+
+      test("alter session set `planner.slice_target` = 10");
+      final String query = getFile("queries/tpch/01.sql");
+      test(query);
+      fail();
+    } catch(Exception ex) {
+      assertTrue(ex instanceof UserRemoteException);
+      assertTrue(((UserRemoteException)ex).getErrorType() == 
UserBitShared.DrillPBError.ErrorType.CONNECTION);
+    }
+  }
+
   @AfterClass
   public static void cleanTest() throws Exception {
     krbHelper.stopKdc();

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
index b4d56ba..b9c0de2 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -132,9 +132,8 @@ public class TestUserBitKerberosEncryption extends 
BaseTestQuery {
    * For example: There is only 1 DrillClient so encrypted connection count of 
UserRpcMetrics will be 1. Before
    * running any query there should not be any connection (control or data) 
between Drillbits, hence those counters
    * are 0. After running a simple query since there is only 1 fragment which 
is root fragment the Control Connection
-   * count is 2 (for unencrypted counter) based on connection for status 
update of fragment to Foreman. It is 2 because
-   * for Control and Data Server we count total number of client and server 
connections on a node. There is no
-   * Data Connection because there is no data exchange between multiple 
fragments.
+   * count is 0 (for unencrypted counter) since with DRILL-5721 status update 
of fragment to Foreman happens locally.
+   * There is no Data Connection because there is no data exchange between 
multiple fragments.
    *
    * @throws Exception
    */
@@ -164,7 +163,7 @@ public class TestUserBitKerberosEncryption extends 
BaseTestQuery {
 
     // Check unencrypted counters value
     assertTrue(0 == 
UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(2 == 
ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+    assertTrue(0 == 
ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
     assertTrue(0 == 
DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
   }
 
@@ -393,9 +392,8 @@ public class TestUserBitKerberosEncryption extends 
BaseTestQuery {
    * For example: There is only 1 DrillClient so encrypted connection count of 
UserRpcMetrics
    * will be 1. Before running any query there should not be any connection 
(control or data) between Drillbits,
    * hence those counters are 0. After running a simple query since there is 
only 1 fragment which is root fragment
-   * the Control Connection count is 2 (for encrypted counter) based on 
connection for status update of fragment to
-   * Foreman. It is 2 because for Control and Data Server we count total 
number of client and server connections on a
-   * node. There is no Data Connection because there is no data exchange 
between multiple fragments.
+   * the Control Connection count is 0 (for encrypted counter), since with 
DRILL-5721 status update of fragment to
+   * Foreman happens locally. There is no Data Connection because there is no 
data exchange between multiple fragments.
    */
 
   @Test
@@ -440,7 +438,7 @@ public class TestUserBitKerberosEncryption extends 
BaseTestQuery {
 
     // Check encrypted counters value
     assertTrue(1 == 
UserRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(2 == 
ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
+    assertTrue(0 == 
ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
     assertTrue(0 == 
DataRpcMetrics.getInstance().getEncryptedConnectionCount());
 
     // Check unencrypted counters value
