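DRILL-865: Change the AbstractRecordBatch interface to make collecting operator stats easy.
Operators now implement innerNext() instead of next(), so the stats start/stop timing no longer has to be repeated inside each operator.
Add BaseRootExec, a RootExec base class that wraps innerNext() with the same stats collection for senders.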


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3e98ffcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3e98ffcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3e98ffcb

Branch: refs/heads/master
Commit: 3e98ffcbaa5b3ae26db6cfac3d223049b1ad9358
Parents: 83fb40e
Author: Mehant Baid <[email protected]>
Authored: Wed May 21 23:47:08 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Jun 8 19:13:06 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/BaseRootExec.java  |  48 +++++
 .../exec/physical/impl/TopN/TopNBatch.java      |   2 +-
 .../exec/physical/impl/WriterRecordBatch.java   | 155 ++++++++---------
 .../physical/impl/aggregate/HashAggBatch.java   |  97 +++++------
 .../impl/aggregate/StreamingAggBatch.java       |  79 ++++-----
 .../exec/physical/impl/join/HashJoinBatch.java  |   6 +-
 .../exec/physical/impl/join/MergeJoinBatch.java | 141 +++++++--------
 .../physical/impl/limit/LimitRecordBatch.java   |   4 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |  10 +-
 .../OrderedPartitionRecordBatch.java            | 173 +++++++++----------
 .../PartitionSenderRootExec.java                |  10 +-
 .../impl/project/ProjectRecordBatch.java        |   4 +-
 .../exec/physical/impl/sort/SortBatch.java      |   2 +-
 .../impl/svremover/RemovingRecordBatch.java     |   4 +-
 .../physical/impl/union/UnionRecordBatch.java   |   2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   2 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  40 +++--
 .../exec/record/AbstractSingleRecordBatch.java  |  71 ++++----
 18 files changed, 426 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
new file mode 100644
index 0000000..0db8c07
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public abstract class BaseRootExec<T extends PhysicalOperator> implements 
RootExec {
+
+  protected final OperatorStats stats;
+  protected final OperatorContext oContext;
+
+  public BaseRootExec(FragmentContext context, T operator) throws 
OutOfMemoryException {
+    oContext = new OperatorContext(operator, context);
+    stats = oContext.getStats();
+  }
+
+  @Override
+  public final boolean next() {
+    try {
+      stats.startProcessing();
+      return innerNext();
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  public abstract boolean innerNext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 1c1a6d2..712311f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -119,7 +119,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if(schema != null){
       if(getSelectionVector4().next()){
         return IterOutcome.OK;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 1113af4..3c4bc41 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -68,90 +68,83 @@ public class WriterRecordBatch extends 
AbstractRecordBatch<Writer> {
   }
 
   @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-    try{
-
-      if(processed) {
-        // if the upstream record batch is already processed and next() is 
called by
-        // downstream then return NONE to indicate completion
-        return IterOutcome.NONE;
+  public IterOutcome innerNext() {
+    if(processed) {
+      // if the upstream record batch is already processed and next() is 
called by
+      // downstream then return NONE to indicate completion
+      return IterOutcome.NONE;
+    }
+
+    // process the complete upstream in one next() call
+    IterOutcome upstream;
+    do {
+      upstream = next(incoming);
+      if(first && upstream == IterOutcome.OK)
+        upstream = IterOutcome.OK_NEW_SCHEMA;
+      first = false;
+
+      switch(upstream) {
+        case NOT_YET:
+        case NONE:
+        case STOP:
+          cleanup();
+          if (upstream == IterOutcome.STOP)
+            return upstream;
+          break;
+
+        case OK_NEW_SCHEMA:
+          try{
+            setupNewSchema();
+          }catch(Exception ex){
+            kill();
+            logger.error("Failure during query", ex);
+            context.fail(ex);
+            return IterOutcome.STOP;
+          }
+          // fall through.
+        case OK:
+          try {
+            counter += eventBasedRecordWriter.write();
+            logger.debug("Total records written so far: {}", counter);
+          } catch(IOException ex) {
+            throw new RuntimeException(ex);
+          }
+
+          for(VectorWrapper v : incoming)
+            v.getValueVector().clear();
+
+          break;
+
+        default:
+          throw new UnsupportedOperationException();
       }
+    } while(upstream != IterOutcome.NONE);
 
-      // process the complete upstream in one next() call
-      IterOutcome upstream;
-      do {
-        upstream = next(incoming);
-        if(first && upstream == IterOutcome.OK)
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        first = false;
-
-        switch(upstream) {
-          case NOT_YET:
-          case NONE:
-          case STOP:
-            cleanup();
-            if (upstream == IterOutcome.STOP)
-              return upstream;
-            break;
-
-          case OK_NEW_SCHEMA:
-            try{
-              setupNewSchema();
-            }catch(Exception ex){
-              kill();
-              logger.error("Failure during query", ex);
-              context.fail(ex);
-              return IterOutcome.STOP;
-            }
-            // fall through.
-          case OK:
-            try {
-              counter += eventBasedRecordWriter.write();
-              logger.debug("Total records written so far: {}", counter);
-            } catch(IOException ex) {
-              throw new RuntimeException(ex);
-            }
-
-            for(VectorWrapper v : incoming)
-              v.getValueVector().clear();
-
-            break;
-
-          default:
-            throw new UnsupportedOperationException();
-        }
-      } while(upstream != IterOutcome.NONE);
-
-      // Create two vectors for:
-      //   1. Fragment unique id.
-      //   2. Summary: currently contains number of records written.
-      MaterializedField fragmentIdField = 
MaterializedField.create(SchemaPath.getSimplePath("Fragment"), 
Types.required(MinorType.VARCHAR));
-      MaterializedField summaryField = 
MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), 
Types.required(MinorType.BIGINT));
-
-      VarCharVector fragmentIdVector = (VarCharVector) 
TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
-      AllocationHelper.allocate(fragmentIdVector, 1, 
TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-      BigIntVector summaryVector = (BigIntVector) 
TypeHelper.getNewVector(summaryField, context.getAllocator());
-      AllocationHelper.allocate(summaryVector, 1, 
TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-
-
-      container.add(fragmentIdVector);
-      container.add(summaryVector);
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
-      fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
-      fragmentIdVector.getMutator().setValueCount(1);
-      summaryVector.getMutator().setSafe(0, counter);
-      summaryVector.getMutator().setValueCount(1);
-
-      container.setRecordCount(1);
-      processed = true;
-
-      return IterOutcome.OK_NEW_SCHEMA;
-    }finally{
-      stats.stopProcessing();
-    }
+    // Create two vectors for:
+    //   1. Fragment unique id.
+    //   2. Summary: currently contains number of records written.
+    MaterializedField fragmentIdField = 
MaterializedField.create(SchemaPath.getSimplePath("Fragment"), 
Types.required(MinorType.VARCHAR));
+    MaterializedField summaryField = 
MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), 
Types.required(MinorType.BIGINT));
+
+    VarCharVector fragmentIdVector = (VarCharVector) 
TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
+    AllocationHelper.allocate(fragmentIdVector, 1, 
TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+    BigIntVector summaryVector = (BigIntVector) 
TypeHelper.getNewVector(summaryField, context.getAllocator());
+    AllocationHelper.allocate(summaryVector, 1, 
TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+
+
+    container.add(fragmentIdVector);
+    container.add(summaryVector);
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
+    fragmentIdVector.getMutator().setValueCount(1);
+    summaryVector.getMutator().setSafe(0, counter);
+    summaryVector.getMutator().setValueCount(1);
+
+    container.setRecordCount(1);
+    processed = true;
 
+    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   protected void setupNewSchema() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 4478938..ad929a4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -86,65 +86,60 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-    try{
-      // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
-      if (aggregator == null) {
-        IterOutcome outcome = next(incoming);
-        logger.debug("Next outcome of {}", outcome);
-        switch (outcome) {
-        case NONE:
-        case NOT_YET:
-        case STOP:
-          return outcome;
-        case OK_NEW_SCHEMA:
-          if (!createAggregator()){
-            done = true;
-            return IterOutcome.STOP;
-          }
-          break;
-        case OK:
-          throw new IllegalStateException("You should never get a first batch 
without a new schema");
-        default:
-          throw new IllegalStateException(String.format("unknown outcome %s", 
outcome));
+  public IterOutcome innerNext() {
+    // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
+    if (aggregator == null) {
+      IterOutcome outcome = next(incoming);
+      logger.debug("Next outcome of {}", outcome);
+      switch (outcome) {
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        return outcome;
+      case OK_NEW_SCHEMA:
+        if (!createAggregator()){
+          done = true;
+          return IterOutcome.STOP;
         }
+        break;
+      case OK:
+        throw new IllegalStateException("You should never get a first batch 
without a new schema");
+      default:
+        throw new IllegalStateException(String.format("unknown outcome %s", 
outcome));
       }
+    }
 
-      if (aggregator.allFlushed()) {
-        return IterOutcome.NONE;
-      }
-
-    if (aggregator.buildComplete() && ! aggregator.allFlushed()) {
-      // aggregation is complete and not all records have been output yet
-      return aggregator.outputCurrentBatch();    
+    if (aggregator.allFlushed()) {
+      return IterOutcome.NONE;
     }
 
-    logger.debug("Starting aggregator doWork; incoming record count = {} ", 
incoming.getRecordCount());   
+  if (aggregator.buildComplete() && ! aggregator.allFlushed()) {
+    // aggregation is complete and not all records have been output yet
+    return aggregator.outputCurrentBatch();
+  }
 
-      while(true){
-        AggOutcome out = aggregator.doWork();
-        logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
-        switch(out){
-        case CLEANUP_AND_RETURN:
-          container.clear();
-          aggregator.cleanup();
-          done = true;
-          return aggregator.getOutcome();
-        case RETURN_OUTCOME:
-          return aggregator.getOutcome();
-        case UPDATE_AGGREGATOR:
-          aggregator = null;
-          if(!createAggregator()){
-            return IterOutcome.STOP;
-          }
-          continue;
-        default:
-          throw new IllegalStateException(String.format("Unknown state %s.", 
out));
+  logger.debug("Starting aggregator doWork; incoming record count = {} ", 
incoming.getRecordCount());
+
+    while(true){
+      AggOutcome out = aggregator.doWork();
+      logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
+      switch(out){
+      case CLEANUP_AND_RETURN:
+        container.clear();
+        aggregator.cleanup();
+        done = true;
+        return aggregator.getOutcome();
+      case RETURN_OUTCOME:
+        return aggregator.getOutcome();
+      case UPDATE_AGGREGATOR:
+        aggregator = null;
+        if(!createAggregator()){
+          return IterOutcome.STOP;
         }
+        continue;
+      default:
+        throw new IllegalStateException(String.format("Unknown state %s.", 
out));
       }
-      }finally{
-      stats.stopProcessing();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 5b61a82..8cad91b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -78,53 +78,48 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
   }
 
   @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-    try{
-      // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
-      if (aggregator == null) {
-        IterOutcome outcome = next(incoming);
-        logger.debug("Next outcome of {}", outcome);
-        switch (outcome) {
-        case NONE:
-        case NOT_YET:
-        case STOP:
-          return outcome;
-        case OK_NEW_SCHEMA:
-          if (!createAggregator()){
-            done = true;
-            return IterOutcome.STOP;
-          }
-          break;
-        case OK:
-          throw new IllegalStateException("You should never get a first batch 
without a new schema");
-        default:
-          throw new IllegalStateException(String.format("unknown outcome %s", 
outcome));
+  public IterOutcome innerNext() {
+    // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
+    if (aggregator == null) {
+      IterOutcome outcome = next(incoming);
+      logger.debug("Next outcome of {}", outcome);
+      switch (outcome) {
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        return outcome;
+      case OK_NEW_SCHEMA:
+        if (!createAggregator()){
+          done = true;
+          return IterOutcome.STOP;
         }
+        break;
+      case OK:
+        throw new IllegalStateException("You should never get a first batch 
without a new schema");
+      default:
+        throw new IllegalStateException(String.format("unknown outcome %s", 
outcome));
       }
+    }
 
-      while(true){
-        AggOutcome out = aggregator.doWork();
-        logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
-        switch(out){
-        case CLEANUP_AND_RETURN:
-          container.clear();
-          done = true;
-          return aggregator.getOutcome();
-        case RETURN_OUTCOME:
-          return aggregator.getOutcome();
-        case UPDATE_AGGREGATOR:
-          aggregator = null;
-          if(!createAggregator()){
-            return IterOutcome.STOP;
-          }
-          continue;
-        default:
-          throw new IllegalStateException(String.format("Unknown state %s.", 
out));
+    while(true){
+      AggOutcome out = aggregator.doWork();
+      logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
+      switch(out){
+      case CLEANUP_AND_RETURN:
+        container.clear();
+        done = true;
+        return aggregator.getOutcome();
+      case RETURN_OUTCOME:
+        return aggregator.getOutcome();
+      case UPDATE_AGGREGATOR:
+        aggregator = null;
+        if(!createAggregator()){
+          return IterOutcome.STOP;
         }
+        continue;
+      default:
+        throw new IllegalStateException(String.format("Unknown state %s.", 
out));
       }
-    }finally{
-      stats.stopProcessing();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2ea9339..684965d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -143,8 +143,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
 
 
     @Override
-    public IterOutcome next() {
-        stats.startProcessing();
+    public IterOutcome innerNext() {
         try {
             /* If we are here for the first time, execute the build phase of 
the
              * hash join and setup the run time generated class for the probe 
side
@@ -225,10 +224,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
             context.fail(e);
             killIncoming();
             return IterOutcome.STOP;
-        } finally{
-          stats.stopProcessing();
         }
-
     }
 
     public void setupHashTable() throws IOException, SchemaChangeException, 
ClassTransformationException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index b284454..587dd6a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -130,87 +130,80 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   }
 
   @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-
-    try{
-      // we do this in the here instead of the constructor because don't 
necessary want to start consuming on construction.
-      status.ensureInitial();
-
-      // loop so we can start over again if we find a new batch was created.
-      while(true){
-
-        JoinOutcome outcome = status.getOutcome();
-        // if the previous outcome was a change in schema or we sent a batch, 
we have to set up a new batch.
-        if (outcome == JoinOutcome.BATCH_RETURNED ||
-            outcome == JoinOutcome.SCHEMA_CHANGED)
-          allocateBatch();
-
-        // reset the output position to zero after our parent iterates this 
RecordBatch
-        if (outcome == JoinOutcome.BATCH_RETURNED ||
-            outcome == JoinOutcome.SCHEMA_CHANGED ||
-            outcome == JoinOutcome.NO_MORE_DATA)
-          status.resetOutputPos();
-
-        if (outcome == JoinOutcome.NO_MORE_DATA) {
-          logger.debug("NO MORE DATA; returning {}  NONE");
-          return IterOutcome.NONE;
-        }
+  public IterOutcome innerNext() {
+    // we do this in the here instead of the constructor because don't 
necessary want to start consuming on construction.
+    status.ensureInitial();
+
+    // loop so we can start over again if we find a new batch was created.
+    while(true){
+
+      JoinOutcome outcome = status.getOutcome();
+      // if the previous outcome was a change in schema or we sent a batch, we 
have to set up a new batch.
+      if (outcome == JoinOutcome.BATCH_RETURNED ||
+          outcome == JoinOutcome.SCHEMA_CHANGED)
+        allocateBatch();
+
+      // reset the output position to zero after our parent iterates this 
RecordBatch
+      if (outcome == JoinOutcome.BATCH_RETURNED ||
+          outcome == JoinOutcome.SCHEMA_CHANGED ||
+          outcome == JoinOutcome.NO_MORE_DATA)
+        status.resetOutputPos();
+
+      if (outcome == JoinOutcome.NO_MORE_DATA) {
+        logger.debug("NO MORE DATA; returning {}  NONE");
+        return IterOutcome.NONE;
+      }
 
-        boolean first = false;
-        if(worker == null){
-          try {
-            logger.debug("Creating New Worker");
-            stats.startSetup();
-            this.worker = generateNewWorker();
-            first = true;
-            stats.stopSetup();
-          } catch (ClassTransformationException | IOException | 
SchemaChangeException e) {
-            stats.stopSetup();
-            context.fail(new SchemaChangeException(e));
-            kill();
-            return IterOutcome.STOP;
-          }
+      boolean first = false;
+      if(worker == null){
+        try {
+          logger.debug("Creating New Worker");
+          stats.startSetup();
+          this.worker = generateNewWorker();
+          first = true;
+          stats.stopSetup();
+        } catch (ClassTransformationException | IOException | 
SchemaChangeException e) {
+          stats.stopSetup();
+          context.fail(new SchemaChangeException(e));
+          kill();
+          return IterOutcome.STOP;
         }
+      }
 
-        // join until we have a complete outgoing batch
-        if (!worker.doJoin(status))
-          worker = null;
-
-        // get the outcome of the join.
-        switch(status.getOutcome()){
-        case BATCH_RETURNED:
-          // only return new schema if new worker has been setup.
-          logger.debug("BATCH RETURNED; returning {}", (first ? 
"OK_NEW_SCHEMA" : "OK"));
+      // join until we have a complete outgoing batch
+      if (!worker.doJoin(status))
+        worker = null;
+
+      // get the outcome of the join.
+      switch(status.getOutcome()){
+      case BATCH_RETURNED:
+        // only return new schema if new worker has been setup.
+        logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" 
: "OK"));
+        setRecordCountInContainer();
+        return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+      case FAILURE:
+        kill();
+        return IterOutcome.STOP;
+      case NO_MORE_DATA:
+        logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 
0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
+        setRecordCountInContainer();
+        return status.getOutPosition() > 0 ? (first ? 
IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
+      case SCHEMA_CHANGED:
+        worker = null;
+        if(status.getOutPosition() > 0){
+          // if we have current data, let's return that.
+          logger.debug("SCHEMA CHANGED; returning {} ", (first ? 
"OK_NEW_SCHEMA" : "OK"));
           setRecordCountInContainer();
           return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
-        case FAILURE:
-          kill();
-          return IterOutcome.STOP;
-        case NO_MORE_DATA:
-          logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() 
> 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
-          setRecordCountInContainer();
-          return status.getOutPosition() > 0 ? (first ? 
IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
-        case SCHEMA_CHANGED:
-          worker = null;
-          if(status.getOutPosition() > 0){
-            // if we have current data, let's return that.
-            logger.debug("SCHEMA CHANGED; returning {} ", (first ? 
"OK_NEW_SCHEMA" : "OK"));
-            setRecordCountInContainer();
-            return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
-          }else{
-            // loop again to rebuild worker.
-            continue;
-          }
-        case WAITING:
-          return IterOutcome.NOT_YET;
-        default:
-          throw new IllegalStateException();
+        }else{
+          // loop again to rebuild worker.
+          continue;
         }
+      case WAITING:
+        return IterOutcome.NOT_YET;
+      default:
+        throw new IllegalStateException();
       }
-
-    }finally{
-      stats.stopProcessing();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 648fd89..3e408bd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -78,7 +78,7 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if(!noEndLimit && recordsLeft <= 0) {
       // don't kill incoming batches or call cleanup yet, as this could close 
allocators before the buffers have been cleared
       // Drain the incoming record batch and clear the memory
@@ -96,7 +96,7 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
       return IterOutcome.NONE;
     }
 
-    return super.next();
+    return super.innerNext();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index be5bf76..07a949c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -121,10 +121,7 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-    try{
-
+  public IterOutcome innerNext() {
     if (fragProviders.length == 0) return IterOutcome.NONE;
     boolean schemaChanged = false;
 
@@ -336,11 +333,6 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
       return IterOutcome.OK_NEW_SCHEMA;
     else
       return IterOutcome.OK;
-
-    }finally{
-      stats.stopProcessing();
-    }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 4c0d3e0..f677e54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -424,103 +424,98 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   }
 
   @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-    try{
-      container.zeroVectors();
-
-      // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
-      // done
-      if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
-        return IterOutcome.NONE;
-
-      // if there are batches on the queue, process them first, rather than calling incoming.next()
-      if (batchQueue != null && batchQueue.size() > 0) {
-        VectorContainer vc = batchQueue.poll();
-        recordCount = vc.getRecordCount();
-        try {
-
-          // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
-          setupNewSchema(vc);
-        } catch (SchemaChangeException ex) {
-          kill();
-          logger.error("Failure during query", ex);
-          context.fail(ex);
-          return IterOutcome.STOP;
-        }
-        doWork(vc);
-        vc.zeroVectors();
-        return IterOutcome.OK_NEW_SCHEMA;
+  public IterOutcome innerNext() {
+    container.zeroVectors();
+
+    // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
+    // done
+    if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
+      return IterOutcome.NONE;
+
+    // if there are batches on the queue, process them first, rather than calling incoming.next()
+    if (batchQueue != null && batchQueue.size() > 0) {
+      VectorContainer vc = batchQueue.poll();
+      recordCount = vc.getRecordCount();
+      try {
+
+        // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
+        setupNewSchema(vc);
+      } catch (SchemaChangeException ex) {
+        kill();
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
       }
+      doWork(vc);
+      vc.zeroVectors();
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
 
-      // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
-      // more incoming
-      IterOutcome upstream = next(incoming);
+    // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
+    // more incoming
+    IterOutcome upstream = next(incoming);
 
-      if (this.first && upstream == IterOutcome.OK) {
-        throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
-      }
-
-      // If this is the first iteration, we need to generate the partition vectors before we can proceed
-      if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
-        if (!getPartitionVectors()){
-          cleanup();
-          return IterOutcome.STOP;
-        }
+    if (this.first && upstream == IterOutcome.OK) {
+      throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
+    }
 
-        batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
-        first = false;
-
-        // Now that we have the partition vectors, we immediately process the first batch on the queue
-        VectorContainer vc = batchQueue.poll();
-        try {
-          setupNewSchema(vc);
-        } catch (SchemaChangeException ex) {
-          kill();
-          logger.error("Failure during query", ex);
-          context.fail(ex);
-          return IterOutcome.STOP;
-        }
-        doWork(vc);
-        vc.zeroVectors();
-        recordCount = vc.getRecordCount();
-        return IterOutcome.OK_NEW_SCHEMA;
+    // If this is the first iteration, we need to generate the partition vectors before we can proceed
+    if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
+      if (!getPartitionVectors()){
+        cleanup();
+        return IterOutcome.STOP;
       }
 
-      // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the
-      // first one
-      // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
-      if (this.startedUnsampledBatches == false) {
-        this.startedUnsampledBatches = true;
-        if (upstream == IterOutcome.OK)
-          upstream = IterOutcome.OK_NEW_SCHEMA;
+      batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
+      first = false;
+
+      // Now that we have the partition vectors, we immediately process the first batch on the queue
+      VectorContainer vc = batchQueue.poll();
+      try {
+        setupNewSchema(vc);
+      } catch (SchemaChangeException ex) {
+        kill();
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
       }
-      switch (upstream) {
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        cleanup();
-        recordCount = 0;
-        return upstream;
-      case OK_NEW_SCHEMA:
-        try {
-          setupNewSchema(incoming);
-        } catch (SchemaChangeException ex) {
-          kill();
-          logger.error("Failure during query", ex);
-          context.fail(ex);
-          return IterOutcome.STOP;
-        }
-        // fall through.
-      case OK:
-        doWork(incoming);
-        recordCount = incoming.getRecordCount();
-        return upstream; // change if upstream changed, otherwise normal.
-      default:
-        throw new UnsupportedOperationException();
+      doWork(vc);
+      vc.zeroVectors();
+      recordCount = vc.getRecordCount();
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
+
+    // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the
+    // first one
+    // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
+    if (this.startedUnsampledBatches == false) {
+      this.startedUnsampledBatches = true;
+      if (upstream == IterOutcome.OK)
+        upstream = IterOutcome.OK_NEW_SCHEMA;
+    }
+    switch (upstream) {
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      cleanup();
+      recordCount = 0;
+      return upstream;
+    case OK_NEW_SCHEMA:
+      try {
+        setupNewSchema(incoming);
+      } catch (SchemaChangeException ex) {
+        kill();
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
       }
-    }finally{
-      stats.stopProcessing();
+      // fall through.
+    case OK:
+      doWork(incoming);
+      recordCount = incoming.getRecordCount();
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index ffb3780..5476a50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
@@ -48,17 +49,15 @@ import com.sun.codemodel.JType;
 import org.apache.drill.exec.vector.CopyUtil;
 
 
-public class PartitionSenderRootExec implements RootExec {
+public class PartitionSenderRootExec extends BaseRootExec {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
   private RecordBatch incoming;
   private HashPartitionSender operator;
   private Partitioner partitioner;
   private FragmentContext context;
-  private OperatorContext oContext;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
-  private final OperatorStats stats;
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
@@ -68,18 +67,17 @@ public class PartitionSenderRootExec implements RootExec {
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException {
 
+    super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
-    this.oContext = new OperatorContext(operator, context);
-    this.stats = oContext.getStats();
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
     this.statusHandler = new StatusHandler(sendCount, context);
   }
 
   @Override
-  public boolean next() {
+  public boolean innerNext() {
     boolean newSchema = false;
 
     if (!ok) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 05a6724..93cd19d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -75,12 +75,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if (hasRemainder) {
       handleRemainder();
       return IterOutcome.OK;
     }
-    return super.next();
+    return super.innerNext();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 375276e..f21673d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -98,7 +98,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if(schema != null){
       if(getSelectionVector4().next()){
         return IterOutcome.OK;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index fd06de1..f3388dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -83,12 +83,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if (hasRemainder) {
       handleRemainder();
       return IterOutcome.OK;
     }
-    return super.next();
+    return super.innerNext();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
index c27b3c8..d515323 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
@@ -80,7 +80,7 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> {
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if (current == null) { // end of iteration
       return IterOutcome.NONE;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 7a2b251..d4c0b25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -159,7 +159,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   @Override
-  public IterOutcome next() {
+  public IterOutcome innerNext() {
     if(schema != null){
       if (spillCount == 0) {
         if(schema != null){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 53b223e..7b7b708 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -59,32 +59,36 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return popConfig;
   }
 
-  public final IterOutcome next(RecordBatch b){
+  public final IterOutcome next(RecordBatch b) {
+
     return next(0, b);
   }
 
   public final IterOutcome next(int inputIndex, RecordBatch b){
-    stats.stopProcessing();
-    try{
-      IterOutcome next = b.next();
-
-      switch(next){
-      case OK_NEW_SCHEMA:
-        stats.batchReceived(inputIndex, b.getRecordCount(), true);
-        break;
-      case OK:
-        stats.batchReceived(inputIndex, b.getRecordCount(), false);
-        break;
-      }
-
-      return next;
-
-    }finally{
-      stats.startProcessing();
+    IterOutcome next = b.next();
+
+    switch(next){
+    case OK_NEW_SCHEMA:
+      stats.batchReceived(inputIndex, b.getRecordCount(), true);
+      break;
+    case OK:
+      stats.batchReceived(inputIndex, b.getRecordCount(), false);
+      break;
     }
+    return next;
+  }
 
+  public final IterOutcome next() {
+    try {
+      stats.startProcessing();
+      return innerNext();
+    } finally {
+      stats.stopProcessing();
+    }
   }
 
+  public abstract IterOutcome innerNext();
+
   @Override
   public BatchSchema getSchema() {
     return container.getSchema();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index d897a78..c5fdaeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -41,47 +41,40 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   }
 
   @Override
-  public IterOutcome next() {
-    try{
-      stats.startProcessing();
-      IterOutcome upstream = next(incoming);
-      if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
-      first = false;
-      switch(upstream){
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        return upstream;
-      case OUT_OF_MEMORY:
-        return upstream;
-      case OK_NEW_SCHEMA:
-        try{
-          stats.startSetup();
-          setupNewSchema();
-        }catch(SchemaChangeException ex){
-          kill();
-          logger.error("Failure during query", ex);
-          context.fail(ex);
-          return IterOutcome.STOP;
-        }finally{
-          stats.stopSetup();
-        }
-        // fall through.
-      case OK:
-        doWork();
-        if (outOfMemory) {
-          outOfMemory = false;
-          return IterOutcome.OUT_OF_MEMORY;
-        }
-        return upstream; // change if upstream changed, otherwise normal.
-      default:
-        throw new UnsupportedOperationException();
+  public IterOutcome innerNext() {
+    IterOutcome upstream = next(incoming);
+    if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
+    first = false;
+    switch(upstream){
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      return upstream;
+    case OUT_OF_MEMORY:
+      return upstream;
+    case OK_NEW_SCHEMA:
+      try{
+        stats.startSetup();
+        setupNewSchema();
+      }catch(SchemaChangeException ex){
+        kill();
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
+      }finally{
+        stats.stopSetup();
       }
-    }finally{
-      stats.stopProcessing();
+      // fall through.
+    case OK:
+      doWork();
+      if (outOfMemory) {
+        outOfMemory = false;
+        return IterOutcome.OUT_OF_MEMORY;
+      }
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
     }
-
-
   }
 
   @Override

Reply via email to