DRILL-1382: Fast schema return

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/63d3008e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/63d3008e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/63d3008e

Branch: refs/heads/master
Commit: 63d3008e120a12d9167208a6db0faa950f9a618b
Parents: 451dd60
Author: Steven Phillips <[email protected]>
Authored: Fri Oct 10 17:03:19 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Oct 27 06:37:26 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/memory/Accountor.java |  13 +-
 .../drill/exec/physical/impl/RootExec.java      |   6 +
 .../drill/exec/physical/impl/ScanBatch.java     |  44 ++++--
 .../drill/exec/physical/impl/ScreenCreator.java |  43 ++++--
 .../exec/physical/impl/SingleSenderCreator.java |  31 ++++
 .../impl/TopN/PriorityQueueTemplate.java        |   1 +
 .../exec/physical/impl/TopN/TopNBatch.java      |  47 ++++--
 .../exec/physical/impl/WriterRecordBatch.java   |  53 ++++---
 .../physical/impl/aggregate/HashAggBatch.java   |  44 ++++--
 .../impl/aggregate/StreamingAggBatch.java       |  24 +++-
 .../impl/aggregate/StreamingAggTemplate.java    |   2 +-
 .../BroadcastSenderRootExec.java                |  29 ++++
 .../exec/physical/impl/common/HashTable.java    |   2 +
 .../physical/impl/common/HashTableTemplate.java |   9 +-
 .../physical/impl/filter/FilterRecordBatch.java |  17 ++-
 .../exec/physical/impl/join/HashJoinBatch.java  |  85 ++++++-----
 .../impl/join/HashJoinProbeTemplate.java        |  14 +-
 .../exec/physical/impl/join/JoinStatus.java     |   8 ++
 .../exec/physical/impl/join/MergeJoinBatch.java | 144 ++++++++++---------
 .../physical/impl/limit/LimitRecordBatch.java   |  38 ++---
 .../impl/mergereceiver/MergingRecordBatch.java  |  42 +++---
 .../PartitionSenderRootExec.java                |  21 +++
 .../partitionsender/PartitionerTemplate.java    |  12 +-
 .../impl/producer/ProducerConsumerBatch.java    |  25 ++++
 .../impl/project/ProjectRecordBatch.java        |  53 +++++--
 .../impl/svremover/RemovingRecordBatch.java     |  36 +++--
 .../physical/impl/trace/TraceRecordBatch.java   |   4 +-
 .../impl/union/UnionAllRecordBatch.java         |  15 ++
 .../UnorderedReceiverBatch.java                 |   5 +
 .../IteratorValidatorBatchIterator.java         |  19 ++-
 .../window/StreamingWindowFrameRecordBatch.java |  17 ++-
 .../physical/impl/xsort/ExternalSortBatch.java  |  34 ++++-
 .../drill/exec/record/AbstractRecordBatch.java  |   6 +
 .../exec/record/AbstractSingleRecordBatch.java  |  16 ++-
 .../exec/record/ExpandableHyperContainer.java   |  14 ++
 .../exec/record/FragmentWritableBatch.java      |  11 ++
 .../apache/drill/exec/record/RecordBatch.java   |   9 ++
 .../drill/exec/record/VectorContainer.java      |  56 +++++++-
 .../exec/work/fragment/FragmentExecutor.java    |   1 +
 .../java/org/apache/drill/BaseTestQuery.java    |   4 +-
 .../org/apache/drill/TestExampleQueries.java    |   2 +-
 .../exec/fn/impl/TestAggregateFunction.java     |   2 +-
 .../drill/exec/fn/impl/TestDateFunctions.java   |   2 +-
 .../drill/exec/fn/impl/TestMultiInputAdd.java   |   2 +-
 .../exec/fn/impl/TestNewAggregateFunctions.java |   2 +-
 .../exec/fn/impl/TestNewMathFunctions.java      |   3 +-
 .../exec/physical/impl/SimpleRootExec.java      |  15 ++
 .../exec/physical/impl/TestCastFunctions.java   |   2 +-
 .../physical/impl/TestCastVarCharToBigInt.java  |   2 +-
 .../drill/exec/physical/impl/TestDecimal.java   |  12 +-
 .../physical/impl/TestExtractFunctions.java     |   2 +-
 .../impl/TestImplicitCastFunctions.java         |   1 +
 .../physical/impl/TestSimpleFragmentRun.java    |   8 +-
 .../exec/physical/impl/TestStringFunctions.java |   1 +
 .../exec/physical/impl/TopN/TestSimpleTopN.java |   2 +-
 .../exec/physical/impl/join/TestHashJoin.java   |   4 +-
 .../exec/physical/impl/sort/TestSimpleSort.java |   2 +
 .../physical/impl/window/TestWindowFrame.java   |  12 +-
 .../exec/physical/impl/writer/TestWriter.java   |   2 +-
 .../drill/exec/record/vector/TestDateTypes.java |  12 +-
 .../vector/complex/writer/TestJsonReader.java   |   8 +-
 .../java-exec/src/test/resources/agg/test1.json |   2 +-
 .../src/test/resources/agg/twokey.json          |   2 +-
 .../decimal/test_decimal_sort_complex.json      |   2 +-
 .../functions/cast/testICastConstant.json       |   4 +-
 .../functions/date/interval_arithmetic.json     |   2 +-
 .../functions/string/testRegexpReplace.json     |   4 +-
 .../src/test/resources/join/join_batchsize.json |   4 +-
 .../test/resources/join/mj_multi_condition.json |   4 +-
 .../resources/mergerecv/merging_receiver.json   |   2 +-
 .../resources/mergerecv/multiple_providers.json |   2 +-
 .../resources/record/vector/test_sort_date.json |   2 +-
 .../src/test/resources/window/oneKeyCount.json  |   2 +-
 .../resources/window/oneKeyCountMultiBatch.json |   2 +-
 .../src/test/resources/window/twoKeys.json      |   2 +-
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |   1 +
 76 files changed, 841 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 67beb95..a86367f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -356,13 +356,14 @@ public class Accountor {
         sb.append("at stack location:\n");
         entry.addToString(sb);
       }
-      IllegalStateException e = new IllegalStateException(sb.toString());
-      if (errorOnLeak) {
-        throw e;
-      } else {
-        logger.warn("Memory leaked.", e);
+      if (!buffers.isEmpty()) {
+        IllegalStateException e = new IllegalStateException(sb.toString());
+        if (errorOnLeak) {
+          throw e;
+        } else {
+          logger.warn("Memory leaked.", e);
+        }
       }
-
     }
 
     remainder.close();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 4250e27..d9c4e5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl;
 
 
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 /**
@@ -28,6 +29,11 @@ public interface RootExec {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
 
   /**
+   * Generate and send an empty schema batch
+   */
+  public void buildSchema() throws SchemaChangeException;
+
+  /**
    * Do the next batch of work.
   * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
    */

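The hunk above defines the contract behind fast schema return: every root operator must be able to emit a single empty, schema-only batch before doing any real work. A minimal sketch of the resulting handshake, using simplified stand-in types rather than Drill's actual interfaces:

    interface SchemaAwareRoot {
      // Sends one empty batch downstream that carries only the schema.
      void buildSchema() throws Exception;

      // Does the next batch of work; false means the fragment is done.
      boolean next();
    }

    final class FragmentDriver {
      // Hypothetical driver loop: the schema batch goes out eagerly,
      // before the first data batch is ever produced.
      static void run(SchemaAwareRoot root) throws Exception {
        root.buildSchema();    // fast schema return: schema first
        while (root.next()) {  // then the usual data batches
          // each iteration produces and sends one batch of records
        }
      }
    }
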
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index fc23441..ac65e40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -65,6 +65,7 @@ public class ScanBatch implements RecordBatch {
  private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
 
   private final VectorContainer container = new VectorContainer();
+  private VectorContainer tempContainer;
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
@@ -77,7 +78,7 @@ public class ScanBatch implements RecordBatch {
   private List<ValueVector> partitionVectors;
   private List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
-  private boolean first = true;
+  private boolean first = false;
   private boolean done = false;
 
  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
@@ -117,6 +118,22 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
+  public IterOutcome buildSchema() {
+    IterOutcome outcome = next();
+    if (outcome == IterOutcome.NONE) {
+      container.buildSchema(SelectionVectorMode.NONE);
+      schema = container.getSchema();
+      done = true;
+    }
+    first = true;
+    tempContainer = VectorContainer.getTransferClone(container);
+    for (VectorWrapper w : container) {
+      w.getValueVector().allocateNew();
+    }
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public int getRecordCount() {
     return recordCount;
   }
@@ -138,11 +155,24 @@ public class ScanBatch implements RecordBatch {
     container.zeroVectors();
   }
 
+  private void transfer() {
+    container.zeroVectors();
+    for (VectorWrapper w : tempContainer) {
+      MaterializedField field = w.getField();
+      w.getValueVector().makeTransferPair(container.addOrGet(field)).transfer();
+    }
+  }
+
   @Override
   public IterOutcome next() {
     if (done) {
       return IterOutcome.NONE;
     }
+    if (first) {
+      first = false;
+      transfer();
+      return IterOutcome.OK;
+    }
     long t1 = System.nanoTime();
     oContext.getStats().startProcessing();
     try {
@@ -159,14 +189,6 @@ public class ScanBatch implements RecordBatch {
         try {
           if (!readers.hasNext()) {
             currentReader.cleanup();
-            if (first) {
-              first = false;
-              done = true;
-              populatePartitionVectors();
-              container.buildSchema(SelectionVectorMode.NONE);
-              schema = container.getSchema();
-              return IterOutcome.OK_NEW_SCHEMA;
-            }
             releaseAssets();
             return IterOutcome.NONE;
           }
@@ -196,7 +218,6 @@ public class ScanBatch implements RecordBatch {
           return IterOutcome.STOP;
         }
       }
-      first = false;
 
       populatePartitionVectors();
       if (mutator.isNewSchema()) {
@@ -349,6 +370,9 @@ public class ScanBatch implements RecordBatch {
 
   public void cleanup() {
     container.clear();
+    if (tempContainer != null) {
+      tempContainer.clear();
+    }
     for (ValueVector v : partitionVectors) {
       v.clear();
     }

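ScanBatch.buildSchema() above reads one real batch to discover the schema, parks the data in tempContainer via a transfer clone, and reports OK_NEW_SCHEMA; transfer() then replays the parked data on the first regular next() call. A self-contained sketch of that read-ahead-and-replay pattern, with a generic batch type standing in for Drill's vector containers:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;

    final class ReadAhead<T> {
      private final Iterator<T> source;
      private final Deque<T> parked = new ArrayDeque<>();

      ReadAhead(Iterator<T> source) { this.source = source; }

      // Called once up front: consumes one batch purely to learn its schema.
      T peekSchemaBatch() {
        if (!source.hasNext()) {
          return null;         // empty source: schema only, nothing to replay
        }
        T first = source.next();
        parked.push(first);    // keep the data for later, like tempContainer
        return first;
      }

      // Regular iteration: the parked batch comes back first.
      T next() {
        if (!parked.isEmpty()) {
          return parked.pop(); // replay, mirroring ScanBatch.transfer()
        }
        return source.hasNext() ? source.next() : null;
      }
    }
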
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 868eb6e..3a843ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -85,6 +86,30 @@ public class ScreenCreator implements RootCreator<Screen>{
       this.connection = context.getConnection();
     }
 
+    @Override
+    public void buildSchema() throws SchemaChangeException {
+      stats.startProcessing();
+      try {
+        stats.stopProcessing();
+        try {
+          incoming.buildSchema();
+        } finally {
+          stats.startProcessing();
+        }
+
+        QueryWritableBatch batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, false, incoming.getSchema());
+        stats.startWait();
+        try {
+          connection.sendResult(listener, batch);
+        } finally {
+          stats.stopWait();
+        }
+        sendCount.increment();
+      } finally {
+        stats.stopProcessing();
+      }
+      materializer = new VectorRecordMaterializer(context, incoming);
+    }
 
     @Override
     public boolean innerNext() {
@@ -123,17 +148,13 @@ public class ScreenCreator implements RootCreator<Screen>{
       case NONE: {
         this.internalStop();
         QueryWritableBatch batch;
-        if (!first) {
-          QueryResult header = QueryResult.newBuilder() //
-              .setQueryId(context.getHandle().getQueryId()) //
-              .setRowCount(0) //
-              .setDef(RecordBatchDef.getDefaultInstance()) //
-              .setIsLastChunk(true) //
-              .build();
-          batch = new QueryWritableBatch(header);
-        } else {
-          batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, true, incoming.getSchema());
-        }
+        QueryResult header = QueryResult.newBuilder() //
+            .setQueryId(context.getHandle().getQueryId()) //
+            .setRowCount(0) //
+            .setDef(RecordBatchDef.getDefaultInstance()) //
+            .setIsLastChunk(true) //
+            .build();
+        batch = new QueryWritableBatch(header);
         stats.startWait();
         try {
           connection.sendResult(listener, batch);

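Every buildSchema() in this commit brackets its work with operator stats the same way: processing time is suspended while the child runs, and the outbound send is accounted as wait time. A sketch of that bracketing, with a stand-in Stats interface rather than Drill's OperatorStats:

    final class StatsBracketing {
      interface Stats {
        void startProcessing(); void stopProcessing();
        void startWait();       void stopWait();
      }

      static void buildSchema(Stats stats, Runnable upstream, Runnable send) {
        stats.startProcessing();
        try {
          stats.stopProcessing();   // the child's time is not ours to account
          try {
            upstream.run();         // incoming.buildSchema() equivalent
          } finally {
            stats.startProcessing();
          }
          stats.startWait();
          try {
            send.run();             // connection.sendResult(...) equivalent
          } finally {
            stats.stopWait();
          }
        } finally {
          stats.stopProcessing();
        }
      }
    }
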
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 34196b7..b638de0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,11 +22,14 @@ import io.netty.buffer.ByteBuf;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
@@ -52,6 +55,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private RecordBatch incoming;
     private DataTunnel tunnel;
     private FragmentHandle handle;
+    private SingleSender config;
     private int recMajor;
     private FragmentContext context;
     private volatile boolean ok = true;
@@ -73,6 +77,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       this.incoming = batch;
       assert(incoming != null);
       this.handle = context.getHandle();
+      this.config = config;
       this.recMajor = config.getOppositeMajorFragmentId();
      FragmentHandle opposite = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(0).build();
       this.tunnel = context.getDataTunnel(config.getDestination(), opposite);
@@ -80,6 +85,32 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
 
     @Override
+    public void buildSchema() throws SchemaChangeException {
+      stats.startProcessing();
+      try {
+        stats.stopProcessing();
+        try {
+          incoming.buildSchema();
+        } finally {
+          stats.startProcessing();
+        }
+
+        FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(),
+                handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema());
+
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+        } finally {
+          stats.stopWait();
+        }
+        sendCount.increment();
+      } finally {
+        stats.stopProcessing();
+      }
+    }
+
+    @Override
     public boolean innerNext() {
       if (!ok) {
         incoming.kill(false);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index e0a4c92..369c0ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -77,6 +77,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
       heapSv4.set(i, v4.get(i));
     }
     v4.clear();
+    doSetup(context, hyperBatch, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 473e3a3..400a867 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -110,15 +110,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  public BatchSchema getSchema() {
-    List<MaterializedField> fields = Lists.newArrayList();
-    for (MaterializedField field : incoming.getSchema()) {
-      fields.add(field);
-    }
-    return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
-  }
-
-  @Override
   public void cleanup() {
     if (sv4 != null) {
       sv4.clear();
@@ -131,6 +122,32 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    VectorContainer c = new VectorContainer(oContext);
+    stats.startProcessing();
+    try {
+      stats.stopProcessing();
+      try {
+        incoming.buildSchema();
+      } finally {
+        stats.startProcessing();
+      }
+      for (VectorWrapper w : incoming) {
+        c.addOrGet(w.getField());
+      }
+      c = VectorContainer.canonicalize(c);
+      for (VectorWrapper w : c) {
+        container.add(w.getValueVector());
+      }
+      container.buildSchema(SelectionVectorMode.NONE);
+      container.setRecordCount(0);
+      return IterOutcome.OK_NEW_SCHEMA;
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  @Override
   public IterOutcome innerNext() {
     if (schema != null) {
       if (getSelectionVector4().next()) {
@@ -146,6 +163,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         Stopwatch watch = new Stopwatch();
         watch.start();
         IterOutcome upstream = incoming.next();
+        if (upstream == IterOutcome.OK && schema == null) {
+          upstream = IterOutcome.OK_NEW_SCHEMA;
+          container.clear();
+        }
        logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
         switch (upstream) {
         case NONE:
@@ -191,6 +212,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       priorityQueue.generate();
 
       this.sv4 = priorityQueue.getFinalSv4();
+      container.clear();
       for (VectorWrapper w : priorityQueue.getHyperBatch()) {
         container.add(w.getValueVectors());
       }
@@ -210,7 +232,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     Stopwatch watch = new Stopwatch();
     watch.start();
     VectorContainer c = priorityQueue.getHyperBatch();
-    VectorContainer newContainer = new VectorContainer();
+    VectorContainer newContainer = new VectorContainer(oContext);
     SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
    SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
    SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
@@ -323,6 +345,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     }
 
     @Override
+    public IterOutcome buildSchema() throws SchemaChangeException {
+      return null;
+    }
+
+    @Override
     public int getRecordCount() {
       if (sv4 != null) {
         return sv4.getCount();

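TopNBatch.buildSchema() above builds an empty output container from the incoming fields and passes it through VectorContainer.canonicalize(...), so the schema announced up front matches the field order of the hyper-batch produced later. A sketch of the idea, assuming canonical order means sorting fields by name (plain strings stand in for MaterializedField):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    final class CanonicalOrder {
      // Returns fields in a stable order so a schema announced before any
      // data agrees with the one assembled after the sort completes.
      static List<String> canonicalize(List<String> fields) {
        List<String> sorted = new ArrayList<>(fields);
        Collections.sort(sorted);  // assumption: canonical == sorted by name
        return sorted;
      }
    }
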
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 8c1a4c0..acbb815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.physical.base.Writer;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -47,7 +49,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   private RecordWriter recordWriter;
   private int counter = 0;
   private final RecordBatch incoming;
-  private boolean first = true;
   private boolean processed = false;
   private String fragmentUniqueId;
 
@@ -71,8 +72,23 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    incoming.buildSchema();
+    try {
+      stats.startProcessing();
+      setupNewSchema();
+    } catch (Exception e) {
+      throw new SchemaChangeException(e);
+    } finally {
+      stats.stopProcessing();
+    }
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public IterOutcome innerNext() {
     if(processed) {
+      cleanup();
      // if the upstream record batch is already processed and next() is called by
       // downstream then return NONE to indicate completion
       return IterOutcome.NONE;
@@ -82,16 +98,11 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     IterOutcome upstream;
     do {
       upstream = next(incoming);
-      if(first && upstream == IterOutcome.OK) {
-        upstream = IterOutcome.OK_NEW_SCHEMA;
-      }
-      first = false;
 
       switch(upstream) {
         case NOT_YET:
         case NONE:
         case STOP:
-          cleanup();
           if (upstream == IterOutcome.STOP) {
             return upstream;
           }
@@ -125,22 +136,12 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       }
     } while(upstream != IterOutcome.NONE);
 
-    // Create two vectors for:
-    //   1. Fragment unique id.
-    //   2. Summary: currently contains number of records written.
-    MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR));
-    MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
-
-    VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
-    AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-    BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
-    AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-
-
-    container.add(fragmentIdVector);
-    container.add(summaryVector);
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
+    VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById(VarCharVector.class, container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds()).getValueVector();
+    AllocationHelper.allocate(fragmentIdVector, 1, 50);
+    BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
+            container.getValueVectorId(SchemaPath.getSimplePath("Number of records written")).getFieldIds()).getValueVector();
+    AllocationHelper.allocate(summaryVector, 1, 8);
     fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
     fragmentIdVector.getMutator().setValueCount(1);
     summaryVector.getMutator().setSafe(0, counter);
@@ -157,6 +158,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       // update the schema in RecordWriter
       stats.startSetup();
       recordWriter.updateSchema(incoming.getSchema());
+      // Create two vectors for:
+      //   1. Fragment unique id.
+      //   2. Summary: currently contains number of records written.
+      MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR));
+      MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
+
+      container.addOrGet(fragmentIdField);
+      container.addOrGet(summaryField);
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     } catch(IOException ex) {
      throw new RuntimeException("Failed to update schema in RecordWriter", ex);
     } finally{
@@ -164,6 +174,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     }
 
    eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
+    container.buildSchema(SelectionVectorMode.NONE);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index c522870..a0b8d3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -89,17 +89,45 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      stats.stopProcessing();
+      try {
+        incoming.buildSchema();
+      } finally {
+        stats.startProcessing();
+      }
+      if (!createAggregator()) {
+        done = true;
+        return IterOutcome.STOP;
+      }
+      return IterOutcome.OK_NEW_SCHEMA;
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  @Override
   public IterOutcome innerNext() {
     if (done) {
       return IterOutcome.NONE;
     }
    // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null) {
+    if (aggregator == null || first) {
+      first = false;
+      if (aggregator != null) {
+        aggregator.cleanup();
+      }
       IterOutcome outcome = next(incoming);
+      if (outcome == IterOutcome.OK) {
+        outcome = IterOutcome.OK_NEW_SCHEMA;
+      }
       logger.debug("Next outcome of {}", outcome);
       switch (outcome) {
       case NONE:
-        throw new UnsupportedOperationException("Received NONE on first batch");
+//        throw new UnsupportedOperationException("Received NONE on first batch");
+        return outcome;
       case NOT_YET:
       case STOP:
         return outcome;
@@ -110,7 +138,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         }
         break;
       case OK:
-        throw new IllegalStateException("You should never get a first batch without a new schema");
+        break;
       default:
        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
       }
@@ -123,11 +151,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   if (aggregator.buildComplete() && ! aggregator.allFlushed()) {
     // aggregation is complete and not all records have been output yet
     IterOutcome outcome = aggregator.outputCurrentBatch();
-    if (outcome == IterOutcome.NONE && first) {
-      first = false;
-      done = true;
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
     return outcome;
   }
 
@@ -144,11 +167,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         // fall through
       case RETURN_OUTCOME:
         IterOutcome outcome = aggregator.getOutcome();
-        if (outcome == IterOutcome.NONE && first) {
-          first = false;
-          done = true;
-          return IterOutcome.OK_NEW_SCHEMA;
-        }
         return aggregator.getOutcome();
       case UPDATE_AGGREGATOR:
         aggregator = null;

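Because the schema batch has already gone out by the time data arrives, the first real upstream batch may now be a plain OK; both aggregate operators therefore promote that first OK to OK_NEW_SCHEMA before dispatching on it. A small sketch of the promotion, with a stand-in outcome enum:

    enum Outcome { NONE, NOT_YET, OK, OK_NEW_SCHEMA, STOP }

    final class FirstBatchPromotion {
      private boolean first = true;

      // The first OK is treated as OK_NEW_SCHEMA, matching the hunk above.
      Outcome promote(Outcome upstream) {
        if (first && upstream == Outcome.OK) {
          upstream = Outcome.OK_NEW_SCHEMA;
        }
        first = false;
        return upstream;
      }
    }
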
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 4d3925e..17aaae8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -77,13 +77,33 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      stats.stopProcessing();
+      try {
+        incoming.buildSchema();
+      } finally {
+        stats.startProcessing();
+      }
+      if (!createAggregator()) {
+        done = true;
+        return IterOutcome.STOP;
+      }
+      return IterOutcome.OK_NEW_SCHEMA;
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+  @Override
   public IterOutcome innerNext() {
     if (done) {
       container.zeroVectors();
       return IterOutcome.NONE;
     }
      // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null) {
+    if (aggregator == null || first) {
+      first = false;
       IterOutcome outcome = next(incoming);
       logger.debug("Next outcome of {}", outcome);
       switch (outcome) {
@@ -98,7 +118,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         }
         break;
       case OK:
-        throw new IllegalStateException("You should never get a first batch without a new schema");
+        break;
       default:
        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index c2a5715..556b260 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -55,7 +55,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     this.schema = incoming.getSchema();
     this.outgoing = outgoing;
     setupInterior(incoming, outgoing);
-    this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
   }
 
 
@@ -92,6 +91,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
       // if we're in the first state, allocate outgoing.
       if (first) {
+        this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
         allocateOutgoing();
       }
 

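The currentIndex initialization moves out of setup(...) because setup can now run against an empty, schema-only first batch; the index is computed lazily on the first doWork() pass, once a real record count exists. A sketch of that deferral (the names here are illustrative, not Drill's):

    final class LazyIndex {
      private Integer currentIndex;  // null until the first data pass

      // Deferred form of: currentIndex = recordCount == 0 ? 0 : vectorIndex
      int current(int recordCount, int vectorIndex) {
        if (currentIndex == null) {
          currentIndex = recordCount == 0 ? 0 : vectorIndex;
        }
        return currentIndex;
      }
    }
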
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 3c8e551..4e7d222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import java.util.List;
 
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -84,6 +85,34 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   }
 
   @Override
+  public void buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      stats.stopProcessing();
+      try {
+        incoming.buildSchema();
+      } finally {
+        stats.startProcessing();
+      }
+
+      FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(),
+              handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema());
+
+      stats.startWait();
+      for (int i = 0; i < tunnels.length; i++) {
+        try {
+          tunnels[i].sendRecordBatch(this.statusHandler, batch);
+        } finally {
+          stats.stopWait();
+        }
+        statusHandler.sendCount.increment();
+      }
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  @Override
   public boolean innerNext() {
     if(!ok) {
       context.fail(statusHandler.ex);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 6028a04..e8ccd62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -46,6 +46,8 @@ public interface HashTable {
                     RecordBatch incomingBuild, RecordBatch incomingProbe,
                     RecordBatch outgoing, VectorContainer htContainerOrig);
 
+  public void updateBatches();
+
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
 
   public int containsKey(int incomingRowIdx, boolean isProbe);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 6024523..5b56f8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -144,7 +144,7 @@ public abstract class HashTableTemplate implements HashTable {
       hashValues.getMutator().setValueCount(size);
     }
 
-    private void setup() {
+    protected void setup() {
       setupInterior(incomingBuild, incomingProbe, outgoing, htContainer);
     }
 
@@ -433,6 +433,13 @@ public abstract class HashTableTemplate implements HashTable {
     currentIdxHolder = new IndexPointer();
   }
 
+  public void updateBatches() {
+    doSetup(incomingBuild, incomingProbe);
+    for (BatchHolder batchHolder : batchHolders) {
+      batchHolder.setup();
+    }
+  }
+
   public int numBuckets() {
     return startIndices.getAccessor().getValueCount();
   }

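updateBatches() above lets the hash table absorb an OK_NEW_SCHEMA whose schema is in fact unchanged: the generated doSetup code and every BatchHolder are re-pointed at the new incoming batches instead of the operator failing. A sketch of the re-binding idea, with stand-in types:

    import java.util.ArrayList;
    import java.util.List;

    final class Rebindable {
      interface Holder { void setup(Object build, Object probe); }

      private final List<Holder> batchHolders = new ArrayList<>();
      private Object incomingBuild, incomingProbe;

      // Same schema, new vectors: re-bind every holder rather than rebuild.
      void updateBatches(Object newBuild, Object newProbe) {
        incomingBuild = newBuild;
        incomingProbe = newProbe;
        for (Holder h : batchHolders) {
          h.setup(incomingBuild, incomingProbe);
        }
      }
    }
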
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 85f664c..7d68e07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -77,6 +77,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
   @Override
   protected IterOutcome doWork() {
+    container.zeroVectors();
     int recordCount = incoming.getRecordCount();
     filter.filterBatch(recordCount);
 //    for (VectorWrapper<?> v : container) {
@@ -100,15 +101,16 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   }
 
   @Override
-  protected void setupNewSchema() throws SchemaChangeException {
-    container.clear();
+  protected boolean setupNewSchema() throws SchemaChangeException {
     if (sv2 != null) {
       sv2.clear();
     }
 
     switch (incoming.getSchema().getSelectionVectorMode()) {
       case NONE:
-        sv2 = new SelectionVector2(oContext.getAllocator());
+        if (sv2 == null) {
+          sv2 = new SelectionVector2(oContext.getAllocator());
+        }
         this.filter = generateSV2Filterer();
         break;
       case TWO_BYTE:
@@ -135,6 +137,11 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
         throw new UnsupportedOperationException();
     }
 
+    if (container.isSchemaChanged()) {
+      container.buildSchema(SelectionVectorMode.TWO_BYTE);
+      return true;
+    }
+    return false;
   }
 
   protected Filterer generateSV4Filterer() throws SchemaChangeException {
@@ -190,12 +197,10 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     cg.addExpr(new ReturnValueExpression(expr));
 
     for (VectorWrapper<?> v : incoming) {
-      TransferPair pair = v.getValueVector().getTransferPair();
-      container.add(pair.getTo());
+      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
       transfers.add(pair);
     }
 
-    container.buildSchema(SelectionVectorMode.TWO_BYTE);
 
     try {
      TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);

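The filter's schema setup now leans on VectorContainer.addOrGet(field): rather than clearing and repopulating the container on every setup, each pass reuses the existing output vector for a field, and a schema change is reported only when the container actually changed. A sketch of that idempotent idiom, with a map of named slots standing in for the container:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    final class OutputContainer<V> {
      private final Map<String, V> vectors = new LinkedHashMap<>();
      private boolean schemaChanged;

      // Lookup-or-create: a second setup with the same field returns the
      // same vector and leaves the schema-changed flag untouched.
      V addOrGet(String field, Supplier<V> create) {
        return vectors.computeIfAbsent(field, f -> {
          schemaChanged = true;
          return create.get();
        });
      }

      // Rough analogue of container.isSchemaChanged() in the hunk above;
      // the reset-on-read behavior is an assumption of this sketch.
      boolean isSchemaChanged() {
        boolean changed = schemaChanged;
        schemaChanged = false;
        return changed;
      }
    }
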
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2a08c05..238c992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -33,7 +33,6 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -46,12 +45,13 @@ import org.apache.drill.exec.physical.impl.common.IndexPointer;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
 import org.eigenbase.rel.JoinRelType;
 
 import com.sun.codemodel.JExpr;
@@ -167,6 +167,35 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
 
+
+  @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    leftUpstream = left.buildSchema();
+    right.buildSchema();
+    // Initialize the hash join helper context
+    hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+    try {
+      rightSchema = right.getSchema();
+      VectorContainer c = new VectorContainer(oContext);
+      for (MaterializedField field : rightSchema) {
+        c.addOrGet(field);
+      }
+      c.buildSchema(SelectionVectorMode.NONE);
+      c.setRecordCount(0);
+      hyperContainer = new ExpandableHyperContainer(c);
+      hjHelper.addNewBatch(0);
+      buildBatchIndex++;
+      setupHashTable();
+      hashJoinProbe = setupHashJoinProbe();
+      // Build the container schema and set the counts
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      container.setRecordCount(outputRecords);
+    } catch (IOException | ClassTransformationException e) {
+      throw new SchemaChangeException(e);
+    }
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
     @Override
     public IterOutcome innerNext() {
         if (done) {
@@ -176,27 +205,15 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
            /* If we are here for the first time, execute the build phase of the
             * hash join and setup the run time generated class for the probe side
              */
-            if (hashJoinProbe == null) {
-
-                // Initialize the hash join helper context
-                hjHelper = new HashJoinHelper(context, oContext.getAllocator());
-
-                /* Build phase requires setting up the hash table. Hash table will
-                 * materialize both the build and probe side expressions while
-                 * creating the hash table. So we need to invoke next() on our probe batch
-                 * as well, for the materialization to be successful. This batch will not be used
-                 * till we complete the build phase.
-                 */
-                leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
-
+            if (first) {
+                first = false;
                 // Build the hash table, using the build side record batches.
                 executeBuildPhase();
+//                IterOutcome next = next(HashJoinHelper.LEFT_INPUT, left);
+                hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 0, this, hashTable, hjHelper, joinType);
 
                 // Update the hash table related stats for the operator
                 updateStats(this.hashTable);
-
-                // Create the run time generated code needed to probe and project
-                hashJoinProbe = setupHashJoinProbe();
             }
 
             // Store the number of records projected
@@ -216,21 +233,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                 if (outputRecords > 0 || first) {
                   first = false;
 
-                  // Build the container schema and set the counts
-                  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-                  container.setRecordCount(outputRecords);
 
                   for (VectorWrapper<?> v : container) {
                    v.getValueVector().getMutator().setValueCount(outputRecords);
                   }
 
-                  // First output batch, return OK_NEW_SCHEMA
-                  if (firstOutputBatch == true) {
-                    firstOutputBatch = false;
-                    return IterOutcome.OK_NEW_SCHEMA;
-                  }
-
-                  // Not the first output batch
                   return IterOutcome.OK;
                 }
             } else {
@@ -302,6 +309,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
         //Setup the underlying hash table
         IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
+      if (hashTable == null) {
+        rightUpstream = IterOutcome.OK_NEW_SCHEMA;
+      }
 
         boolean moreData = true;
 
@@ -324,7 +334,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                         }
                         setupHashTable();
                     } else {
+                      if (!rightSchema.equals(right.getSchema())) {
                        throw new SchemaChangeException("Hash join does not support schema changes");
+                      }
+                      hashTable.updateBatches();
                     }
                 // Fall through
                 case OK:
@@ -388,10 +401,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         JExpression outIndex = JExpr.direct("outIndex");
         g.rotateBlock();
 
-        if (hyperContainer != null) {
-            for(VectorWrapper<?> vv : hyperContainer) {
+        if (rightSchema != null) {
+            for(MaterializedField field : rightSchema) {
 
-                MajorType inputType = vv.getField().getType();
+                MajorType inputType = field.getType();
                 MajorType outputType;
                if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
@@ -400,10 +413,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                 }
 
                 // Add the vector to our output container
-                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
-                container.add(v);
+//                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
+                container.addOrGet(MaterializedField.create(field.getPath(), outputType));
 
-                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
                 g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
                   .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
@@ -435,8 +448,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                   outputType = inputType;
                 }
 
-                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
-                container.add(v);
+                container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
 
                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
@@ -453,7 +465,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
         HashJoinProbe hj = context.getImplementationClass(cg);
 
-        hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
         return hj;
     }
 

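HashJoinBatch.buildSchema() above is the most involved implementation: it drives buildSchema() on both children, materializes an empty build-side container (one vector per right-hand field, record count zero), and uses it to set up the hash table and the generated probe code, so the output schema is announced before a single row is read. A compact sketch of deriving an output schema from the input schemas alone, with simplified types:

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class JoinSchemaOnly {
      // Combines the two input schemas with zero rows of data, mirroring
      // the zero-record container above; the real code also widens field
      // modes to OPTIONAL for the nullable side of an outer join.
      static Map<String, Class<?>> outputSchema(Map<String, Class<?>> build,
                                                Map<String, Class<?>> probe) {
        Map<String, Class<?>> out = new LinkedHashMap<>(build);
        out.putAll(probe);
        return out;
      }
    }
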
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 133289e..c58f9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorContainer;
@@ -37,6 +38,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   // Probe side record batch
   private RecordBatch probeBatch;
 
+  private BatchSchema probeSchema;
+
+  private VectorContainer buildBatch;
+
   // Join type, INNER, LEFT, RIGHT or OUTER
   private JoinRelType joinType;
 
@@ -81,6 +86,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
                                 HashJoinHelper hjHelper, JoinRelType joinRelType) {
 
     this.probeBatch = probeBatch;
+    this.probeSchema = probeBatch.getSchema();
+    this.buildBatch = buildBatch;
     this.joinType = joinRelType;
     this.recordsToProcess = probeRecordCount;
     this.hashTable = hashTable;
@@ -135,7 +142,12 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
             continue;
 
           case OK_NEW_SCHEMA:
-            throw new SchemaChangeException("Hash join does not support schema changes");
+            if (probeBatch.getSchema().equals(probeSchema)) {
+              doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, 
outgoingJoinBatch);
+              hashTable.updateBatches();
+            } else {
+              throw new SchemaChangeException("Hash join does not support schema changes");
+            }
           case OK:
             recordsToProcess = probeBatch.getRecordCount();
             recordsProcessed = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 39bdb94..3bc8daa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -219,6 +219,14 @@ public final class JoinStatus {
     return leftPosition + 1 < left.getRecordCount();
   }
 
+  public IterOutcome getLastRight() {
+    return lastRight;
+  }
+
+  public IterOutcome getLastLeft() {
+    return lastLeft;
+  }
+
   /**
   * Check if the next left record position can advance by one in the current batch.
    */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1d4e353..518971d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.JoinRelType;
@@ -136,6 +137,19 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    left.buildSchema();
+    right.buildSchema();
+    try {
+      allocateBatch(true);
+      worker = generateNewWorker();
+    } catch (IOException | ClassTransformationException e) {
+      throw new SchemaChangeException(e);
+    }
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public IterOutcome innerNext() {
     if (done) {
       return IterOutcome.NONE;
@@ -148,9 +162,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
       JoinOutcome outcome = status.getOutcome();
      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
-      if (outcome == JoinOutcome.BATCH_RETURNED ||
-          outcome == JoinOutcome.SCHEMA_CHANGED) {
-        allocateBatch();
+      if (outcome == JoinOutcome.SCHEMA_CHANGED) {
+        allocateBatch(true);
+      } else if (outcome == JoinOutcome.BATCH_RETURNED) {
+        allocateBatch(false);
       }
 
      // reset the output position to zero after our parent iterates this RecordBatch
@@ -388,7 +403,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     cg.setMappingSet(copyRightMappping);
 
     int rightVectorBase = vectorId;
-    if (worker == null || status.isRightPositionAllowed()) {
+    if (status.getLastRight() != IterOutcome.NONE && (worker == null || status.isRightPositionAllowed())) {
       for (VectorWrapper<?> vw : right) {
         MajorType inputType = vw.getField().getType();
         MajorType outputType;
@@ -418,18 +433,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     return w;
   }
 
-  private void allocateBatch() {
+  private void allocateBatch(boolean newSchema) {
     // allocate new batch space.
-    container.clear();
+    container.zeroVectors();
 
    //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
    int leftCount = worker == null ? left.getRecordCount() : (status.isLeftPositionAllowed() ? left.getRecordCount() : 0);
    int rightCount = worker == null ? left.getRecordCount() : (status.isRightPositionAllowed() ? right.getRecordCount() : 0);
    int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
 
+    if (newSchema) {
     // add fields from both batches
-    if (worker == null || leftCount > 0) {
-
       for (VectorWrapper<?> w : left) {
         MajorType inputType = w.getField().getType();
         MajorType outputType;
@@ -438,13 +452,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         } else {
           outputType = inputType;
         }
-        ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
-        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, left.getRecordCount()))).alloc(joinBatchSize);
-        container.add(outgoingVector);
+        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+        container.addOrGet(newField);
       }
-    }
 
-    if (worker == null || rightCount > 0) {
       for (VectorWrapper<?> w : right) {
         MajorType inputType = w.getField().getType();
         MajorType outputType;
@@ -453,12 +464,15 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         } else {
           outputType = inputType;
         }
-        ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
-        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, right.getRecordCount()))).alloc(joinBatchSize);
-        container.add(outgoingVector);
+        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+        container.addOrGet(newField);
       }
     }
 
+    for (VectorWrapper w : container) {
+      AllocationHelper.allocate(w.getValueVector(), 5000, 50);
+    }
+
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     logger.debug("Built joined schema: {}", container.getSchema());
   }
@@ -467,58 +481,60 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar incomingLeftRecordBatch, JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
 
     cg.setMappingSet(compareMapping);
+    if (status.getLastRight() != IterOutcome.NONE) {
 
-    for (JoinCondition condition : conditions) {
-      final LogicalExpression leftFieldExpr = condition.getLeft();
-      final LogicalExpression rightFieldExpr = condition.getRight();
+      for (JoinCondition condition : conditions) {
+        final LogicalExpression leftFieldExpr = condition.getLeft();
+        final LogicalExpression rightFieldExpr = condition.getRight();

-      // materialize value vector readers from join expression
-      LogicalExpression materializedLeftExpr;
-      if (worker == null || status.isLeftPositionAllowed()) {
-        materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
-      } else {
-        materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
-      }
-      if (collector.hasErrors()) {
-        throw new ClassTransformationException(String.format(
-            "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
-      }
-
-      LogicalExpression materializedRightExpr;
-      if (worker == null || status.isRightPositionAllowed()) {
-        materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
-      } else {
-        materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
-      }
-      if (collector.hasErrors()) {
-        throw new ClassTransformationException(String.format(
-            "Failure while trying to materialize incoming right field.  Errors:\n %s.", collector.toErrorString()));
-      }
-
-      // generate compare()
-      ////////////////////////
-      cg.setMappingSet(compareMapping);
-      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
-      ClassGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+        // materialize value vector readers from join expression
+        LogicalExpression materializedLeftExpr;
+        if (worker == null || status.isLeftPositionAllowed()) {
+          materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
+        } else {
+          materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
+        }
+        if (collector.hasErrors()) {
+          throw new ClassTransformationException(String.format(
+              "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
+        }

-      cg.setMappingSet(compareRightMapping);
-      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
-      ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+        LogicalExpression materializedRightExpr;
+        if (worker == null || status.isRightPositionAllowed()) {
+          materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
+        } else {
+          materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
+        }
+        if (collector.hasErrors()) {
+          throw new ClassTransformationException(String.format(
+              "Failure while trying to materialize incoming right field.  Errors:\n %s.", collector.toErrorString()));
+        }

-      LogicalExpression fh = FunctionGenerationHelper.getComparator(compareLeftExprHolder,
-        compareRightExprHolder,
-        context.getFunctionRegistry());
-      HoldingContainer out = cg.addExpr(fh, false);
-
-      // If not 0, it means not equal. We return this out value.
-      // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal.
-      if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
-        JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
-                                    cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
-        jc._then()._return(JExpr.lit(1));
-        jc._elseif(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
-      } else {
-        cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
+        // generate compare()
+        ////////////////////////
+        cg.setMappingSet(compareMapping);
+        cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+        ClassGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+
+        cg.setMappingSet(compareRightMapping);
+        cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
+        ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+
+        LogicalExpression fh = FunctionGenerationHelper.getComparator(compareLeftExprHolder,
+          compareRightExprHolder,
+          context.getFunctionRegistry());
+        HoldingContainer out = cg.addExpr(fh, false);
+
+        // If not 0, it means not equal. We return this out value.
+        // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal.
+        if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
+          JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
+                                      cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
+          jc._then()._return(JExpr.lit(1));
+          jc._elseif(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
+        } else {
+          cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
+        }
       }
     }
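
The net effect of the MergeJoinBatch changes: buildSchema() drives both children's buildSchema(), defines the output vectors, and compiles the join worker before any rows arrive, so the operator can answer OK_NEW_SCHEMA immediately; and allocateBatch() switches from create-and-add to container.addOrGet(), which returns the existing vector when the field is already present, so data batches reuse vectors (zeroVectors()) instead of redefining the schema (clear()). A condensed sketch of the allocation pattern, using only the Drill calls that appear in the diff; the join-type-driven nullability of outputType is elided here, and 5000/50 are the sizing constants hard-coded above:

    // Sketch: define-or-reuse output vectors, then size them in one pass.
    for (VectorWrapper<?> w : left) {
      // addOrGet: creates the vector on the first (schema-building) call,
      // returns the same vector on every later call.
      container.addOrGet(MaterializedField.create(w.getField().getPath(), w.getField().getType()));
    }
    for (VectorWrapper<?> w : container) {
      // ~5000 values, ~50 bytes per variable-width value
      AllocationHelper.allocate(w.getValueVector(), 5000, 50);
    }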
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 8ffd7be..02e1a92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -42,7 +42,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private int recordsLeft;
   private boolean noEndLimit;
   private boolean skipBatch;
-  private boolean first = true;
   private boolean done = false;
   List<TransferPair> transfers = Lists.newArrayList();
 
@@ -58,14 +57,13 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  protected void setupNewSchema() throws SchemaChangeException {
-    container.clear();
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    container.zeroVectors();
     transfers.clear();
 
 
     for(VectorWrapper<?> v : incoming){
-      TransferPair pair = v.getValueVector().getTransferPair();
-      container.add(pair.getTo());
+      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
       transfers.add(pair);
     }
 
@@ -81,8 +79,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
         throw new UnsupportedOperationException();
     }
 
-    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+    if (container.isSchemaChanged()) {
+      container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+      return true;
+    }
 
+    return false;
   }
 
   @Override
@@ -92,10 +94,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     }
 
     if(!noEndLimit && recordsLeft <= 0) {
-      if (first) {
-        return produceEmptyFirstBatch();
-      }
-
       incoming.kill(true);
 
       IterOutcome upStream = incoming.next();
@@ -109,11 +107,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
         upStream = incoming.next();
       }
 
-      first = false;
       return IterOutcome.NONE;
     }
-
-    first = false;
     return super.innerNext();
   }
 
@@ -144,23 +139,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     return IterOutcome.OK;
   }
 
-  private IterOutcome produceEmptyFirstBatch() {
-    incoming.next();
-    first = false;
-    done = true;
-    // Build the container schema and set the count
-    for (VectorWrapper<?> v : incoming) {
-      TransferPair pair = v.getValueVector().getTransferPair();
-      container.add(pair.getTo());
-      transfers.add(pair);
-    }
-    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
-    container.setRecordCount(0);
-
-    incoming.kill(true);
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
   private void limitWithNoSV(int recordCount) {
     int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
     recordsToSkip -= offset;
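
setupNewSchema() now reports whether the output schema actually changed: vectors are bound with makeTransferPair() against addOrGet()-managed outgoing vectors, and the schema is rebuilt only when the container says so. This appears to be what lets the deleted produceEmptyFirstBatch() path go away, since the schema has already been announced before any rows are limited. A condensed sketch of the new method shape, assuming the Drill types used in the hunk:

    // Sketch: bind transfers once; announce a schema only when it changed.
    protected boolean setupNewSchema() throws SchemaChangeException {
      container.zeroVectors();
      transfers.clear();
      for (VectorWrapper<?> v : incoming) {
        transfers.add(v.getValueVector().makeTransferPair(container.addOrGet(v.getField())));
      }
      if (container.isSchemaChanged()) {
        container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
        return true;   // parent reports OK_NEW_SCHEMA
      }
      return false;    // parent reports plain OK
    }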

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ed49cf1..8da8f96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -50,10 +50,12 @@ import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
 import org.apache.drill.exec.record.RecordBatch;
@@ -125,7 +127,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     //super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
-    this.outgoingContainer = new VectorContainer();
+    this.outgoingContainer = new VectorContainer(oContext);
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
   }
@@ -212,23 +214,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // allocate the incoming record batch loaders
       senderCount = rawBatches.size();
-      if (senderCount == 0) {
-        if (firstBatch) {
-          RecordBatchLoader loader = new RecordBatchLoader(oContext.getAllocator());
-          try {
-            loader.load(emptyBatch.getHeader().getDef(), emptyBatch.getBody());
-          } catch (SchemaChangeException e) {
-            throw new RuntimeException(e);
-          }
-          for (VectorWrapper w : loader) {
-            outgoingContainer.add(w.getValueVector());
-          }
-          outgoingContainer.buildSchema(SelectionVectorMode.NONE);
-          done = true;
-          return IterOutcome.OK_NEW_SCHEMA;
-        }
-        return IterOutcome.NONE;
-      }
       incomingBatches = new RawFragmentBatch[senderCount];
       batchOffsets = new int[senderCount];
       batchLoaders = new RecordBatchLoader[senderCount];
@@ -274,9 +259,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         bldr.addField(v.getField());
 
         // allocate a new value vector
-        ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), oContext.getAllocator());
+        ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
         outgoingVector.allocateNew();
-        outgoingContainer.add(outgoingVector);
         ++vectorCount;
       }
 
@@ -446,6 +430,24 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      RawFragmentBatch batch = getNext(fragProviders[0]);
+      for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
+        outgoingContainer.addOrGet(MaterializedField.create(field));
+      }
+    } catch (IOException e) {
+      throw new SchemaChangeException(e);
+    } finally {
+      stats.stopProcessing();
+    }
+    outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
+    outgoingContainer.buildSchema(SelectionVectorMode.NONE);
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public int getRecordCount() {
     return outgoingPosition;
   }
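
buildSchema() here recovers the schema from the first raw batch's wire-level definition (its SerializedField list) without loading or merging any row data, then canonicalizes the container, presumably so the vector order is identical regardless of which sender's batch happened to arrive first. A sketch of the header-only recovery, using the types imported above:

    // Sketch: schema from the batch header alone; no value buffers are touched.
    RawFragmentBatch first = getNext(fragProviders[0]);
    for (SerializedField field : first.getHeader().getDef().getFieldList()) {
      outgoingContainer.addOrGet(MaterializedField.create(field));
    }
    outgoingContainer = VectorContainer.canonicalize(outgoingContainer); // stable field order
    outgoingContainer.buildSchema(SelectionVectorMode.NONE);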

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 2c3e85a..9e3cfe5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -67,6 +67,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private final AtomicIntegerArray remainingReceivers;
   private final AtomicInteger remaingReceiverCount;
   private volatile boolean done = false;
+  private boolean first = true;
 
   long minReceiverRecordCount = Long.MAX_VALUE;
   long maxReceiverRecordCount = Long.MIN_VALUE;
@@ -110,6 +111,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
   }
 
   @Override
+  public void buildSchema() throws SchemaChangeException {
+    incoming.buildSchema();
+    stats.startProcessing();
+    try {
+      createPartitioner();
+      try {
+        partitioner.flushOutgoingBatches(false, true);
+      } catch (IOException e) {
+        throw new SchemaChangeException(e);
+      }
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  @Override
   public boolean innerNext() {
     boolean newSchema = false;
 
@@ -128,6 +145,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
 
     logger.debug("Partitioner.next(): got next record batch with status {}", 
out);
+    if (first && out == IterOutcome.OK) {
+      first = false;
+      out = IterOutcome.OK_NEW_SCHEMA;
+    }
     switch(out){
       case NONE:
         try {
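
Two cooperating pieces in this file: buildSchema() creates the partitioner and flushes empty outgoing batches so every receiver learns the schema up front, and innerNext() then promotes the first OK it sees to OK_NEW_SCHEMA so the operator's normal new-schema handling still runs exactly once. A sketch of the promotion, with IterOutcome as in the diff:

    // Sketch: innerNext()'s upgrade of the first data batch.
    IterOutcome out = next(incoming);
    if (first && out == IterOutcome.OK) {
      first = false;
      out = IterOutcome.OK_NEW_SCHEMA;   // run the new-schema setup path once, then never again
    }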

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 338a704..5224f75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -212,6 +212,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     private final int oppositeMinorFragmentId;
 
     private boolean isLast = false;
+    private boolean isFirst = true;
     private volatile boolean terminated = false;
     private boolean dropAll = false;
     private BatchSchema outSchema;
@@ -289,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner {
         this.sendCount.increment();
       } else {
         logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
-        if (isLast || terminated) {
+        if (isFirst || isLast || terminated) {
           // send final (empty) batch
-          FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
+          FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast || terminated,
                   handle.getQueryId(),
                   handle.getMajorFragmentId(),
                   handle.getMinorFragmentId(),
@@ -306,7 +307,12 @@ public abstract class PartitionerTemplate implements Partitioner {
           }
           this.sendCount.increment();
           vectorContainer.zeroVectors();
-          dropAll = true;
+          if (!isFirst) {
+            dropAll = true;
+          }
+          if (isFirst) {
+            isFirst = !isFirst;
+          }
           return;
         }
       }
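
The flush logic above now distinguishes three empty-batch cases, and only the last two mark the stream finished; the trailing `if (isFirst) { isFirst = !isFirst; }` is just `isFirst = false` written the long way around. The two decisions, restated as self-contained predicates (method names are mine, not Drill's):

    // Sketch: when to send an empty flush, and when to flag it as final.
    static boolean sendEmptyFlush(boolean isFirst, boolean isLast, boolean terminated) {
      return isFirst || isLast || terminated;  // first flush = schema-only announcement
    }
    static boolean markAsLast(boolean isLast, boolean terminated) {
      return isLast || terminated;             // a schema-only first batch is never final
    }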

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 7f3a966..fd2878f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -56,6 +57,30 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      stats.stopProcessing();
+      try {
+        incoming.buildSchema();
+      } finally {
+        stats.startProcessing();
+      }
+      stats.startSetup();
+      try {
+        for (VectorWrapper w : incoming) {
+          container.addOrGet(w.getField());
+        }
+      } finally {
+        stats.stopSetup();
+      }
+    } finally {
+      stats.stopProcessing();
+    }
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public IterOutcome innerNext() {
     if (!running) {
       producer.start();
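
The nested start/stop calls in the buildSchema() above are a timer hand-off, not a bug: this operator's processing clock is paused while the child builds its schema so the child's time is not billed twice, then resumed for this operator's own setup. The discipline in isolation, assuming Drill's OperatorStats API as used above:

    // Sketch: pause our clock around the child's work.
    stats.startProcessing();
    try {
      stats.stopProcessing();        // hand the clock to the child
      try {
        incoming.buildSchema();
      } finally {
        stats.startProcessing();     // take it back for our own setup
      }
      // ... this operator's setup runs on its own clock ...
    } finally {
      stats.stopProcessing();
    }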

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 224753e..486fb12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -62,6 +62,7 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -79,6 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
+  private boolean buildingSchema = true;
 
   private static final String EMPTY_STRING = "";
 
@@ -137,6 +139,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 //    VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
 
+    container.zeroVectors();
+
     if (!doAlloc()) {
       outOfMemory = true;
       return IterOutcome.OUT_OF_MEMORY;
@@ -261,9 +265,25 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   @Override
-  protected void setupNewSchema() throws SchemaChangeException {
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    incoming.buildSchema();
+    setupNewSchema();
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    if (allocationVectors != null) {
+      for (ValueVector v : allocationVectors) {
+        v.clear();
+      }
+    }
     this.allocationVectors = Lists.newArrayList();
-    container.clear();
+    if (complexWriters != null) {
+      container.clear();
+    } else {
+      container.zeroVectors();
+    }
     final List<NamedExpression> exprs = getExpressionList();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
@@ -300,9 +320,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
                 continue;
               }
               FieldReference ref = new FieldReference(name);
-              TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
+              ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+              TransferPair tp = vvIn.makeTransferPair(vvOut);
               transfers.add(tp);
-              container.add(tp.getTo());
             }
          } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
             int k = 0;
@@ -323,9 +343,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
              }

              MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
-              ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+              ValueVector vv = container.addOrGet(outputField);
              allocationVectors.add(vv);
-              TypedFieldId fid = container.add(vv);
+              TypedFieldId fid = container.getValueVectorId(outputField.getPath());
              ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
               HoldingContainer hc = cg.addExpr(write);
 
@@ -363,9 +383,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
        ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
        Preconditions.checkNotNull(incoming);

-        TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+        FieldReference ref = getRef(namedExpression);
+        ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+        TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
-        container.add(tp.getTo());
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
         logger.debug("Added transfer for project expression.");
       } else if (expr instanceof DrillFuncHolderExpr &&
@@ -379,11 +400,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
        // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
        ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
        cg.addExpr(expr);
+        if (buildingSchema) {
+          buildingSchema = false;
+          MaterializedField f = MaterializedField.create(outputField.getPath().getAsUnescapedPath(), Types.required(MinorType.MAP));
+          container.addOrGet(f);
+        }
       } else{
         // need to do evaluation.
-        ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+        ValueVector vector = container.addOrGet(outputField);
        allocationVectors.add(vector);
-        TypedFieldId fid = container.add(vector);
+        TypedFieldId fid = container.getValueVectorId(outputField.getPath());
        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
         HoldingContainer hc = cg.addExpr(write);
 
@@ -395,7 +421,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     cg.rotateBlock();
     cg.getEvalBlock()._return(JExpr.TRUE);
 
-    container.buildSchema(SelectionVectorMode.NONE);
 
     try {
       this.projector = context.getImplementationClass(cg.getCodeGenerator());
@@ -403,6 +428,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
    } catch (ClassTransformationException | IOException e) {
      throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
+    if (container.isSchemaChanged()) {
+      container.buildSchema(SelectionVectorMode.NONE);
+      return true;
+    } else {
+      return false;
+    }
   }
 
   private List<NamedExpression> getExpressionList() {
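
ProjectRecordBatch follows the same pattern as the other operators: addOrGet() plus makeTransferPair() so repeated setupNewSchema() calls reuse vectors, with isSchemaChanged() deciding whether OK_NEW_SCHEMA is warranted. The one special case is a ComplexWriter expression, whose output type cannot be known until rows are evaluated; while the schema is being built, a required MAP vector is registered under the output name as a stand-in. Condensed, using the calls from the hunk:

    // Sketch: placeholder vector for a ComplexWriter whose real type
    // cannot be known before any rows have been evaluated.
    if (buildingSchema) {
      buildingSchema = false;
      container.addOrGet(MaterializedField.create(
          outputField.getPath().getAsUnescapedPath(), Types.required(MinorType.MAP)));
    }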
