This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 02306277bfacaece652ba76025a6f82def9570b6
Author: Paul Rogers <par0...@yahoo.com>
AuthorDate: Sun Nov 10 16:17:49 2019 -0800

    DRILL-7445: Create batch copier based on result set framework
    
    The result set framework now provides both a reader and writer.
    This PR provides a copier that copies batches using this
    framework. Such a copier can:
    
    - Copy individually selected records
    - Copy all records, honoring any attached SV2 or SV4
    
    The copier uses the result set loader to create uniformly-sized
    output batches from input batches of any size. It does this
    by merging or splitting input batches as needed.
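
    As a sketch (based on the new unit tests; the 12-row limit and the
    "allocator" and "inputBatch" names are illustrative only), the
    output batch size is bounded through the loader options:

        OptionBuilder options = new OptionBuilder()
            .setRowCountLimit(12);        // cap output batches at 12 rows
        ResultSetCopier copier = new ResultSetCopierImpl(
            allocator, inputBatch, options);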
    
    Since the result set reader handles both SV2 and SV4s, the
    copier can filter or reorder rows based on the SV associated
    with the input batch.
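
    For example (adapted from the new TestResultSetCopier.testSV2() test;
    the "allocator", "container" and "filtered" names come from that
    test), an SV2 that picks every other row, in descending order, is
    attached to the input before copying:

        // Select rows 9, 7, 5, 3, 1 of a 10-row batch.
        SelectionVector2Builder sv2Builder =
            new SelectionVector2Builder(allocator, container.getRecordCount());
        for (int i = 0; i < 5; i++) {
          sv2Builder.setNext(10 - 2 * i - 1);
        }
        container.buildSchema(SelectionVectorMode.TWO_BYTE);
        filtered.addBatch(container);
        filtered.setSelectionVector(sv2Builder.harvest(container));
        // copyAllRows() now copies only the selected rows, in SV order.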
    
    This version assumes a single stream of input batches, and handles
    any schema changes in that input by creating output batches
    that track the input schema. This would be used in, say, the
    selection vector remover. A different design is needed for
    merging, such as in the merging receiver.
    
    Adds a "copy" method to the column writers. Copy is implemented
    by doing a direct memory copy from source to destination vectors.
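
    Per-row copying is driven by pre-bound (writer, reader) pairs, as
    in ResultSetCopierImpl.copyColumns() below:

        rowWriter.start();
        for (CopyPair pair : projection) {
          pair.writer.copy(pair.reader);  // direct vector-to-vector copy
        }
        rowWriter.save();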
    
    A unit test verifies functionality for various use cases
    and data types.
    
    closes #1899
---
 .../java/org/apache/drill/common/types/Types.java  |  32 +-
 .../physical/impl/aggregate/BatchIterator.java     |   2 -
 .../impl/protocol/IndirectContainerAccessor.java   | 100 ++++
 .../impl/protocol/VectorContainerAccessor.java     |  41 +-
 .../exec/physical/resultSet/ResultSetCopier.java   | 189 ++++++
 .../exec/physical/resultSet/ResultSetReader.java   |   7 +
 .../resultSet/impl/ResultSetCopierImpl.java        | 313 ++++++++++
 .../resultSet/impl/ResultSetLoaderImpl.java        |   1 +
 .../resultSet/impl/ResultSetReaderImpl.java        |   3 +
 .../exec/physical/rowSet/RowSetFormatter.java      |  13 +-
 .../exec/record/selection/SelectionVector2.java    |  13 +-
 .../selection/SelectionVector2Builder.java}        |  31 +-
 .../impl/protocol/TestOperatorRecordBatch.java     |   9 +-
 .../resultSet/impl/TestResultSetCopier.java        | 663 +++++++++++++++++++++
 .../resultSet/impl/TestResultSetReader.java        |  13 +-
 .../org/apache/drill/test/BaseDirTestWatcher.java  |  12 +-
 .../java/org/apache/drill/test/ClusterFixture.java |  59 +-
 .../main/codegen/templates/ColumnAccessors.java    |  21 +-
 .../drill/exec/record/MaterializedField.java       |   1 +
 .../drill/exec/vector/accessor/ColumnWriter.java   |   9 +
 .../drill/exec/vector/accessor/ObjectReader.java   |   7 +
 .../drill/exec/vector/accessor/ObjectWriter.java   |  11 +-
 .../accessor/convert/AbstractWriteConverter.java   |   6 +
 .../accessor/reader/AbstractObjectReader.java      |   1 +
 .../accessor/reader/AbstractTupleReader.java       |  16 +-
 .../vector/accessor/reader/ArrayReaderImpl.java    |   1 -
 .../vector/accessor/reader/BaseScalarReader.java   |  12 +
 .../vector/accessor/reader/OffsetVectorReader.java |   1 +
 .../vector/accessor/reader/UnionReaderImpl.java    |  12 +-
 .../accessor/writer/AbstractArrayWriter.java       |  14 +
 .../accessor/writer/AbstractObjectWriter.java      |  11 +-
 .../accessor/writer/AbstractTupleWriter.java       |  22 +-
 .../vector/accessor/writer/BaseVarWidthWriter.java |   8 +-
 .../vector/accessor/writer/BitColumnWriter.java    |   8 +
 .../exec/vector/accessor/writer/MapWriter.java     |   9 +-
 .../accessor/writer/NullableScalarWriter.java      |   9 +
 .../accessor/writer/OffsetVectorWriterImpl.java    |   6 +
 .../vector/accessor/writer/UnionWriterImpl.java    |  13 +-
 .../accessor/writer/dummy/DummyArrayWriter.java    |   4 +
 .../accessor/writer/dummy/DummyScalarWriter.java   |   4 +
 40 files changed, 1570 insertions(+), 137 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 57a752e..5cdae56 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -806,16 +806,32 @@ public class Types {
     return typeBuilder;
   }
 
-  public static boolean isEquivalent(MajorType type1, MajorType type2) {
+  /**
+   * Check if two "core" types are the same, ignoring subtypes and
+   * children. Primarily for non-complex types.
+   *
+   * @param type1 first type
+   * @param type2 second type
+   * @return true if the two types are the same minor type, mode,
+   * precision and scale
+   */
+
+  public static boolean isSameType(MajorType type1, MajorType type2) {
+    return type1.getMinorType() == type2.getMinorType() &&
+           type1.getMode() == type2.getMode() &&
+           type1.getScale() == type2.getScale() &&
+           type1.getPrecision() == type2.getPrecision();
+  }
 
-    // Requires full type equality, including fields such as precision and scale.
-    // But, unset fields are equivalent to 0. Can't use the protobuf-provided
-    // isEquals() which treats set and unset fields as different.
+  /**
+   * Requires full type equality, including fields such as precision and scale.
+   * But, unset fields are equivalent to 0. Can't use the protobuf-provided
+   * isEquals() which treats set and unset fields as different.
+   */
+
+  public static boolean isEquivalent(MajorType type1, MajorType type2) {
 
-    if (type1.getMinorType() != type2.getMinorType() ||
-        type1.getMode() != type2.getMode() ||
-        type1.getScale() != type2.getScale() ||
-        type1.getPrecision() != type2.getPrecision()) {
+    if (!isSameType(type1, type2)) {
       return false;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
index 3c287c8..5b98293 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
@@ -20,7 +20,5 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public interface BatchIterator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchIterator.class);
-
   public IterOutcome next();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/IndirectContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/IndirectContainerAccessor.java
new file mode 100644
index 0000000..45f141d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/IndirectContainerAccessor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Extension of the container accessor that holds an optional selection
+ * vector, reporting the selection vector count as the batch
+ * row count.
+ */
+
+public class IndirectContainerAccessor extends VectorContainerAccessor {
+
+  private SelectionVector2 sv2;
+  private SelectionVector4 sv4;
+
+  /**
+   * Add a record batch, performing schema checks and picking out a
+   * selection vector, if provided.
+   *
+   * @param batch batch of records in record batch format
+   */
+
+  public void addBatch(RecordBatch batch) {
+    addBatch(batch.getContainer());
+    switch (container.getSchema().getSelectionVectorMode()) {
+    case TWO_BYTE:
+      setSelectionVector(batch.getSelectionVector2());
+      break;
+    case FOUR_BYTE:
+      setSelectionVector(batch.getSelectionVector4());
+      break;
+    default:
+      break;
+    }
+  }
+
+  public void setSelectionVector(SelectionVector2 sv2) {
+    Preconditions.checkState(sv4 == null);
+    this.sv2 = sv2;
+  }
+
+  public void setSelectionVector(SelectionVector4 sv4) {
+    Preconditions.checkState(sv2 == null);
+    this.sv4 = sv4;
+  }
+
+  @Override
+  public SelectionVector2 selectionVector2() {
+    return sv2;
+  }
+
+  @Override
+  public SelectionVector4 selectionVector4() {
+    return sv4;
+  }
+
+  @Override
+  public int rowCount() {
+    if (sv2 != null) {
+      return sv2.getCount();
+    } else if (sv4 != null) {
+      return sv4.getCount();
+    } else {
+      return super.rowCount();
+    }
+  }
+
+  @Override
+  public void release() {
+    super.release();
+    if (sv2 != null) {
+      sv2.clear();
+      sv2 = null;
+    }
+    if (sv4 != null) {
+      sv4.clear();
+      sv4 = null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
index bb6670f..998be89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -29,34 +30,16 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-public class VectorContainerAccessor implements BatchAccessor {
-
-  public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
-
-    private SelectionVector2 sv2;
-
-    public void setSelectionVector(SelectionVector2 sv2) {
-      this.sv2 = sv2;
-    }
-
-    @Override
-    public SelectionVector2 selectionVector2() {
-      return sv2;
-    }
-  }
-
-  public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
-
-    private SelectionVector4 sv4;
+/**
+ * Wraps a vector container and optional selection vector in an interface
+ * simpler than the entire {@link RecordBatch}. This implementation hosts
+ * a container only.
+ */
 
-    @Override
-    public SelectionVector4 selectionVector4() {
-      return sv4;
-    }
-  }
+public class VectorContainerAccessor implements BatchAccessor {
 
-  private VectorContainer container;
-  private SchemaTracker schemaTracker = new SchemaTracker();
+  protected VectorContainer container;
+  private final SchemaTracker schemaTracker = new SchemaTracker();
   private int batchCount;
 
   /**
@@ -146,5 +129,9 @@ public class VectorContainerAccessor implements BatchAccessor {
   }
 
   @Override
-  public void release() { container.zeroVectors(); }
+  public void release() {
+    if (container != null) {
+      container.zeroVectors();
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
new file mode 100644
index 0000000..5d68daf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet;
+
+import org.apache.drill.exec.physical.impl.aggregate.BatchIterator;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Copies rows from an input batch to an output batch. The input
+ * batch is assumed to have a selection vector, or the caller
+ * will pick the rows to copy.
+ * <p>
+ * Works to create full output batches to minimize per-batch
+ * overhead and to eliminate unnecessary empty batches if no
+ * rows are copied.
+ * <p>
+ * The output batches are assumed to have the same schema as
+ * input batches. (No projection occurs.) The output schema will
+ * change each time the input schema changes. (For an SV4, the
+ * upstream operator must have ensured that all batches covered
+ * by the SV4 have the same schema.)
+ * <p>
+ * This implementation works with a single stream of batches which,
+ * following Drill's rules, must consist of the same set of vectors on
+ * each non-schema-change batch.
+ *
+ * <h4>Protocol</h4>
+ * Overall lifecycle:
+ * <ol>
+ * <li>Create an instance of the
+ *     {@link org.apache.drill.exec.physical.resultSet.impl.ResultSetCopierImpl
+ *      ResultSetCopierImpl} class, passing the input batch
+ *      accessor to the constructor.</li>
+ * <li>Loop to process each output batch as shown below. That is, continually
+ *     process calls to the {@link BatchIterator#next()} method.</li>
+ * <li>Call {@link #close()}.</li>
+ * </ol>
+ * <p>
+ *
+ * To build each output batch:
+ *
+ * <pre><code>
+ * public IterOutcome next() {
+ *   copier.startOutputBatch();
+ *   while (! copier.isOutputFull()) {
+ *     copier.releaseInputBatch();
+ *     IterOutcome innerResult = inner.next();
+ *     if (innerResult == DONE) { break; }
+ *     copier.startInputBatch();
+ *     copier.copyAllRows();
+ *   }
+ *   if (copier.hasOutputRows()) {
+ *     outputContainer = copier.harvest();
+ *     return outputContainer.isSchemaChanged() ? OK_NEW_SCHEMA : OK;
+ *   } else { return DONE; }
+ * }
+ * </code></pre>
+ * <p>
+ * The above assumes that the upstream operator can be polled
+ * multiple times in the DONE state. The extra polling is
+ * needed to handle any in-flight copies when the input
+ * exhausts its batches.
+ * <p>
+ * The above also shows that the copier handles and reports
+ * schema changes by setting the schema change flag in the
+ * output container. Real code must handle multiple calls to
+ * next() in the DONE state, and work around lack of such support
+ * in its input (perhaps by tracking a state).
+ * <p>
+ * An input batch is processed by copying its rows. Copying can be done
+ * row by row, via a row range, or by copying the entire input batch as
+ * shown in the example.
+ * Copying the entire batch makes sense when the input batch carries a
+ * selection vector that identifies which rows to copy, and in which
+ * order.
+ * <p>
+ * Because we wish to fill the output batch, we may be able to copy
+ * part of a batch, the whole batch, or multiple batches to the output.
+ */
+
+public interface ResultSetCopier {
+
+  /**
+   * Start the next output batch.
+   */
+  void startOutputBatch();
+
+  /**
+   * Start the next input batch. The input batch must be held
+   * by the batch accessor passed into the constructor.
+   */
+  void startInputBatch();
+
+  /**
+   * If copying rows one by one, copy the next row from the
+   * input.
+   *
+   * @return true if more rows remain on the input, false
+   * if all rows are exhausted
+   */
+  boolean copyNextRow();
+
+  /**
+   * Copy a row at the given position. For those cases in
+   * which random copying is needed, but a selection vector
+   * is not available. Note that this version is slow because
+   * of the need to reset indexes for every row. Better to
+   * use a selection vector, then copy sequentially.
+   *
+   * @param inputRowIndex the input row position. If a selection vector
+   * is attached, then this is the selection vector position
+   */
+  void copyRow(int inputRowIndex);
+
+  /**
+   * Copy all (remaining) input rows to the output.
+   * If insufficient space exists in the output, does a partial
+   * copy, and {@link #isCopyPending()} will return true.
+   */
+  void copyAllRows();
+
+  /**
+   * Release the input batch. Must be called before loading
+   * another input batch.
+   */
+  void releaseInputBatch();
+
+  /**
+   * Reports if the output batch has rows. Useful after the end
+   * of input to determine if a partial output batch exists to
+   * send downstream.
+   * @return true if the output batch has one or more rows
+   */
+  boolean hasOutputRows();
+
+  /**
+   * Reports if the output batch is full and must be sent
+   * downstream. The output batch can be full in the middle
+   * of a copy, in which case {@link #isCopyPending()} will
+   * also return true.
+   * <p>
+   * This function also returns true if a schema change
+   * occurred on the latest input row, in which case the
+   * partially-completed batch of the old schema must be
+   * flushed downstream.
+   *
+   * @return true if the output is full and must be harvested
+   * and sent downstream
+   */
+  boolean isOutputFull();
+
+  /**
+   * Reports whether a copy is pending: more rows remain to be
+   * copied. If so, start a new output batch, which will finish
+   * the copy. Do this before starting a new input batch.
+   *
+   * @return true if a copy is in progress, false otherwise
+   */
+  boolean isCopyPending();
+
+  /**
+   * Obtain the output batch. Returned as a vector container
+   * since the output will not have a selection vector.
+   *
+   * @return a vector container holding the output batch
+   */
+  VectorContainer harvest();
+
+  /**
+   * Release resources, including any pending input batch
+   * and any non-harvested output batch.
+   */
+  void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
index ae282fd..45f3193 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
@@ -89,4 +89,11 @@ public interface ResultSetReader {
    * you want to preserve the batch memory.
    */
   void close();
+
+  /**
+   * Convenience method to access the input batch.
+   * @return the batch bound to the reader at construction
+   * time
+   */
+  BatchAccessor inputBatch();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
new file mode 100644
index 0000000..bd7b6a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.ResultSetReader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+public class ResultSetCopierImpl implements ResultSetCopier {
+
+  private enum State {
+    START,
+    NO_SCHEMA,
+    BETWEEN_BATCHES,
+    BATCH_ACTIVE,
+    NEW_SCHEMA,
+    SCHEMA_PENDING,
+    CLOSED
+  }
+
+  private class CopyAll {
+
+    public void copy() {
+      while (!rowWriter.isFull() && rowReader.next()) {
+        copyColumns();
+      }
+    }
+
+    public boolean hasMore() {
+      return rowReader.hasNext();
+    }
+  }
+
+  private static class CopyPair {
+    protected final ColumnWriter writer;
+    protected final ColumnReader reader;
+
+    protected CopyPair(ColumnWriter writer, ColumnReader reader) {
+      this.writer = writer;
+      this.reader = reader;
+    }
+  }
+
+  // Input state
+
+  private int currentSchemaVersion = -1;
+  private final ResultSetReader resultSetReader;
+  protected RowSetReader rowReader;
+
+  // Output state
+
+  private final BufferAllocator allocator;
+  private final OptionBuilder writerOptions;
+  private ResultSetLoader resultSetWriter;
+  private RowSetLoader rowWriter;
+
+  // Copy state
+
+  private State state;
+  private CopyPair[] projection;
+  private CopyAll activeCopy;
+
+  public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch) {
+    this(allocator, inputBatch, new OptionBuilder());
+  }
+
+  public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch,
+      OptionBuilder outputOptions) {
+    this.allocator = allocator;
+    resultSetReader = new ResultSetReaderImpl(inputBatch);
+    writerOptions = outputOptions;
+    writerOptions.setVectorCache(new ResultVectorCacheImpl(allocator));
+    state = State.START;
+  }
+
+  @Override
+  public void startOutputBatch() {
+    if (state == State.START) {
+
+      // No schema yet. Defer real batch start until we see an input
+      // batch.
+
+      state = State.NO_SCHEMA;
+      return;
+    }
+    Preconditions.checkState(state == State.BETWEEN_BATCHES || state == State.SCHEMA_PENDING);
+    if (state == State.SCHEMA_PENDING) {
+
+      // We have a pending new schema. Create new writers to match.
+
+      createProjection();
+    }
+    resultSetWriter.startBatch();
+    state = State.BATCH_ACTIVE;
+    if (isCopyPending()) {
+
+      // Resume copying if a copy is active.
+
+      copyBlock();
+    }
+  }
+
+  @Override
+  public void startInputBatch() {
+    Preconditions.checkState(state == State.NO_SCHEMA || state == State.NEW_SCHEMA ||
+                             state == State.BATCH_ACTIVE,
+        "Can only start input while in an output batch");
+    Preconditions.checkState(!isCopyPending(),
+        "Finish the pending copy before changing input");
+
+    bindInput();
+
+    if (state == State.BATCH_ACTIVE) {
+
+      // If no schema change, we are ready to copy.
+
+      if (currentSchemaVersion == resultSetReader.inputBatch().schemaVersion()) {
+        return;
+      }
+
+      // The schema has changed. Handle it now or later.
+
+      if (hasOutputRows()) {
+
+        // Output batch has rows. Can't switch and bind inputs
+        // until current batch is sent downstream.
+
+        state = State.NEW_SCHEMA;
+        return;
+      }
+    }
+
+    // The schema changed: either the first schema, or a change
+    // while a batch is active but still empty.
+
+    if (state == State.NO_SCHEMA) {
+      state = State.BATCH_ACTIVE;
+    } else {
+
+      // Discard the unused empty batch
+
+      harvest().zeroVectors();
+    }
+    createProjection();
+    resultSetWriter.startBatch();
+
+    // Stay in the current state.
+  }
+
+  protected void bindInput() {
+    resultSetReader.start();
+    rowReader = resultSetReader.reader();
+  }
+
+  @Override
+  public void releaseInputBatch() {
+    Preconditions.checkState(state != State.CLOSED);
+    resultSetReader.release();
+  }
+
+  private void createProjection() {
+    if (resultSetWriter != null) {
+
+      // Need to build a new writer. Close this one. Doing so
+      // will tear down the whole show. But, the vector cache will
+      // ensure that the new writer reuses any matching vectors from
+      // the prior batch to provide vector persistence as Drill expects.
+
+      resultSetWriter.close();
+    }
+    TupleMetadata schema = MetadataUtils.fromFields(resultSetReader.inputBatch().schema());
+    writerOptions.setSchema(schema);
+    resultSetWriter = new ResultSetLoaderImpl(allocator, writerOptions.build());
+    rowWriter = resultSetWriter.writer();
+    currentSchemaVersion = resultSetReader.inputBatch().schemaVersion();
+
+    int colCount = schema.size();
+    projection = new CopyPair[colCount];
+    for (int i = 0; i < colCount; i++) {
+      projection[i] = new CopyPair(
+          rowWriter.column(i).writer(),
+          rowReader.column(i).reader());
+    }
+  }
+
+  @Override
+  public boolean hasOutputRows() {
+    switch (state) {
+    case BATCH_ACTIVE:
+    case NEW_SCHEMA:
+      return resultSetWriter.hasRows();
+    default:
+      return false;
+    }
+  }
+
+  @Override
+  public boolean isOutputFull() {
+    switch (state) {
+    case BATCH_ACTIVE:
+      return rowWriter.isFull();
+    case NEW_SCHEMA:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  protected void verifyWritable() {
+    Preconditions.checkState(state != State.NEW_SCHEMA,
+        "Must harvest current batch to flush for new schema.");
+    Preconditions.checkState(state == State.BATCH_ACTIVE,
+        "Start an output batch before copying");
+    Preconditions.checkState(!isCopyPending(),
+        "Resume the in-flight copy before copying another");
+    Preconditions.checkState(!rowWriter.isFull(),
+        "Output batch is full; harvest before adding more");
+  }
+
+  @Override
+  public boolean copyNextRow() {
+    verifyWritable();
+    if (!rowReader.next()) {
+      return false;
+    }
+    copyColumns();
+    return true;
+  }
+
+  @Override
+  public void copyRow(int posn) {
+    verifyWritable();
+    rowReader.setPosition(posn);
+    copyColumns();
+  }
+
+  private final void copyColumns() {
+    rowWriter.start();
+    for (CopyPair pair : projection) {
+      pair.writer.copy(pair.reader);
+    }
+    rowWriter.save();
+  }
+
+  @Override
+  public void copyAllRows() {
+    verifyWritable();
+    activeCopy = new CopyAll();
+    copyBlock();
+  }
+
+  private void copyBlock() {
+    activeCopy.copy();
+    if (! activeCopy.hasMore()) {
+      activeCopy = null;
+    }
+  }
+
+  @Override
+  public boolean isCopyPending() {
+    return activeCopy != null && activeCopy.hasMore();
+  }
+
+  @Override
+  public VectorContainer harvest() {
+    Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA);
+    VectorContainer output = resultSetWriter.harvest();
+    state = (state == State.BATCH_ACTIVE)
+        ? State.BETWEEN_BATCHES : State.SCHEMA_PENDING;
+    return output;
+  }
+
+  @Override
+  public void close() {
+    if (state == State.CLOSED) {
+      return;
+    }
+    if (resultSetWriter != null) {
+      resultSetWriter.close();
+      resultSetWriter = null;
+      rowWriter = null;
+    }
+    resultSetReader.close();
+    rowReader = null;
+
+    state = State.CLOSED;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 07f919d..6c27706 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -455,6 +455,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     switch (state) {
     case ACTIVE:
     case HARVESTED:
+    case FULL_BATCH:
       return rootWriter.rowCount() > 0;
     case LOOK_AHEAD:
       return true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
index eea2d2d..6046c97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
@@ -100,4 +100,7 @@ public class ResultSetReaderImpl implements ResultSetReader {
 
   @VisibleForTesting
   protected State state() { return state; }
+
+  @Override
+  public BatchAccessor inputBatch() { return batch; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
index 0817eda..4b734e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
@@ -17,16 +17,16 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
 import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
-
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 /**
  * Helper class to obtain string representation of RowSet.
@@ -82,6 +82,7 @@ public class RowSetFormatter {
         }
         writer.write("\n");
       }
+      writer.flush();
     } catch (IOException e) {
      throw new DrillRuntimeException("Error happened when writing rowSet to writer", e);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 859d492..30791e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -35,6 +35,8 @@ import io.netty.buffer.DrillBuf;
 
 public class SelectionVector2 implements AutoCloseable {
 
+  public static final int RECORD_SIZE = 2;
+
   private final BufferAllocator allocator;
  // Indicates number of indexes stored in the SV2 buffer which may be less than actual number of rows stored in
   // RecordBatch container owning this SV2 instance
@@ -43,8 +45,6 @@ public class SelectionVector2 implements AutoCloseable {
   private int batchActualRecordCount = -1;
   private DrillBuf buffer = DeadBuf.DEAD_BUFFER;
 
-  public static final int RECORD_SIZE = 2;
-
   public SelectionVector2(BufferAllocator allocator) {
     this.allocator = allocator;
   }
@@ -79,7 +79,7 @@ public class SelectionVector2 implements AutoCloseable {
   }
 
   public DrillBuf getBuffer(boolean clear) {
-    DrillBuf bufferHandle = this.buffer;
+    DrillBuf bufferHandle = buffer;
 
     if (clear) {
       /* Increment the ref count for this buffer */
@@ -98,7 +98,7 @@ public class SelectionVector2 implements AutoCloseable {
     /* clear the existing buffer */
     clear();
 
-    this.buffer = bufferHandle;
+    buffer = bufferHandle;
     buffer.retain(1);
   }
 
@@ -106,10 +106,6 @@ public class SelectionVector2 implements AutoCloseable {
     return buffer.getChar(index * RECORD_SIZE);
   }
 
-  public void setIndex(int index, char value) {
-    buffer.setChar(index * RECORD_SIZE, value);
-  }
-
   public long getDataAddr() {
     return buffer.memoryAddress();
   }
@@ -158,7 +154,6 @@ public class SelectionVector2 implements AutoCloseable {
   }
 
   public void setRecordCount(int recordCount){
-//    logger.debug("Setting record count to {}", recordCount);
     this.recordCount = recordCount;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2Builder.java
similarity index 51%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2Builder.java
index 3c287c8..1b290e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2Builder.java
@@ -15,12 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.aggregate;
+package org.apache.drill.exec.record.selection;
 
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.VectorContainer;
 
-public interface BatchIterator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchIterator.class);
+public class SelectionVector2Builder {
 
-  public IterOutcome next();
+  private final SelectionVector2 sv2;
+  private int index;
+
+  public SelectionVector2Builder(BufferAllocator allocator, int maxSize) {
+    sv2 = new SelectionVector2(allocator);
+    sv2.allocateNew(maxSize);
+  }
+
+  public void setNext(int value) {
+    sv2.setIndex(index++, value);
+  }
+
+  public void set(int posn, int value) {
+    sv2.setIndex(posn, value);
+    index = Math.max(index, posn + 1);
+  }
+
+  public SelectionVector2 harvest(VectorContainer batch) {
+    sv2.setRecordCount(index);
+    sv2.setBatchActualRecordCount(batch.getRecordCount());
+    return sv2;
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
index 9986609..e35f7f6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -36,22 +36,21 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor.ContainerAndSv2Accessor;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.proto.UserBitShared.NamePart;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.BatchSchemaBuilder;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -524,7 +523,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
         .withSv2()
         .build();
 
-    ContainerAndSv2Accessor accessor = new ContainerAndSv2Accessor();
+    IndirectContainerAccessor accessor = new IndirectContainerAccessor();
     accessor.addBatch(rs.container());
     accessor.setSelectionVector(rs.getSv2());
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
new file mode 100644
index 0000000..19083aa
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.protocol.IndirectContainerAccessor;
+import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.physical.rowSet.RowSets;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2Builder;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+public class TestResultSetCopier extends SubOperatorTest {
+
+  private static final TupleMetadata TEST_SCHEMA =
+      new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .add("name", MinorType.VARCHAR)
+        .build();
+
+  private static class BaseDataGen {
+    protected final TupleMetadata schema;
+    protected final ResultSetLoader rsLoader;
+    protected final VectorContainerAccessor batch = new VectorContainerAccessor();
+
+    public BaseDataGen(TupleMetadata schema) {
+      this.schema = schema;
+      ResultSetOptions options = new OptionBuilder()
+          .setSchema(schema)
+          .setVectorCache(new ResultVectorCacheImpl(fixture.allocator()))
+          .build();
+      rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    }
+
+    public TupleMetadata schema() { return schema; }
+
+    public BatchAccessor batchAccessor() {
+      return batch;
+    }
+  }
+
+  private static class DataGen extends BaseDataGen {
+
+    public DataGen() {
+      super(TEST_SCHEMA);
+    }
+
+    public void makeBatch(int start, int end) {
+      rsLoader.startBatch();
+      for (int i = start; i <= end; i++) {
+        rsLoader.writer().addRow(i, "Row " + i);
+      }
+      batch.addBatch(rsLoader.harvest());
+    }
+  }
+
+  public static class DataGen2 extends DataGen {
+    private final int batchCount = 2;
+    private final int batchSize = 5;
+    private int batchIndex;
+
+    boolean next() {
+      if (batchIndex >= batchCount) {
+        return false;
+      }
+      int start = nextRow();
+      makeBatch(start, start + batchSize - 1);
+      batchIndex++;
+      return true;
+    }
+
+    int nextRow() {
+      return batchIndex * batchSize + 1;
+    }
+
+    int targetRowCount() {
+      return batchCount * batchSize;
+    }
+  }
+
+  public static class SchemaChangeGen extends DataGen {
+    private int batchIndex;
+    public final int batchSize = 5;
+    private int schemaVersion = 1;
+
+    public void makeBatch2(int start, int end) {
+      rsLoader.startBatch();
+      for (int i = start; i <= end; i++) {
+        rsLoader.writer().addRow(i, "Row " + i, i * 10);
+      }
+      batch.addBatch(rsLoader.harvest());
+    }
+
+    public TupleMetadata schema2() {
+      return new SchemaBuilder()
+          .add("id", MinorType.INT)
+          .add("name", MinorType.VARCHAR)
+          .add("amount", MinorType.INT)
+          .build();
+    }
+
+    public void evolveSchema() {
+      rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED));
+      schemaVersion = 2;
+    }
+
+    public void nextBatch() {
+      int start = batchIndex * batchSize + 1;
+      int end = start + batchSize - 1;
+      if (schemaVersion == 1) {
+        makeBatch(start, end);
+      } else {
+        makeBatch2(start, end);
+      }
+      batchIndex++;
+    }
+  }
+
+  private static class NullableGen extends BaseDataGen {
+
+    public NullableGen() {
+      super(new SchemaBuilder()
+          .add("id", MinorType.INT)
+          .addNullable("name", MinorType.VARCHAR)
+          .addNullable("amount", MinorType.INT)
+          .build());
+    }
+
+    public void makeBatch(int start, int end) {
+      rsLoader.startBatch();
+      RowSetLoader writer = rsLoader.writer();
+      for (int i = start; i <= end; i++) {
+        writer.start();
+        writer.scalar(0).setInt(i);
+        if (i % 2 == 0) {
+          writer.scalar(1).setString("Row " + i);
+        }
+        if (i % 3 == 0) {
+          writer.scalar(2).setInt(i * 10);
+        }
+        writer.save();
+      }
+      batch.addBatch(rsLoader.harvest());
+    }
+  }
+
+  private static class ArrayGen extends BaseDataGen {
+
+    public ArrayGen() {
+      super(new SchemaBuilder()
+          .add("id", MinorType.INT)
+          .addArray("name", MinorType.VARCHAR)
+          .build());
+    }
+
+    public void makeBatch(int start, int end) {
+      rsLoader.startBatch();
+      RowSetLoader writer = rsLoader.writer();
+      ArrayWriter aw = writer.array(1);
+      for (int i = start; i <= end; i++) {
+        writer.start();
+        writer.scalar(0).setInt(i);
+        int n = i % 3;
+        for (int j = 0; j < n; j++) {
+          aw.scalar().setString("Row " + i + "." + j);
+        }
+        writer.save();
+      }
+      batch.addBatch(rsLoader.harvest());
+    }
+  }
+
+  private static class MapGen extends BaseDataGen {
+
+    public MapGen() {
+      super(new SchemaBuilder()
+          .add("id", MinorType.INT)
+          .addMapArray("map")
+            .add("name", MinorType.VARCHAR)
+            .add("amount", MinorType.INT)
+            .resumeSchema()
+          .build());
+    }
+
+    public void makeBatch(int start, int end) {
+      rsLoader.startBatch();
+      RowSetLoader writer = rsLoader.writer();
+      ArrayWriter aw = writer.array(1);
+      TupleWriter mw = aw.entry().tuple();
+      for (int i = start; i <= end; i++) {
+        writer.start();
+        writer.scalar(0).setInt(i);
+        int n = i % 3;
+        for (int j = 0; j < n; j++) {
+          mw.scalar(0).setString("Row " + i + "." + j);
+          mw.scalar(1).setInt(i * 100 + j);
+          aw.save();
+        }
+        writer.save();
+      }
+      batch.addBatch(rsLoader.harvest());
+    }
+  }
+
+  @Test
+  public void testBasics() {
+
+    DataGen dataGen = new DataGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+
+    // Nothing should work yet
+
+    try {
+      copier.copyAllRows();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      copier.harvest();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Predicates should work
+
+    assertFalse(copier.isCopyPending());
+    assertFalse(copier.hasOutputRows());
+    assertFalse(copier.isOutputFull());
+
+    // Define a schema and start an output batch.
+
+    copier.startOutputBatch();
+    assertFalse(copier.isCopyPending());
+    assertFalse(copier.hasOutputRows());
+    assertFalse(copier.isOutputFull());
+
+    // Provide an input row
+
+    dataGen.makeBatch(1, 3);
+    copier.startInputBatch();
+    assertFalse(copier.isCopyPending());
+    assertFalse(copier.hasOutputRows());
+    assertFalse(copier.isOutputFull());
+
+    // Now can do some actual copying
+
+    while (copier.copyNextRow()) {
+      // empty
+    }
+    assertFalse(copier.isCopyPending());
+    assertTrue(copier.hasOutputRows());
+    assertFalse(copier.isOutputFull());
+
+    // Get and verify the output batch
+    // (Does not free the input batch, we reuse it
+    // in the verify step below.)
+
+    RowSet result = fixture.wrap(copier.harvest());
+    new RowSetComparison(fixture.wrap(dataGen.batchAccessor().container()))
+      .verifyAndClear(result);
+
+    // Copier will release the input batch
+
+    copier.close();
+  }
+
+  @Test
+  public void testImmediateClose() {
+
+    DataGen dataGen = new DataGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+
+    // Close OK before things got started
+
+    copier.close();
+
+    // Second close is benign
+
+    copier.close();
+  }
+
+  @Test
+  public void testCloseBeforeSchema() {
+
+    DataGen dataGen = new DataGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+
+    // Start batch, no data yet.
+
+    copier.startOutputBatch();
+
+    // Close OK before data arrives
+
+    copier.close();
+
+    // Second close is benign
+
+    copier.close();
+  }
+
+  @Test
+  public void testCloseWithData() {
+
+    DataGen dataGen = new DataGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+
+    // Start batch, with data.
+
+    copier.startOutputBatch();
+    dataGen.makeBatch(1, 3);
+    copier.startInputBatch();
+    copier.copyNextRow();
+
+    // Close OK with input and output batch allocated.
+
+    copier.close();
+
+    // Second close is benign
+
+    copier.close();
+  }
+
+  /**
+   * Test merging multiple batches from the same input
+   * source; all batches share the same vectors, hence
+   * implicitly the same schema.
+   * <p>
+   * This copier does not support merging from multiple
+   * streams.
+   */
+
+  @Test
+  public void testMerge() {
+    DataGen dataGen = new DataGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+    copier.startOutputBatch();
+
+    for (int i = 0; i < 5; i++) {
+      int start = i * 3 + 1;
+      dataGen.makeBatch(start, start + 2);
+      copier.startInputBatch();
+      assertFalse(copier.isOutputFull());
+      copier.copyAllRows();
+      copier.releaseInputBatch();
+      assertFalse(copier.isOutputFull());
+      assertFalse(copier.isCopyPending());
+    }
+    RowSet result = fixture.wrap(copier.harvest());
+    dataGen.makeBatch(1, 15);
+    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    RowSetUtilities.verify(expected, result);
+
+    copier.close();
+  }
+
+  @Test
+  public void testMultiOutput() {
+    DataGen2 dataGen = new DataGen2();
+    DataGen validatorGen = new DataGen();
+
+    // Equivalent of operator start() method.
+
+    OptionBuilder options = new OptionBuilder()
+        .setRowCountLimit(12);
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor(), options);
+
+    // Equivalent of an entire operator run
+
+    int start = 1;
+    for (;;) {
+
+      // Equivalent of operator next() method
+
+      copier.startOutputBatch();
+      while (! copier.isOutputFull()) {
+        copier.releaseInputBatch();
+        if (! dataGen.next()) {
+          break;
+        }
+        copier.startInputBatch();
+        copier.copyAllRows();
+      }
+      if (! copier.hasOutputRows()) {
+        break;
+      }
+
+      // Equivalent of sending downstream
+
+      RowSet result = fixture.wrap(copier.harvest());
+      int nextRow = dataGen.nextRow();
+      validatorGen.makeBatch(start, nextRow - 1);
+      RowSet expected = RowSets.wrap(validatorGen.batchAccessor());
+      RowSetUtilities.verify(expected, result);
+      start = nextRow;
+    }
+
+    // Ensure more than one output batch.
+
+    assertTrue(start > 1);
+
+    // Ensure all rows generated.
+
+    assertEquals(dataGen.targetRowCount(), start - 1);
+
+    // Simulate operator close();
+
+    copier.close();
+  }
+
+  @Test
+  public void testCopyRecord() {
+    DataGen dataGen = new DataGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+    copier.startOutputBatch();
+
+    dataGen.makeBatch(1, 3);
+    copier.startInputBatch();
+    copier.copyRow(2);
+    copier.copyRow(0);
+    copier.copyRow(1);
+    copier.releaseInputBatch();
+
+    dataGen.makeBatch(4, 6);
+    copier.startInputBatch();
+    copier.copyRow(1);
+    copier.copyRow(0);
+    copier.copyRow(2);
+    copier.releaseInputBatch();
+
+    RowSet expected = new RowSetBuilder(fixture.allocator(), dataGen.schema())
+        .addRow(3, "Row 3")
+        .addRow(1, "Row 1")
+        .addRow(2, "Row 2")
+        .addRow(5, "Row 5")
+        .addRow(4, "Row 4")
+        .addRow(6, "Row 6")
+        .build();
+    RowSet result = fixture.wrap(copier.harvest());
+    RowSetUtilities.verify(expected, result);
+
+    copier.close();
+  }
+
+  @Test
+  public void testSchemaChange() {
+    SchemaChangeGen dataGen = new SchemaChangeGen();
+    SchemaChangeGen verifierGen = new SchemaChangeGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+
+    // Copy first batch with first schema
+
+    copier.startOutputBatch();
+    dataGen.nextBatch();
+    copier.startInputBatch();
+    copier.copyAllRows();
+    assertFalse(copier.isOutputFull());
+
+    // Second, same schema
+
+    copier.releaseInputBatch();
+    dataGen.nextBatch();
+    copier.startInputBatch();
+    copier.copyAllRows();
+    assertFalse(copier.isOutputFull());
+
+    // Plenty of room. But, change the schema.
+
+    copier.releaseInputBatch();
+    dataGen.evolveSchema();
+    dataGen.nextBatch();
+    copier.startInputBatch();
+    assertTrue(copier.isOutputFull());
+
+    // Must harvest partial output
+
+    RowSet result = fixture.wrap(copier.harvest());
+    verifierGen.makeBatch(1, 2 * dataGen.batchSize - 1);
+    RowSet expected = RowSets.wrap(verifierGen.batchAccessor());
+    RowSetUtilities.verify(expected, result);
+
+    // Start a new batch, implicitly complete pending copy
+
+    copier.startOutputBatch();
+    copier.copyAllRows();
+
+    // Add one more of second schema
+
+    copier.releaseInputBatch();
+    dataGen.nextBatch();
+    copier.startInputBatch();
+    copier.copyAllRows();
+    assertFalse(copier.isOutputFull());
+
+    result = fixture.wrap(copier.harvest());
+    verifierGen.evolveSchema();
+    verifierGen.makeBatch2(2 * dataGen.batchSize + 1, 4 * dataGen.batchSize - 1);
+    expected = RowSets.wrap(verifierGen.batchAccessor());
+    RowSetUtilities.verify(expected, result);
+    assertFalse(copier.isCopyPending());
+
+    copier.close();
+  }
+
+  // TODO: Test with two consecutive schema changes in
+  // same input batch: once with rows pending, another without.
+
+  @Test
+  public void testSV2() {
+    DataGen dataGen = new DataGen();
+    IndirectContainerAccessor filtered = new IndirectContainerAccessor();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), filtered);
+
+    copier.startOutputBatch();
+    dataGen.makeBatch(1, 10);
+
+    // Pick out every other record, in descending
+    // order.
+
+    VectorContainer container = dataGen.batchAccessor().container();
+    SelectionVector2Builder sv2Builder =
+        new SelectionVector2Builder(fixture.allocator(), container.getRecordCount());
+    for (int i = 0; i < 5; i++) {
+      sv2Builder.setNext(10 - 2 * i - 1);
+    }
+    container.buildSchema(SelectionVectorMode.TWO_BYTE);
+    filtered.addBatch(container);
+    filtered.setSelectionVector(sv2Builder.harvest(container));
+    assertEquals(5, filtered.rowCount());
+
+    copier.startInputBatch();
+    copier.copyAllRows();
+    copier.releaseInputBatch();
+
+    RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
+        .addRow(10, "Row 10")
+        .addRow(8, "Row 8")
+        .addRow(6, "Row 6")
+        .addRow(4, "Row 4")
+        .addRow(2, "Row 2")
+        .build();
+    RowSet result = fixture.wrap(copier.harvest());
+    RowSetUtilities.verify(expected, result);
+
+    copier.close();
+  }
+
+  @Test
+  public void testSV4() {
+    // TODO
+  }
+
+  @Test
+  public void testNullable() {
+    NullableGen dataGen = new NullableGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+    copier.startOutputBatch();
+
+    dataGen.makeBatch(1, 10);
+    copier.startInputBatch();
+    copier.copyAllRows();
+
+    RowSet result = fixture.wrap(copier.harvest());
+    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    RowSetUtilities.verify(expected, result);
+
+    copier.close();
+  }
+
+  @Test
+  public void testArrays() {
+    ArrayGen dataGen = new ArrayGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+    copier.startOutputBatch();
+
+    dataGen.makeBatch(1, 5);
+    copier.startInputBatch();
+    copier.copyAllRows();
+
+    RowSet result = fixture.wrap(copier.harvest());
+    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    RowSetUtilities.verify(expected, result);
+
+    copier.close();
+  }
+
+  @Test
+  public void testMaps() {
+    MapGen dataGen = new MapGen();
+    ResultSetCopier copier = new ResultSetCopierImpl(
+        fixture.allocator(), dataGen.batchAccessor());
+    copier.startOutputBatch();
+
+    dataGen.makeBatch(1, 5);
+    copier.startInputBatch();
+    copier.copyAllRows();
+
+    RowSet result = fixture.wrap(copier.harvest());
+    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    RowSetUtilities.verify(expected, result);
+
+    copier.close();
+  }
+
+  @Test
+  public void testUnions() {
+    // TODO
+  }
+
+  @Test
+  public void testOverflow() {
+    // TODO
+  }
+}
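
Taken together, these tests trace the copier's full lifecycle. As a rough sketch (not part of the patch), a downstream operator such as the selection vector remover might drive it as below; the helper methods and final-flush handling are illustrative assumptions, and the pending-copy sequence after a schema change follows the pattern in testSchemaChange above:

    ResultSetCopier copier = new ResultSetCopierImpl(allocator, upstreamAccessor);
    copier.startOutputBatch();
    while (nextUpstreamBatch()) {            // hypothetical helper
      copier.startInputBatch();
      copier.copyAllRows();                  // reads through any SV2/SV4
      copier.releaseInputBatch();
      if (copier.isOutputFull()) {
        sendDownstream(copier.harvest());    // uniformly-sized batch
        copier.startOutputBatch();           // also resumes a pending copy
      }
    }
    sendDownstream(copier.harvest());        // flush the final partial batch
    copier.close();
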
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
index a49f16e..5a60a7c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
@@ -57,6 +57,7 @@ public class TestResultSetReader extends SubOperatorTest {
           .build();
       ResultSetOptions options = new OptionBuilder()
           .setSchema(schema1)
+          .setVectorCache(new ResultVectorCacheImpl(fixture.allocator()))
           .build();
       rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
       state = State.SCHEMA1;
@@ -188,16 +189,4 @@ public class TestResultSetReader extends SubOperatorTest {
 
     rsReader.close();
   }
-
-  @Test
-  public void testAutoRelease() {
-    BatchGenerator gen = new BatchGenerator();
-    ResultSetReader rsReader = new ResultSetReaderImpl(gen.batchAccessor());
-    gen.batch1(1, 10);
-    rsReader.start();
-
-    // If the test fails with open allocators, then the following failed.
-
-    rsReader.close();
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
index b1a2bd3..9a538d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
@@ -17,15 +17,15 @@
  */
 package org.apache.drill.test;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.commons.io.FileUtils;
-import org.junit.runner.Description;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.junit.runner.Description;
+
 /**
  * <h4>Overview</h4>
  * <p>
@@ -89,6 +89,10 @@ public class BaseDirTestWatcher extends DirTestWatcher {
     super(deleteDirAtEnd);
   }
 
+  public void start(Class<?> suite) {
+    starting(Description.createSuiteDescription(suite));
+  }
+
   @Override
   protected void starting(Description description) {
     super.starting(description);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 91584dd..d1391da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -17,6 +17,26 @@
  */
 package org.apache.drill.test;
 
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -46,26 +66,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.apache.drill.test.DrillTestWrapper.TestServices;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
-import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA;
-import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA;
-
 /**
  * Test fixture to start a Drillbit with provide options, create a client, and
  * execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides
@@ -124,7 +124,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
   public static final String DEFAULT_BIT_NAME = "drillbit";
 
-  private Map<String, Drillbit> bits = new HashMap<>();
+  private final Map<String, Drillbit> bits = new HashMap<>();
   private Drillbit defaultDrillbit;
   private boolean ownsZK;
   private ZookeeperHelper zkHelper;
@@ -264,9 +264,15 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
     StoragePluginTestUtils.configureFormatPlugins(pluginRegistry);
 
-    StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry, builder.dirTestWatcher.getDfsTestTmpDir(), TMP_SCHEMA);
-    StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry, builder.dirTestWatcher.getRootDir(), ROOT_SCHEMA);
-    StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry, builder.dirTestWatcher.getRootDir(), SchemaFactory.DEFAULT_WS_NAME);
+    StoragePluginTestUtils.updateSchemaLocation(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry,
+        builder.dirTestWatcher.getDfsTestTmpDir(), TMP_SCHEMA);
+    StoragePluginTestUtils.updateSchemaLocation(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        pluginRegistry, builder.dirTestWatcher.getRootDir(), ROOT_SCHEMA);
+    StoragePluginTestUtils.updateSchemaLocation(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry,
+        builder.dirTestWatcher.getRootDir(), SchemaFactory.DEFAULT_WS_NAME);
 
     // Create the mock data plugin
 
@@ -275,7 +281,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
         MockStorageEngineConfig.INSTANCE, bit.getContext(),
         MockStorageEngineConfig.NAME);
     config.setEnabled(true);
-    ((StoragePluginRegistryImpl) pluginRegistry).addPluginToPersistentStoreIfAbsent(MockStorageEngineConfig.NAME, config, plugin);
+    ((StoragePluginRegistryImpl) pluginRegistry).addPluginToPersistentStoreIfAbsent(
+        MockStorageEngineConfig.NAME, config, plugin);
   }
 
   private void applyOptions() throws Exception {
@@ -593,7 +600,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
   public static class FixtureTestServices implements TestServices {
 
-    private ClientFixture client;
+    private final ClientFixture client;
 
     public FixtureTestServices(ClientFixture client) {
       this.client = client;
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 79f352a..c6dc033 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -415,7 +415,7 @@ public class ColumnAccessors {
       buf.writerIndex(VALUE_WIDTH);
     }
     </#if>
-    
+
     <#if drillType == "VarChar" || drillType == "Var16Char" || drillType == "VarBinary">
     @Override
     public final void appendBytes(final byte[] value, final int len) {
@@ -592,6 +592,25 @@ public class ColumnAccessors {
       }
     </#if>
     }
+
+    @Override
+    public final void copy(ColumnReader from) {
+      ${drillType}ColumnReader source = (${drillType}ColumnReader) from;
+      final DrillBuf sourceBuf = source.buffer();
+    <#if varWidth>
+      final long entry = source.getEntry();
+      final int sourceOffset = (int) (entry >> 32);
+      final int len = (int) (entry & 0xFFFF_FFFF);
+      final int destOffset = prepareWrite(len);
+      drillBuf.setBytes(destOffset, sourceBuf, sourceOffset, len);
+      offsetsWriter.setNextOffset(destOffset + len);
+    <#else>
+      final int sourceOffset = source.offsetIndex() * VALUE_WIDTH;
+      final int destOffset = prepareWrite() * VALUE_WIDTH;
+      drillBuf.setBytes(destOffset, sourceBuf, sourceOffset, VALUE_WIDTH);
+    </#if>
+      vectorIndex.nextElement();
+    }
   }
 
   </#list>
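
In the var-width branch of the generated copy() above, the source reader packs the current entry's start offset and length into a single long. A sketch of that encoding, assuming OffsetVectorReader.getEntry() packs the value this way; the decode mirrors the generated code, and the low-word mask there is equivalent to a plain narrowing cast:

    // Hypothetical packing on the reader side:
    // high 32 bits = start offset, low 32 bits = entry length.
    long entry = (((long) startOffset) << 32) | (endOffset - startOffset);

    // Decoding, as in the generated copy() above:
    int sourceOffset = (int) (entry >> 32);
    int len = (int) entry;   // same result as (int) (entry & 0xFFFF_FFFF)
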
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 58a4e58..bb734de 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -164,6 +164,7 @@ public class MaterializedField {
   public int getPrecision() { return type.getPrecision(); }
   public boolean isNullable() { return type.getMode() == DataMode.OPTIONAL; }
   public DataMode getDataMode() { return type.getMode(); }
+  public int getChildCount() { return children.size(); }
 
   public MaterializedField getOtherNullableVersion() {
     final MajorType mt = type;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
index 563114a..c5da75a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
@@ -80,6 +80,15 @@ public interface ColumnWriter {
   void setNull();
 
   /**
+   * Copy a single value from the given reader, which must be of the
+   * same type as this writer.
+   *
+   * @param from reader to provide the data
+   */
+
+  void copy(ColumnReader from);
+
+  /**
    * Generic technique to write data as a generic Java object. The
    * type of the object must match the target writer.
    * Primarily for testing.
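
As a usage sketch (assumed column name and row reader/writer objects, not from this patch), copying the current value of one column:

    // Reader and writer must be positioned on their respective rows,
    // and the column types must match.
    ScalarWriter dest = rowWriter.scalar("amount");   // hypothetical column
    dest.copy(rowReader.scalar("amount"));
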
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
index ce74fe3..c9244b5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
@@ -34,4 +34,11 @@ public interface ObjectReader extends ColumnReader {
   TupleReader tuple();
   ArrayReader array();
   VariantReader variant();
+
+  /**
+   * Gets the reader as a generic type, for dynamic
+   * programming.
+   * @return the untyped reader
+   */
+  ColumnReader reader();
 }
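
A sketch of the dynamic programming this enables (names assumed): walking a row's columns without compile-time knowledge of their types:

    for (int i = 0; i < rowReader.columnCount(); i++) {
      ColumnReader col = rowReader.column(i).reader();
      System.out.println(col.schema().name() + " = "
          + (col.isNull() ? "null" : col.getObject()));
    }
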
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
index fffd4e5..5d8077c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
@@ -45,14 +45,19 @@ import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
 public interface ObjectWriter extends ColumnWriter {
 
   ScalarWriter scalar();
-
   TupleWriter tuple();
-
   ArrayWriter array();
-
   VariantWriter variant();
 
   /**
+   * Generic version of the above, for dynamic handling of
+   * writers.
+   * @return the generic form of the column writer
+   */
+
+  ColumnWriter writer();
+
+  /**
    * The internal state behind this writer. To be used only by the
    * implementation, not by the client.
    */
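
Paired with ObjectReader.reader() above, this is what lets a generic loop move a row one column at a time; in essence, it is what AbstractTupleWriter.copy() below does internally. A sketch, assuming a RowSetLoader-style row writer with start()/save() semantics:

    rowWriter.start();
    for (int i = 0; i < rowReader.columnCount(); i++) {
      rowWriter.column(i).writer().copy(rowReader.column(i).reader());
    }
    rowWriter.save();
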
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
index a3f61e0..60810fb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.vector.accessor.convert;
 import java.math.BigDecimal;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
@@ -143,4 +144,9 @@ public abstract class AbstractWriteConverter extends AbstractScalarWriter {
   public void setTimestamp(Instant value) {
     baseWriter.setTimestamp(value);
   }
+
+  @Override
+  public final void copy(ColumnReader from) {
+    throw new UnsupportedOperationException("Cannot copy values through a type 
converter");
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
index 1350796..126fbe8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
@@ -48,6 +48,7 @@ public abstract class AbstractObjectReader implements ObjectReader {
 
   public abstract ReaderEvents events();
 
+  @Override
   public abstract ColumnReader reader();
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
index 65443aa..77fae8f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
@@ -77,8 +77,8 @@ public abstract class AbstractTupleReader implements TupleReader, ReaderEvents {
 
   @Override
   public void bindIndex(ColumnReaderIndex index) {
-    for (int i = 0; i < readers.length; i++) {
-      readers[i].events().bindIndex(index);
+    for (AbstractObjectReader reader : readers) {
+      reader.events().bindIndex(index);
     }
   }
 
@@ -89,8 +89,8 @@ public abstract class AbstractTupleReader implements TupleReader, ReaderEvents {
 
   @Override
   public void bindBuffer() {
-    for (int i = 0; i < readers.length; i++) {
-      readers[i].events().bindBuffer();
+    for (AbstractObjectReader reader : readers) {
+      reader.events().bindBuffer();
     }
   }
 
@@ -168,16 +168,16 @@ public abstract class AbstractTupleReader implements TupleReader, ReaderEvents {
 
   @Override
   public void reposition() {
-    for (int i = 0; i < columnCount(); i++) {
-      readers[i].events().reposition();
+    for (AbstractObjectReader reader : readers) {
+      reader.events().reposition();
     }
   }
 
   @Override
   public Object getObject() {
     List<Object> elements = new ArrayList<>();
-    for (int i = 0; i < columnCount(); i++) {
-      elements.add(readers[i].getObject());
+    for (AbstractObjectReader reader : readers) {
+      elements.add(reader.getObject());
     }
     return elements;
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
index 6f66869..fbafa8c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
@@ -371,7 +371,6 @@ public class ArrayReaderImpl implements ArrayReader, ReaderEvents {
     elementIndex.rewind();
   }
 
-
   @Override
   public void bindBuffer() {
     elementReader.events().bindBuffer();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
index a621207..dc7ae11 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
@@ -35,6 +35,10 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
   public abstract static class BaseFixedWidthReader extends BaseScalarReader {
 
     public abstract int width();
+
+    public final int offsetIndex() {
+      return vectorIndex.offset();
+    }
   }
 
   public abstract static class BaseVarWidthReader extends BaseScalarReader {
@@ -59,6 +63,10 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
       super.bindBuffer();
       offsetsReader.bindBuffer();
     }
+
+    public final long getEntry() {
+      return offsetsReader.getEntry();
+    }
   }
 
   /**
@@ -170,4 +178,8 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
   public void bindBuffer() {
     bufferAccessor.rebind();
   }
+
+  public final DrillBuf buffer() {
+    return bufferAccessor.buffer();
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/OffsetVectorReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/OffsetVectorReader.java
index 9d81638..739aa31 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/OffsetVectorReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/OffsetVectorReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.reader.BaseScalarReader.BaseFixedWidthReader;
+
 import io.netty.buffer.DrillBuf;
 
 /**
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
index d766bb7..1dfc65a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
@@ -175,18 +175,18 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
 
   @Override
   public void reposition() {
-    for (int i = 0; i < variants.length; i++) {
-      if (variants[i] != null) {
-        variants[i].events().reposition();
+    for (AbstractObjectReader variantReader : variants) {
+      if (variantReader != null) {
+        variantReader.events().reposition();
       }
     }
   }
 
   @Override
   public void bindBuffer() {
-    for (int i = 0; i < variants.length; i++) {
-      if (variants[i] != null) {
-        variants[i].events().bindBuffer();
+    for (AbstractObjectReader variantReader : variants) {
+      if (variantReader != null) {
+        variantReader.events().bindBuffer();
       }
     }
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index 9d966f7..18702ac 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -19,9 +19,12 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
@@ -351,6 +354,17 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
     }
   }
 
+  @Override
+  public void copy(ColumnReader from) {
+    ArrayReader source = (ArrayReader) from;
+    // Inefficient initial implementation
+    ObjectReader entryReader = source.entry();
+    while (source.next()) {
+      elementObjWriter.writer().copy(entryReader.reader());
+      save();
+    }
+  }
+
   /**
    * Return the writer for the offset vector for this array. Primarily used
    * to handle overflow; other clients should not attempt to muck about with
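
At the call site, the element-by-element loop above is hidden behind a single call; a usage sketch with an assumed column name:

    ArrayWriter dest = rowWriter.array("tags");   // hypothetical array column
    dest.copy(rowReader.array("tags"));           // copies every element in this row's array
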
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index 29bcef7..d2b65ff 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -19,7 +19,8 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
-import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
@@ -59,8 +60,6 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
     throw new UnsupportedOperationException();
   }
 
-  public abstract ColumnWriter writer();
-
   @Override
   public abstract WriterEvents events();
 
@@ -82,6 +81,12 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
   @Override
   public boolean isProjected() { return writer().isProjected(); }
 
+  @Override
+  public void copy(ColumnReader from) {
+    ObjectReader source = (ObjectReader) from;
+    writer().copy(source.reader());
+  }
+
   public abstract void dump(HierarchicalFormatter format);
 
   protected static ScalarWriter convertWriter(
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 948b8c7..246b4c1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectType;
@@ -33,6 +34,7 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.exec.vector.accessor.VariantWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.reader.MapReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -321,21 +323,33 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
     // Rollover can only happen while a row is in progress.
 
     assert state == State.IN_ROW;
-    for (int i = 0; i < writers.size(); i++) {
-      writers.get(i).events().postRollover();
+    for (AbstractObjectWriter writer : writers) {
+      writer.events().postRollover();
     }
   }
 
   @Override
   public void endWrite() {
     assert state != State.IDLE;
-    for (int i = 0; i < writers.size(); i++) {
-      writers.get(i).events().endWrite();
+    for (AbstractObjectWriter writer : writers) {
+      writer.events().endWrite();
     }
     state = State.IDLE;
   }
 
   @Override
+  public void copy(ColumnReader from) {
+    MapReader source = (MapReader) from;
+    // Assumes a 1:1 correspondence between source and
+    // destination tuples. That is, does not handle the
+    // case of projection: more columns on one side vs.
+    // the other. That must be handled outside this class.
+    for (int i = 0; i < writers.size(); i++) {
+      writers.get(i).writer().copy(source.column(i).reader());
+    }
+  }
+
+  @Override
   public ObjectWriter column(int colIndex) {
     return writers.get(colIndex);
   }
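
One hypothetical way an outer layer might handle the projection case that copy() above defers (nothing in this patch implements this): match columns by name and skip those absent on the destination side:

    for (int i = 0; i < sourceReader.columnCount(); i++) {
      ObjectReader col = sourceReader.column(i);
      int destIndex = destWriter.tupleSchema().index(col.schema().name());
      if (destIndex >= 0) {                  // skip unprojected columns
        destWriter.column(destIndex).writer().copy(col.reader());
      }
    }
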
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
index 0bac916..e0f88f8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
@@ -65,15 +65,15 @@ public abstract class BaseVarWidthWriter extends BaseScalarWriter {
   public void startRow() { offsetsWriter.startRow(); }
 
   protected final int prepareWrite(final int width) {
-
-    // This is performance critical code; every operation counts.
-    // Please be thoughtful when making changes.
-
     fillEmpties();
     return writeOffset(width);
   }
 
   private final int writeOffset(final int width) {
+
+    // This is performance critical code; every operation counts.
+    // Please be thoughtful when making changes.
+
     final int writeOffset = offsetsWriter.nextOffset;
     if (writeOffset + width < capacity) {
       return writeOffset;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BitColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BitColumnWriter.java
index 40fc6ac..43cae14 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BitColumnWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BitColumnWriter.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.vector.accessor.writer;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ValueType;
 
 /**
@@ -118,4 +120,10 @@ public class BitColumnWriter extends AbstractFixedWidthWriter {
   public final void setDefaultValue(final Object value) {
     defaultValue = ((Boolean) value) ? 1 : 0;
   }
+
+  @Override
+  public void copy(ColumnReader from) {
+    ScalarReader source = (ScalarReader) from;
+    setInt(source.getInt());
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index e9e26d9..38d6a5a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
@@ -41,7 +42,7 @@ public abstract class MapWriter extends AbstractTupleWriter {
    */
 
   private static class MemberWriterIndex implements ColumnWriterIndex {
-    private ColumnWriterIndex baseIndex;
+    private final ColumnWriterIndex baseIndex;
 
     private MemberWriterIndex(ColumnWriterIndex baseIndex) {
       this.baseIndex = baseIndex;
@@ -149,6 +150,9 @@ public abstract class MapWriter extends AbstractTupleWriter {
 
     @Override
     public boolean isProjected() { return false; }
+
+    @Override
+    public void copy(ColumnReader from) { }
   }
 
   protected static class DummyArrayMapWriter extends MapWriter {
@@ -160,6 +164,9 @@ public abstract class MapWriter extends AbstractTupleWriter {
 
     @Override
     public boolean isProjected() { return false; }
+
+    @Override
+    public void copy(ColumnReader from) { }
   }
 
   protected final ColumnMetadata mapColumnSchema;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index 5100ba9..ebf0d88 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.accessor.ColumnAccessors.UInt1ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
@@ -234,6 +235,14 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl {
   }
 
   @Override
+  public void copy(ColumnReader from) {
+    if (!from.isNull()) {
+      isSetWriter.setInt(1);
+      baseWriter.copy(from);
+    }
+  }
+
+  @Override
   public void preRollover() {
     isSetWriter.preRollover();
     baseWriter.preRollover();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index 0b25d61..eaeacfe 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer;
 
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.InvalidConversionError;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
@@ -340,4 +341,9 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements
   public void setDefaultValue(Object value) {
     throw new UnsupportedOperationException("Encoding not supported for offset 
vectors");
   }
+
+  @Override
+  public void copy(ColumnReader from) {
+    throw new UnsupportedOperationException("Copying of offset vectors not 
supported");
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
index 4c599ce..30ab11e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
@@ -23,12 +23,14 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.VariantReader;
 import org.apache.drill.exec.vector.accessor.VariantWriter;
 import org.apache.drill.exec.vector.accessor.WriterPosition;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
@@ -240,7 +242,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
     // schema, do so now. (Unfortunately, the default listener
     // does add the schema, while the row set loader does not.)
 
-    if (! variantSchema().hasType(type)) {
+    if (!variantSchema().hasType(type)) {
       variantSchema().addType(writer.schema());
     }
     writer.events().bindIndex(index);
@@ -333,6 +335,14 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
   }
 
   @Override
+  public void copy(ColumnReader from) {
+    if (!from.isNull()) {
+      VariantReader source = (VariantReader) from;
+      member(source.dataType()).copy(source.member());
+    }
+  }
+
+  @Override
   public void setObject(Object value) {
     if (value == null) {
       setNull();
@@ -379,6 +389,5 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
   @Override
   public void dump(HierarchicalFormatter format) {
     // TODO Auto-generated method stub
-
   }
 }
\ No newline at end of file
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
index b5b01bb..dcc847d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.vector.accessor.writer.dummy;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
@@ -96,4 +97,7 @@ public class DummyArrayWriter extends AbstractArrayWriter {
 
   @Override
   public boolean isProjected() { return false; }
+
+  @Override
+  public void copy(ColumnReader from) { }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index d110d1b..85a7fe1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl;
@@ -121,4 +122,7 @@ public class DummyScalarWriter extends AbstractScalarWriterImpl {
 
   @Override
   public boolean isProjected() { return false; }
+
+  @Override
+  public void copy(ColumnReader from) { }
 }
