http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
index e9b3051..ace4f24 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.filter;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class FilterBatchCreator implements BatchCreator<Filter>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
-
+public class FilterBatchCreator implements BatchCreator<Filter> {
   @Override
-  public FilterRecordBatch getBatch(FragmentContext context, Filter config, 
List<RecordBatch> children)
+  public FilterRecordBatch getBatch(ExecutorFragmentContext context, Filter 
config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new FilterRecordBatch(config, children.iterator().next(), context);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 1bdd097..f0b832a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -42,9 +42,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
 
-public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
-  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
-
+public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
   private Filterer filter;
@@ -120,16 +118,7 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
          * logic that handles SV4 + filter should always be pushed beyond sort 
so disabling
          * it in FilterPrel.
          *
-
-        // set up the multi-batch selection vector
-        this.svAllocator = oContext.getAllocator().getNewPreAllocator();
-        if (!svAllocator.preAllocate(incoming.getRecordCount()*4))
-          throw new SchemaChangeException("Attempted to filter an SV4 which 
exceeds allowed memory (" +
-                                          incoming.getRecordCount() * 4 + " 
bytes)");
-        sv4 = new SelectionVector4(svAllocator.getAllocation(), 
incoming.getRecordCount(), Character.MAX_VALUE);
-        this.filter = generateSV4Filterer();
-        break;
-        */
+         */
       default:
         throw new UnsupportedOperationException();
     }
@@ -197,8 +186,6 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
       final TransferPair[] tx = transfers.toArray(new 
TransferPair[transfers.size()]);
       CodeGenerator<Filterer> codeGen = cg.getCodeGenerator();
       codeGen.plainJavaCapable(true);
-      // Uncomment out this line to debug the generated code.
-//    cg.saveCodeForDebugging(true);
       final Filterer filter = context.getImplementationClass(codeGen);
       filter.setup(context, incoming, this, tx);
       return filter;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
index 74a5d16..db62d36 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.physical.impl.filter;
 import javax.inject.Named;
 
 import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface FilterSignature  extends CodeGeneratorSignature{
 
-  public void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
+  public void doSetup(@Named("context") FragmentContextImpl context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
   public boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int 
outIndex);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index d014a2e..52533bd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -27,9 +27,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
-public abstract class FilterTemplate2 implements Filterer{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class);
-
+public abstract class FilterTemplate2 implements Filterer {
   private SelectionVector2 outgoingSelectionVector;
   private SelectionVector2 incomingSelectionVector;
   private SelectionVectorMode svMode;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
index fd1f9e6..4850cff 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -26,8 +26,6 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public abstract class FilterTemplate4 implements Filterer {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class);
-
   private SelectionVector4 outgoingSelectionVector;
   private SelectionVector4 incomingSelectionVector;
   private TransferPair[] transfers;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index aa45f54..a3d03c2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -24,11 +24,9 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 
 public interface Filterer {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Filterer.class);
+  TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
+  TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
 
-  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, TransferPair[] transfers) throws SchemaChangeException;
-  public void filterBatch(int recordCount) throws SchemaChangeException;
-
-  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
-  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new 
TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
+  void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, TransferPair[] transfers) throws SchemaChangeException;
+  void filterBatch(int recordCount) throws SchemaChangeException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
index 94203d8..bfda4f4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
@@ -20,21 +20,18 @@ package org.apache.drill.exec.physical.impl.flatten;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
-
+public class FlattenBatchCreator implements BatchCreator<FlattenPOP> {
   @Override
-  public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP 
config, List<RecordBatch> children)
+  public FlattenRecordBatch getBatch(ExecutorFragmentContext context, 
FlattenPOP config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new FlattenRecordBatch(config, children.iterator().next(), context);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 2aa841b..8be16ad 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -303,8 +303,6 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
 
     final ClassGenerator<Flattener> cg = 
CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-    // cg.getCodeGenerator().saveCodeForDebugging(true);
     final IntHashSet transferFieldIds = new IntHashSet();
 
     final NamedExpression flattenExpr = new 
NamedExpression(popConfig.getColumn(), new 
FieldReference(popConfig.getColumn()));

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 5293060..392757e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -27,9 +27,11 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 public interface Flattener {
-  public void setup(FragmentContext context, RecordBatch incoming,  
RecordBatch outgoing, List<TransferPair> transfers)  throws 
SchemaChangeException;
+  TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
 
-  public interface Monitor {
+  void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
+
+  interface Monitor {
     /**
      * Get the required buffer size for the specified number of records.
      * {@see ValueVector#getBufferSizeFor(int)} for the meaning of this.
@@ -37,14 +39,14 @@ public interface Flattener {
      * @param recordCount the number of records processed so far
      * @return the buffer size the vectors report as being in use
      */
-    public int getBufferSizeFor(int recordCount);
-  };
+    int getBufferSizeFor(int recordCount);
+  }
+
+  int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);
 
-  public int flattenRecords(int recordCount, int firstOutputIndex, Monitor 
monitor);
+  void setFlattenField(RepeatedValueVector repeatedColumn);
 
-  public void setFlattenField(RepeatedValueVector repeatedColumn);
-  public RepeatedValueVector getFlattenField();
-  public void resetGroupIndex();
+  RepeatedValueVector getFlattenField();
 
-  public static final TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = 
new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
+  void resetGroupIndex();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7b679c0..e087bc8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -256,12 +256,9 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
       // No more output records, clean up and return
       state = BatchState.DONE;
-      //            if (first) {
-      //              return IterOutcome.OK_NEW_SCHEMA;
-      //            }
       return IterOutcome.NONE;
     } catch (ClassTransformationException | SchemaChangeException | 
IOException e) {
-      context.fail(e);
+      context.getExecutorState().fail(e);
       killIncoming(false);
       return IterOutcome.STOP;
     }
@@ -405,8 +402,6 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
   public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, IOException {
     final CodeGenerator<HashJoinProbe> cg = 
CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
     cg.plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-    // cg.saveCodeForDebugging(true);
     final ClassGenerator<HashJoinProbe> g = cg.getRoot();
 
     // Generate the code to project build side records

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index 1402769..a005559 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -30,7 +30,7 @@ import com.google.common.base.Preconditions;
 public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
 
   @Override
-  public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, 
List<RecordBatch> children)
+  public HashJoinBatch getBatch(ExecutorFragmentContext context, HashJoinPOP 
config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 2);
     return new HashJoinBatch(config, context, children.get(0), 
children.get(1));

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index cc6bd55..4ef28e6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.calcite.rel.core.JoinRelType;
 
 public interface HashJoinProbe {
-  public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = 
new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, 
HashJoinProbeTemplate.class);
+  TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, 
HashJoinProbeTemplate.class);
 
   /* The probe side of the hash join can be in the following two states
    * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we 
have a
@@ -40,15 +40,15 @@ public interface HashJoinProbe {
    *    case we handle it internally by projecting the record if there isn't a 
match on the build side
    * 3. DONE: Once we have projected all possible records we are done
    */
-  public static enum ProbeState {
+  enum ProbeState {
     PROBE_PROJECT, PROJECT_RIGHT, DONE
   }
 
-  public abstract void setupHashJoinProbe(FragmentContext context, 
VectorContainer buildBatch, RecordBatch probeBatch,
-                                          int probeRecordCount, HashJoinBatch 
outgoing, HashTable hashTable, HashJoinHelper hjHelper,
-                                          JoinRelType joinRelType);
-  public abstract void doSetup(FragmentContext context, VectorContainer 
buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
-  public abstract int  probeAndProject() throws SchemaChangeException, 
ClassTransformationException, IOException;
-  public abstract void projectBuildRecord(int buildIndex, int outIndex);
-  public abstract void projectProbeRecord(int probeIndex, int outIndex);
+  void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, 
RecordBatch probeBatch,
+                          int probeRecordCount, HashJoinBatch outgoing, 
HashTable hashTable, HashJoinHelper hjHelper,
+                          JoinRelType joinRelType);
+  void doSetup(FragmentContext context, VectorContainer buildBatch, 
RecordBatch probeBatch, RecordBatch outgoing);
+  int  probeAndProject() throws SchemaChangeException, 
ClassTransformationException, IOException;
+  void projectBuildRecord(int buildIndex, int outIndex);
+  void projectProbeRecord(int probeIndex, int outIndex);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 55322f8..95f7c3d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -24,15 +24,12 @@ import org.apache.drill.exec.record.VectorContainer;
 
 
 public interface JoinWorker {
+  TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
 
-  public static enum JoinOutcome {
+  enum JoinOutcome {
     NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE;
   }
 
-  public void setupJoin(FragmentContext context, JoinStatus status, 
VectorContainer outgoing) throws SchemaChangeException;
-  public boolean doJoin(JoinStatus status);
-
-  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(
-    JoinWorker.class, JoinTemplate.class);
-
+  void setupJoin(FragmentContext context, JoinStatus status, VectorContainer 
outgoing) throws SchemaChangeException;
+  boolean doJoin(JoinStatus status);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 8ad3f84..1ed4722 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -199,7 +199,7 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
           this.worker = generateNewWorker();
           first = true;
         } catch (ClassTransformationException | IOException | 
SchemaChangeException e) {
-          context.fail(new SchemaChangeException(e));
+          context.getExecutorState().fail(new SchemaChangeException(e));
           kill(false);
           return IterOutcome.STOP;
         } finally {
@@ -269,12 +269,9 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     right.kill(sendUpstream);
   }
 
-  private JoinWorker generateNewWorker() throws ClassTransformationException, 
IOException, SchemaChangeException{
-
+  private JoinWorker generateNewWorker() throws ClassTransformationException, 
IOException, SchemaChangeException {
     final ClassGenerator<JoinWorker> cg = 
CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-    // cg.getCodeGenerator().saveCodeForDebugging(true);
     final ErrorCollector collector = new ErrorCollectorImpl();
 
     // Generate members and initialization code

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 24f5533..b24624e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -29,10 +29,8 @@ import org.apache.calcite.rel.core.JoinRelType;
 import com.google.common.base.Preconditions;
 
 public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
-
   @Override
-  public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, 
List<RecordBatch> children)
+  public MergeJoinBatch getBatch(ExecutorFragmentContext context, MergeJoinPOP 
config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 2);
     if(config.getJoinType() == JoinRelType.RIGHT){
@@ -40,6 +38,5 @@ public class MergeJoinCreator implements 
BatchCreator<MergeJoinPOP> {
     }else{
       return new MergeJoinBatch(config, context, children.get(0), 
children.get(1));
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
index 2e708a6..ef4cab1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.physical.impl.join;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class NestedLoopJoinBatchCreator implements 
BatchCreator<NestedLoopJoinPOP> {
   @Override
-  public NestedLoopJoinBatch getBatch(FragmentContext context, 
NestedLoopJoinPOP config, List<RecordBatch> children)
+  public NestedLoopJoinBatch getBatch(ExecutorFragmentContext context, 
NestedLoopJoinPOP config, List<RecordBatch> children)
       throws ExecutionSetupException {
     return new NestedLoopJoinBatch(config, context, children.get(0), 
children.get(1));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
index f954e72..15e5275 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.limit;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -29,7 +29,7 @@ import com.google.common.collect.Iterables;
 
 public class LimitBatchCreator implements BatchCreator<Limit> {
   @Override
-  public LimitRecordBatch getBatch(FragmentContext context, Limit config, 
List<RecordBatch> children)
+  public LimitRecordBatch getBatch(ExecutorFragmentContext context, Limit 
config, List<RecordBatch> children)
       throws ExecutionSetupException {
     return new LimitRecordBatch(config, context, 
Iterables.getOnlyElement(children));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index f9ceff2..7e5ff21 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -41,6 +41,7 @@ import 
org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
@@ -96,7 +97,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
 
   private RecordBatchLoader[] batchLoaders;
   private final RawFragmentBatchProvider[] fragProviders;
-  private final FragmentContext context;
+  private final ExchangeFragmentContext context;
   private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
   private final MergingReceiverPOP config;
@@ -109,12 +110,11 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
   private RawFragmentBatch[] incomingBatches;
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
-  private RawFragmentBatch emptyBatch = null;
   private RawFragmentBatch[] tempBatchHolder;
   private long[] inputCounts;
   private long[] outputCounts;
 
-  public static enum Metric implements MetricDef{
+  public enum Metric implements MetricDef {
     BYTES_RECEIVED,
     NUM_SENDERS,
     NEXT_WAIT_NANOS;
@@ -125,7 +125,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
-  public MergingRecordBatch(final FragmentContext context,
+  public MergingRecordBatch(final ExchangeFragmentContext context,
                             final MergingReceiverPOP config,
                             final RawFragmentBatchProvider[] fragProviders) 
throws OutOfMemoryException {
     super(config, context, true, context.newOperatorContext(config));
@@ -210,11 +210,11 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
           try {
             rawBatch = getNext(p);
           } catch (final IOException e) {
-            context.fail(e);
+            context.getExecutorState().fail(e);
             return IterOutcome.STOP;
           }
         }
-        if (rawBatch == null && !context.shouldContinue()) {
+        if (rawBatch == null && !context.getExecutorState().shouldContinue()) {
           clearBatches(rawBatches);
           return IterOutcome.STOP;
         }
@@ -241,12 +241,12 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
             while ((rawBatch = getNext(p)) != null && 
rawBatch.getHeader().getDef().getRecordCount() == 0) {
               // Do nothing
             }
-            if (rawBatch == null && !context.shouldContinue()) {
+            if (rawBatch == null && 
!context.getExecutorState().shouldContinue()) {
               clearBatches(rawBatches);
               return IterOutcome.STOP;
             }
           } catch (final IOException e) {
-            context.fail(e);
+            context.getExecutorState().fail(e);
             clearBatches(rawBatches);
             return IterOutcome.STOP;
           }
@@ -315,7 +315,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
           // SchemaChangeException, so check/clean catch clause below.
         } catch(final SchemaChangeException e) {
           logger.error("MergingReceiver failed to load record batch from 
remote host.  {}", e);
-          context.fail(e);
+          context.getExecutorState().fail(e);
           return IterOutcome.STOP;
         }
         batch.release();
@@ -328,7 +328,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
       // Ensure all the incoming batches have the identical schema.
       // Note: RecordBatchLoader permutes the columns to obtain the same 
columns order for all batches.
       if (!isSameSchemaAmongBatches(batchLoaders)) {
-        context.fail(new SchemaChangeException("Incoming batches for merging 
receiver have different schemas!"));
+        context.getExecutorState().fail(new SchemaChangeException("Incoming 
batches for merging receiver have different schemas!"));
         return IterOutcome.STOP;
       }
 
@@ -351,7 +351,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
         merger = createMerger();
       } catch (final SchemaChangeException e) {
         logger.error("Failed to generate code for MergingReceiver.  {}", e);
-        context.fail(e);
+        context.getExecutorState().fail(e);
         return IterOutcome.STOP;
       }
 
@@ -380,12 +380,12 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
-              if (!context.shouldContinue()) {
+              if (!context.getExecutorState().shouldContinue()) {
                 return IterOutcome.STOP;
               }
             }
           } catch (IOException | SchemaChangeException e) {
-            context.fail(e);
+            context.getExecutorState().fail(e);
             return IterOutcome.STOP;
           }
         }
@@ -418,11 +418,11 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
 
           assert nextBatch != null || inputCounts[node.batchId] == 
outputCounts[node.batchId]
               : String.format("Stream %d input count: %d output count %d", 
node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
-          if (nextBatch == null && !context.shouldContinue()) {
+          if (nextBatch == null && 
!context.getExecutorState().shouldContinue()) {
             return IterOutcome.STOP;
           }
         } catch (final IOException e) {
-          context.fail(e);
+          context.getExecutorState().fail(e);
           return IterOutcome.STOP;
         }
 
@@ -456,7 +456,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
           // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
           // SchemaChangeException, so check/clean catch clause below.
         } catch(final SchemaChangeException ex) {
-          context.fail(ex);
+          context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         }
         incomingBatches[node.batchId].release();
@@ -548,7 +548,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
         }
         final RawFragmentBatch batch = getNext(i);
         if (batch == null) {
-          if (!context.shouldContinue()) {
+          if (!context.getExecutorState().shouldContinue()) {
             state = BatchState.STOP;
           } else {
             state = BatchState.DONE;
@@ -605,7 +605,9 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
-      
context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new
 OutcomeListener(), finishedReceiver);
+      context.getController()
+        .getTunnel(providingEndpoint.getEndpoint())
+        .informReceiverFinished(new OutcomeListener(), finishedReceiver);
     }
   }
 
@@ -624,10 +626,10 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
 
     @Override
     public void interrupted(final InterruptedException e) {
-      if (context.shouldContinue()) {
+      if (context.getExecutorState().shouldContinue()) {
         final String errMsg = "Received an interrupt RPC outcome while sending 
ReceiverFinished message";
         logger.error(errMsg, e);
-        context.fail(new RpcException(errMsg, e));
+        context.getExecutorState().fail(new RpcException(errMsg, e));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 7f662ae..9e82af8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -196,7 +196,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
         }
       }
       VectorContainer sortedSamples = new VectorContainer();
-      builder.build(context, sortedSamples);
+      builder.build(sortedSamples);
 
       // Sort the records according the orderings given in the configuration
 
@@ -262,7 +262,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
         Thread.sleep(timeout);
         return true;
       } catch (final InterruptedException e) {
-        if (!context.shouldContinue()) {
+        if (!context.getExecutorState().shouldContinue()) {
           return false;
         }
       }
@@ -329,7 +329,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
 
     } catch (final ClassTransformationException | IOException | 
SchemaChangeException ex) {
       kill(false);
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       return false;
       // TODO InterruptedException
     }
@@ -349,7 +349,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
       for (CachedVectorContainer w : mmap.get(mapKey)) {
         containerBuilder.add(w.get());
       }
-      containerBuilder.build(context, allSamplesContainer);
+      containerBuilder.build(allSamplesContainer);
 
       List<Ordering> orderDefs = Lists.newArrayList();
       int i = 0;
@@ -390,7 +390,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
       candidatePartitionTable.setRecordCount(copier.getOutputRecords());
       @SuppressWarnings("resource")
       WritableBatch batch = 
WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), 
candidatePartitionTable, false);
-      wrap = new CachedVectorContainer(batch, 
context.getDrillbitContext().getAllocator());
+      wrap = new CachedVectorContainer(batch, context.getAllocator());
       tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
     } finally {
       candidatePartitionTable.clear();
@@ -486,7 +486,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
-        context.fail(ex);
+        context.getExecutorState().fail(ex);
         return IterOutcome.STOP;
       }
       doWork(vc);
@@ -519,7 +519,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
-        context.fail(ex);
+        context.getExecutorState().fail(ex);
         return IterOutcome.STOP;
       }
       doWork(vc);
@@ -550,7 +550,7 @@ public class OrderedPartitionRecordBatch extends 
AbstractRecordBatch<OrderedPart
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
-        context.fail(ex);
+        context.getExecutorState().fail(ex);
         return IterOutcome.STOP;
       }
       // fall through.

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
index d2e07e7..5705aca 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
@@ -20,7 +20,7 @@ package 
org.apache.drill.exec.physical.impl.orderedpartitioner;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.impl.RootCreator;
@@ -61,7 +61,7 @@ public class OrderedPartitionSenderCreator implements 
RootCreator<OrderedPartiti
 
   @SuppressWarnings("resource")
   @Override
-  public RootExec getRoot(FragmentContext context, OrderedPartitionSender 
config,
+  public RootExec getRoot(ExecutorFragmentContext context, 
OrderedPartitionSender config,
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
index 06fd115..e0b7b9a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.RootCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -29,7 +29,7 @@ import org.apache.drill.exec.record.RecordBatch;
 public class PartitionSenderCreator implements 
RootCreator<HashPartitionSender> {
 
   @Override
-  public RootExec getRoot(FragmentContext context,
+  public RootExec getRoot(ExecutorFragmentContext context,
                           HashPartitionSender config,
                           List<RecordBatch> children) throws 
ExecutionSetupException {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 108d539..25be50a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -33,9 +33,10 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -63,7 +64,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private HashPartitionSender operator;
   private PartitionerDecorator partitioner;
 
-  private FragmentContext context;
+  private ExchangeFragmentContext context;
   private boolean ok = true;
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
@@ -98,13 +99,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
-  public PartitionSenderRootExec(FragmentContext context,
+  public PartitionSenderRootExec(RootFragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws 
OutOfMemoryException {
     this(context, incoming, operator, false);
   }
 
-  public PartitionSenderRootExec(FragmentContext context,
+  public PartitionSenderRootExec(RootFragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator,
                                  boolean closeIncoming) throws 
OutOfMemoryException {
@@ -173,7 +174,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
         } catch (IOException e) {
           incoming.kill(false);
           logger.error("Error while creating partitioning sender or flushing 
outgoing batches", e);
-          context.fail(e);
+          context.getExecutorState().fail(e);
         }
         return false;
 
@@ -203,19 +204,19 @@ public class PartitionSenderRootExec extends BaseRootExec 
{
         } catch (IOException e) {
           incoming.kill(false);
           logger.error("Error while flushing outgoing batches", e);
-          context.fail(e);
+          context.getExecutorState().fail(e);
           return false;
         } catch (SchemaChangeException e) {
           incoming.kill(false);
           logger.error("Error while setting up partitioner", e);
-          context.fail(e);
+          context.getExecutorState().fail(e);
           return false;
         }
       case OK:
         try {
           partitioner.partitionBatch(incoming);
         } catch (IOException e) {
-          context.fail(e);
+          context.getExecutorState().fail(e);
           incoming.kill(false);
           return false;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 95a4813..5d1b08c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -22,34 +22,33 @@ import java.util.List;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface Partitioner {
+  void setup(ExchangeFragmentContext context,
+             RecordBatch incoming,
+             HashPartitionSender popConfig,
+             OperatorStats stats,
+             OperatorContext oContext,
+             int start, int count) throws SchemaChangeException;
 
-  public abstract void setup(FragmentContext context,
-                          RecordBatch incoming,
-                          HashPartitionSender popConfig,
-                          OperatorStats stats,
-                          OperatorContext oContext,
-                          int start, int count) throws SchemaChangeException;
-
-  public abstract void partitionBatch(RecordBatch incoming) throws IOException;
-  public abstract void flushOutgoingBatches(boolean isLastBatch, boolean 
schemaChanged) throws IOException;
-  public abstract void initialize();
-  public abstract void clear();
-  public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches();
+  void partitionBatch(RecordBatch incoming) throws IOException;
+  void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws 
IOException;
+  void initialize();
+  void clear();
+  List<? extends PartitionOutgoingBatch> getOutgoingBatches();
   /**
    * Method to get PartitionOutgoingBatch based on the fact that there can be 
> 1 Partitioner
    * @param index
    * @return PartitionOutgoingBatch that matches index within Partitioner. 
This method can
    * return null if index does not fall within boundary of this Partitioner
    */
-  public abstract PartitionOutgoingBatch getOutgoingBatch(int index);
-  public abstract OperatorStats getStats();
+  PartitionOutgoingBatch getOutgoingBatch(int index);
+  OperatorStats getStats();
 
-  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
+  TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index 042222a..78b8d03 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -57,7 +57,7 @@ public class PartitionerDecorator {
     this.partitioners = partitioners;
     this.stats = stats;
     this.context = context;
-    this.executor = context.getDrillbitContext().getExecutor();
+    this.executor = context.getExecutor();
     this.tName = Thread.currentThread().getName();
     this.childThreadPrefix = "Partitioner-" + tName + "-";
   }
@@ -177,7 +177,7 @@ public class PartitionerDecorator {
           break;
         } catch (final InterruptedException e) {
           // If the fragment state says we shouldn't continue, cancel or 
interrupt partitioner threads
-          if (!context.shouldContinue()) {
+          if (!context.getExecutorState().shouldContinue()) {
             logger.debug("Interrupting partioner threads. Fragment thread {}", 
tName);
             for(Future<?> f : taskFutures) {
               f.cancel(true);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index aa72c44..0d52b53 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
@@ -81,7 +82,7 @@ public abstract class PartitionerTemplate implements 
Partitioner {
   }
 
   @Override
-  public final void setup(FragmentContext context,
+  public final void setup(ExchangeFragmentContext context,
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
                           OperatorStats stats,

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 3afa852..bbcb758 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -68,8 +68,8 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch<ProducerConsumer>
       wrapper = queue.take();
       logger.debug("Got batch from queue");
     } catch (final InterruptedException e) {
-      if (context.shouldContinue()) {
-        context.fail(e);
+      if (context.getExecutorState().shouldContinue()) {
+        context.getExecutorState().fail(e);
       }
       return IterOutcome.STOP;
       // TODO InterruptedException

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
index 6542576..779728a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.producer;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -29,7 +29,7 @@ import com.google.common.collect.Iterables;
 
 public class ProducerConsumerBatchCreator implements 
BatchCreator<ProducerConsumer> {
   @Override
-  public ProducerConsumerBatch getBatch(FragmentContext context, 
ProducerConsumer config, List<RecordBatch> children)
+  public ProducerConsumerBatch getBatch(ExecutorFragmentContext context, 
ProducerConsumer config, List<RecordBatch> children)
       throws ExecutionSetupException {
     return new ProducerConsumerBatch(config, context, 
Iterables.getOnlyElement(children));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
index f249540..73ab441 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.ComplexToJson;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -29,15 +29,12 @@ import org.apache.drill.exec.record.RecordBatch;
 import com.google.common.base.Preconditions;
 
 public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class);
-
   @Override
-  public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson 
flatten, List<RecordBatch> children)
+  public ProjectRecordBatch getBatch(ExecutorFragmentContext context, 
ComplexToJson flatten, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new ProjectRecordBatch(new Project(null, flatten.getChild()),
                                   children.iterator().next(),
                                   context);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
index e7a6b05..37753cd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -28,13 +28,10 @@ import org.apache.drill.exec.record.RecordBatch;
 import com.google.common.base.Preconditions;
 
 public class ProjectBatchCreator implements BatchCreator<Project>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class);
-
   @Override
-  public ProjectRecordBatch getBatch(FragmentContext context, Project config, 
List<RecordBatch> children)
+  public ProjectRecordBatch getBatch(ExecutorFragmentContext context, Project 
config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new ProjectRecordBatch(config, children.iterator().next(), context);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index be0f61f..89e0ee9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -804,7 +804,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     } catch (SchemaChangeException e) {
       kill(false);
       logger.error("Failure during query", e);
-      context.fail(e);
+      context.getExecutorState().fail(e);
       return IterOutcome.STOP;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index d711592..f38b62e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -132,7 +132,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
         return IterOutcome.NONE;
       }
 
-      builder.build(context, container);
+      builder.build(container);
       sorter = createNewSorter();
       sorter.setup(context, getSelectionVector4(), this.container);
       sorter.sort(getSelectionVector4(), this.container);
@@ -142,7 +142,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     } catch(SchemaChangeException | ClassTransformationException | IOException 
ex) {
       kill(false);
       logger.error("Failure during query", ex);
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       return IterOutcome.STOP;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
index 559558f..ccd5561 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.sort;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -28,14 +28,10 @@ import org.apache.drill.exec.record.RecordBatch;
 import com.google.common.base.Preconditions;
 
 public class SortBatchCreator implements BatchCreator<Sort>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class);
-
   @Override
-  public SortBatch getBatch(FragmentContext context, Sort config, 
List<RecordBatch> children)
+  public SortBatch getBatch(ExecutorFragmentContext context, Sort config, 
List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new SortBatch(config, context, children.iterator().next());
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 6b3de25..6c66c01 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.AllocationReservation;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
@@ -136,10 +135,6 @@ public class SortRecordBatchBuilder implements 
AutoCloseable {
     return batches.isEmpty();
   }
 
-  public void build(FragmentContext context, VectorContainer outputContainer) 
throws SchemaChangeException {
-    build(outputContainer);
-  }
-
   @SuppressWarnings("resource")
   public void build(VectorContainer outputContainer) throws 
SchemaChangeException {
     outputContainer.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4304c2c..66fe261 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -24,7 +24,6 @@ import 
org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -235,10 +234,11 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
 
   private Copier getGenerated4Copier() throws SchemaChangeException {
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() 
== SelectionVectorMode.FOUR_BYTE);
-    return getGenerated4Copier(incoming, context, oContext.getAllocator(), 
container, this, callBack);
+    return getGenerated4Copier(incoming, context, container, this, callBack);
   }
 
-  public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext 
context, BufferAllocator allocator, VectorContainer container, RecordBatch 
outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{
+  public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext 
context, VectorContainer container, RecordBatch outgoing,
+                                           SchemaChangeCallBack callBack) 
throws SchemaChangeException{
 
     for(VectorWrapper<?> vv : batch){
       @SuppressWarnings("resource")

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 9ab39a3..4bf5b5c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.svremover;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class);
-
+public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover> {
   @Override
-  public RemovingRecordBatch getBatch(FragmentContext context, 
SelectionVectorRemover config, List<RecordBatch> children)
+  public RemovingRecordBatch getBatch(ExecutorFragmentContext context, 
SelectionVectorRemover config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new RemovingRecordBatch(config, context, 
children.iterator().next());
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index 40ef2bb..dd2f6db 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -21,19 +21,15 @@ package org.apache.drill.exec.physical.impl.trace;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class TraceBatchCreator implements BatchCreator<Trace> {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
-
   @Override
-  public TraceRecordBatch getBatch(FragmentContext context, Trace config, 
List<RecordBatch> children)
+  public TraceRecordBatch getBatch(ExecutorFragmentContext context, Trace 
config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    // Preconditions.checkArgument(children.size() == 1);
     return new TraceRecordBatch(config, children.iterator().next(), context);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
index 1ef3142..bdc1a3d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.union;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class UnionAllBatchCreator implements BatchCreator<UnionAll>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class);
-
+public class UnionAllBatchCreator implements BatchCreator<UnionAll> {
   @Override
-  public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll 
config, List<RecordBatch> children)
+  public UnionAllRecordBatch getBatch(ExecutorFragmentContext context, 
UnionAll config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() >= 1);
     return new UnionAllRecordBatch(config, children, context);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 1d1ecb0..b4d0e77 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -129,7 +129,7 @@ public class UnionAllRecordBatch extends 
AbstractBinaryRecordBatch<UnionAll> {
         }
       }
     } catch (ClassTransformationException | IOException | 
SchemaChangeException ex) {
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       killIncoming(false);
       return IterOutcome.STOP;
     }
@@ -168,8 +168,6 @@ public class UnionAllRecordBatch extends 
AbstractBinaryRecordBatch<UnionAll> {
 
     final ClassGenerator<UnionAller> cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-    //    cg.getCodeGenerator().saveCodeForDebugging(true);
 
     int index = 0;
     for(VectorWrapper<?> vw : inputBatch) {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index cfdc06d..9da8a4b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -57,7 +58,7 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
 
   private final RecordBatchLoader batchLoader;
   private final RawFragmentBatchProvider fragProvider;
-  private final FragmentContext context;
+  private final ExchangeFragmentContext context;
   private BatchSchema schema;
   private final OperatorStats stats;
   private boolean first = true;
@@ -74,7 +75,7 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
     }
   }
 
-  public UnorderedReceiverBatch(final FragmentContext context, final 
RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws 
OutOfMemoryException {
+  public UnorderedReceiverBatch(final ExchangeFragmentContext context, final 
RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws 
OutOfMemoryException {
     this.fragProvider = fragProvider;
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in 
case of splitAndTransfer of a value vector,
@@ -171,13 +172,13 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
 
       if (batch == null) {
         batchLoader.clear();
-        if (!context.shouldContinue()) {
+        if (!context.getExecutorState().shouldContinue()) {
           return IterOutcome.STOP;
         }
         return IterOutcome.NONE;
       }
 
-      if (context.isOverMemoryLimit()) {
+      if (context.getAllocator().isOverLimit()) {
         return IterOutcome.OUT_OF_MEMORY;
       }
 
@@ -197,7 +198,7 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
         return IterOutcome.OK;
       }
     } catch(SchemaChangeException | IOException ex) {
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       return IterOutcome.STOP;
     } finally {
       stats.stopProcessing();
@@ -233,7 +234,9 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
-      
context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new
 OutcomeListener(), finishedReceiver);
+      context.getController()
+        .getTunnel(providingEndpoint.getEndpoint())
+        .informReceiverFinished(new OutcomeListener(), finishedReceiver);
     }
   }
 
@@ -252,12 +255,11 @@ public class UnorderedReceiverBatch implements 
CloseableRecordBatch {
 
     @Override
     public void interrupted(final InterruptedException e) {
-      if (context.shouldContinue()) {
+      if (context.getExecutorState().shouldContinue()) {
         final String errMsg = "Received an interrupt RPC outcome while sending 
ReceiverFinished message";
         logger.error(errMsg, e);
-        context.fail(new RpcException(errMsg, e));
+        context.getExecutorState().fail(new RpcException(errMsg, e));
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 6d4f1d7..01a4588 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -20,18 +20,18 @@ package 
org.apache.drill.exec.physical.impl.unorderedreceiver;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.batch.RawBatchBuffer;
 
-public class UnorderedReceiverCreator implements 
BatchCreator<UnorderedReceiver>{
+public class UnorderedReceiverCreator implements 
BatchCreator<UnorderedReceiver> {
 
   @SuppressWarnings("resource")
   @Override
-  public UnorderedReceiverBatch getBatch(FragmentContext context, 
UnorderedReceiver receiver, List<RecordBatch> children)
+  public UnorderedReceiverBatch getBatch(ExecutorFragmentContext context, 
UnorderedReceiver receiver, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children == null || children.isEmpty();
     IncomingBuffers bufHolder = context.getBuffers();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 4199191..e27f881 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -32,8 +32,8 @@ public class IteratorValidatorCreator implements 
BatchCreator<IteratorValidator>
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class);
 
   @Override
-  public IteratorValidatorBatchIterator getBatch(FragmentContext context, 
IteratorValidator config,
-      List<RecordBatch> children)
+  public IteratorValidatorBatchIterator getBatch(ExecutorFragmentContext 
context, IteratorValidator config,
+                                                 List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     RecordBatch child = children.iterator().next();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index a8eddbc..c2bcab0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.Values;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -30,11 +30,9 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 
-import com.google.common.collect.Iterators;
-
 public class ValuesBatchCreator implements BatchCreator<Values> {
   @Override
-  public ScanBatch getBatch(FragmentContext context, Values config, 
List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, Values config, 
List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children.isEmpty();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
index 59bc115..6ca9652 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
@@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.impl.window;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
@@ -29,9 +29,8 @@ import org.apache.drill.exec.record.RecordBatch;
 import com.google.common.base.Preconditions;
 
 public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> {
-
   @Override
-  public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP 
config, List<RecordBatch> children)
+  public WindowFrameRecordBatch getBatch(ExecutorFragmentContext context, 
WindowPOP config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new WindowFrameRecordBatch(config, context, 
children.iterator().next());

Reply via email to