[SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector.

## What changes were proposed in this pull request?

This is a refactoring of the `ColumnVector` hierarchy and related classes:

1. Make `ColumnVector` read-only (a sketch of the resulting split is shown after this list).
2. Introduce `WritableColumnVector` with the write interface.
3. Remove `ReadOnlyColumnVector`.
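
As a hedged illustration of the intended split (this is not code from the patch; the one-column schema, variable names, and printing are illustrative, but the classes and methods are the ones touched by the diff below):

```java
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ColumnVectorSplitSketch {
  public static void main(String[] args) {
    StructType schema = new StructType().add("id", DataTypes.LongType);

    // Writers (Parquet readers, aggregate hash maps, generated code) now work against
    // WritableColumnVector, which owns the put/append/reserve APIs.
    OnHeapColumnVector[] vectors = OnHeapColumnVector.allocateColumns(1024, schema);
    WritableColumnVector writable = vectors[0];
    for (int rowId = 0; rowId < 10; rowId++) {
      writable.putLong(rowId, rowId * 2L);
    }

    // Consumers only see the read-only ColumnVector base class (get APIs, no puts).
    ColumnVector readOnly = writable;
    long sum = 0;
    for (int rowId = 0; rowId < 10; rowId++) {
      sum += readOnly.getLong(rowId);
    }

    // A ColumnarBatch is now built from pre-allocated vectors instead of
    // ColumnarBatch.allocate(...).
    ColumnarBatch batch = new ColumnarBatch(schema, vectors, 1024);
    batch.setNumRows(10);
    System.out.println("sum = " + sum);
    batch.close();
  }
}
```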

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ues...@databricks.com>

Closes #18958 from ueshin/issues/SPARK-21745.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e33954d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e33954d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e33954d

Branch: refs/heads/master
Commit: 9e33954ddfe1148f69e523c89827feb76ba892c9
Parents: dc5d34d
Author: Takuya UESHIN <ues...@databricks.com>
Authored: Thu Aug 24 21:13:44 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Aug 24 21:13:44 2017 +0800

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     |  28 +-
 .../parquet/VectorizedColumnReader.java         |  31 +-
 .../parquet/VectorizedParquetRecordReader.java  |  23 +-
 .../parquet/VectorizedPlainValuesReader.java    |  16 +-
 .../parquet/VectorizedRleValuesReader.java      |  87 ++-
 .../parquet/VectorizedValuesReader.java         |  16 +-
 .../execution/vectorized/AggregateHashMap.java  |  10 +-
 .../execution/vectorized/ArrowColumnVector.java |  45 +-
 .../sql/execution/vectorized/ColumnVector.java  | 632 +----------------
 .../execution/vectorized/ColumnVectorUtils.java |  18 +-
 .../sql/execution/vectorized/ColumnarBatch.java | 106 +--
 .../vectorized/OffHeapColumnVector.java         |  34 +-
 .../vectorized/OnHeapColumnVector.java          |  35 +-
 .../vectorized/ReadOnlyColumnVector.java        | 251 -------
 .../vectorized/WritableColumnVector.java        | 674 +++++++++++++++++++
 .../aggregate/VectorizedHashMapGenerator.scala  |  39 +-
 .../vectorized/ColumnarBatchBenchmark.scala     |  23 +-
 .../vectorized/ColumnarBatchSuite.scala         | 109 +--
 18 files changed, 1078 insertions(+), 1099 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 807765c..3853863 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -464,14 +464,13 @@ class CodegenContext {
   /**
    * Returns the specialized code to set a given value in a column vector for 
a given `DataType`.
    */
-  def setValue(batch: String, row: String, dataType: DataType, ordinal: Int,
-      value: String): String = {
+  def setValue(vector: String, rowId: String, dataType: DataType, value: 
String): String = {
     val jt = javaType(dataType)
     dataType match {
       case _ if isPrimitiveType(jt) =>
-        s"$batch.column($ordinal).put${primitiveTypeName(jt)}($row, $value);"
-      case t: DecimalType => s"$batch.column($ordinal).putDecimal($row, 
$value, ${t.precision});"
-      case t: StringType => s"$batch.column($ordinal).putByteArray($row, 
$value.getBytes());"
+        s"$vector.put${primitiveTypeName(jt)}($rowId, $value);"
+      case t: DecimalType => s"$vector.putDecimal($rowId, $value, 
${t.precision});"
+      case t: StringType => s"$vector.putByteArray($rowId, $value.getBytes());"
       case _ =>
         throw new IllegalArgumentException(s"cannot generate code for 
unsupported type: $dataType")
     }
@@ -482,37 +481,36 @@ class CodegenContext {
    * that could potentially be nullable.
    */
   def updateColumn(
-      batch: String,
-      row: String,
+      vector: String,
+      rowId: String,
       dataType: DataType,
-      ordinal: Int,
       ev: ExprCode,
       nullable: Boolean): String = {
     if (nullable) {
       s"""
          if (!${ev.isNull}) {
-           ${setValue(batch, row, dataType, ordinal, ev.value)}
+           ${setValue(vector, rowId, dataType, ev.value)}
          } else {
-           $batch.column($ordinal).putNull($row);
+           $vector.putNull($rowId);
          }
        """
     } else {
-      s"""${setValue(batch, row, dataType, ordinal, ev.value)};"""
+      s"""${setValue(vector, rowId, dataType, ev.value)};"""
     }
   }
 
   /**
    * Returns the specialized code to access a value from a column vector for a 
given `DataType`.
    */
-  def getValue(batch: String, row: String, dataType: DataType, ordinal: Int): 
String = {
+  def getValue(vector: String, rowId: String, dataType: DataType): String = {
     val jt = javaType(dataType)
     dataType match {
       case _ if isPrimitiveType(jt) =>
-        s"$batch.column($ordinal).get${primitiveTypeName(jt)}($row)"
+        s"$vector.get${primitiveTypeName(jt)}($rowId)"
       case t: DecimalType =>
-        s"$batch.column($ordinal).getDecimal($row, ${t.precision}, ${t.scale})"
+        s"$vector.getDecimal($rowId, ${t.precision}, ${t.scale})"
       case StringType =>
-        s"$batch.column($ordinal).getUTF8String($row)"
+        s"$vector.getUTF8String($rowId)"
       case _ =>
         throw new IllegalArgumentException(s"cannot generate code for 
unsupported type: $dataType")
     }
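
To make the effect of the `CodegenContext` signature change above concrete, here is a hedged sketch of calling the new helpers directly (the variable names `agg_vector`, `agg_rowId`, and `agg_value` are illustrative placeholders, not names produced by this patch):

```java
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
import org.apache.spark.sql.types.DataTypes;

public class CodegenVectorAccessSketch {
  public static void main(String[] args) {
    CodegenContext ctx = new CodegenContext();
    // The helpers now take a vector variable and a row id directly,
    // instead of a batch variable plus a column ordinal.
    // Expected to print roughly: agg_vector.putLong(agg_rowId, agg_value);
    System.out.println(ctx.setValue("agg_vector", "agg_rowId", DataTypes.LongType, "agg_value"));
    // Expected to print roughly: agg_vector.getLong(agg_rowId)
    System.out.println(ctx.getValue("agg_vector", "agg_rowId", DataTypes.LongType));
  }
}
```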

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index fd8db17..f37864a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -135,9 +136,9 @@ public class VectorizedColumnReader {
   /**
    * Reads `total` values from this columnReader into column.
    */
-  void readBatch(int total, ColumnVector column) throws IOException {
+  void readBatch(int total, WritableColumnVector column) throws IOException {
     int rowId = 0;
-    ColumnVector dictionaryIds = null;
+    WritableColumnVector dictionaryIds = null;
     if (dictionary != null) {
       // SPARK-16334: We only maintain a single dictionary per row batch, so 
that it can be used to
       // decode all previous dictionary encoded pages if we ever encounter a 
non-dictionary encoded
@@ -219,8 +220,11 @@ public class VectorizedColumnReader {
   /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` 
and `dictionary`.
    */
-  private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
-                                   ColumnVector dictionaryIds) {
+  private void decodeDictionaryIds(
+      int rowId,
+      int num,
+      WritableColumnVector column,
+      ColumnVector dictionaryIds) {
     switch (descriptor.getType()) {
       case INT32:
         if (column.dataType() == DataTypes.IntegerType ||
@@ -346,13 +350,13 @@ public class VectorizedColumnReader {
    * is guaranteed that num is smaller than the number of values left in the 
current page.
    */
 
-  private void readBooleanBatch(int rowId, int num, ColumnVector column) 
throws IOException {
+  private void readBooleanBatch(int rowId, int num, WritableColumnVector 
column) throws IOException {
     assert(column.dataType() == DataTypes.BooleanType);
     defColumn.readBooleans(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   }
 
-  private void readIntBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+  private void readIntBatch(int rowId, int num, WritableColumnVector column) 
throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.IntegerType || column.dataType() == 
DataTypes.DateType ||
@@ -370,7 +374,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readLongBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+  private void readLongBatch(int rowId, int num, WritableColumnVector column) 
throws IOException {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
         DecimalType.is64BitDecimalType(column.dataType())) {
@@ -389,7 +393,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readFloatBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+  private void readFloatBatch(int rowId, int num, WritableColumnVector column) 
throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: support implicit cast to double?
     if (column.dataType() == DataTypes.FloatType) {
@@ -400,7 +404,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readDoubleBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+  private void readDoubleBatch(int rowId, int num, WritableColumnVector 
column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.DoubleType) {
@@ -411,7 +415,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readBinaryBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+  private void readBinaryBatch(int rowId, int num, WritableColumnVector 
column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -432,8 +436,11 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readFixedLenByteArrayBatch(int rowId, int num,
-                                          ColumnVector column, int arrayLen) 
throws IOException {
+  private void readFixedLenByteArrayBatch(
+      int rowId,
+      int num,
+      WritableColumnVector column,
+      int arrayLen) throws IOException {
     VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 04f8141..0cacf0c 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -31,6 +31,9 @@ import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
@@ -90,6 +93,8 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
    */
   private ColumnarBatch columnarBatch;
 
+  private WritableColumnVector[] columnVectors;
+
   /**
    * If true, this class returns batches instead of rows.
    */
@@ -172,20 +177,26 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
       }
     }
 
-    columnarBatch = ColumnarBatch.allocate(batchSchema, memMode);
+    int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, 
batchSchema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, 
batchSchema);
+    }
+    columnarBatch = new ColumnarBatch(batchSchema, columnVectors, capacity);
     if (partitionColumns != null) {
       int partitionIdx = sparkSchema.fields().length;
       for (int i = 0; i < partitionColumns.fields().length; i++) {
-        ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), 
partitionValues, i);
-        columnarBatch.column(i + partitionIdx).setIsConstant();
+        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], 
partitionValues, i);
+        columnVectors[i + partitionIdx].setIsConstant();
       }
     }
 
     // Initialize missing columns with nulls.
     for (int i = 0; i < missingColumns.length; i++) {
       if (missingColumns[i]) {
-        columnarBatch.column(i).putNulls(0, columnarBatch.capacity());
-        columnarBatch.column(i).setIsConstant();
+        columnVectors[i].putNulls(0, columnarBatch.capacity());
+        columnVectors[i].setIsConstant();
       }
     }
   }
@@ -226,7 +237,7 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
     int num = (int) Math.min((long) columnarBatch.capacity(), 
totalCountLoadedSoFar - rowsReturned);
     for (int i = 0; i < columnReaders.length; ++i) {
       if (columnReaders[i] == null) continue;
-      columnReaders[i].readBatch(num, columnarBatch.column(i));
+      columnReaders[i].readBatch(num, columnVectors[i]);
     }
     rowsReturned += num;
     columnarBatch.setNumRows(num);

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 98018b7..5b75f71 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -20,7 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.unsafe.Platform;
 
 import org.apache.parquet.column.values.ValuesReader;
@@ -56,7 +56,7 @@ public class VectorizedPlainValuesReader extends ValuesReader 
implements Vectori
   }
 
   @Override
-  public final void readBooleans(int total, ColumnVector c, int rowId) {
+  public final void readBooleans(int total, WritableColumnVector c, int rowId) 
{
     // TODO: properly vectorize this
     for (int i = 0; i < total; i++) {
       c.putBoolean(rowId + i, readBoolean());
@@ -64,31 +64,31 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
   }
 
   @Override
-  public final void readIntegers(int total, ColumnVector c, int rowId) {
+  public final void readIntegers(int total, WritableColumnVector c, int rowId) 
{
     c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
     offset += 4 * total;
   }
 
   @Override
-  public final void readLongs(int total, ColumnVector c, int rowId) {
+  public final void readLongs(int total, WritableColumnVector c, int rowId) {
     c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
     offset += 8 * total;
   }
 
   @Override
-  public final void readFloats(int total, ColumnVector c, int rowId) {
+  public final void readFloats(int total, WritableColumnVector c, int rowId) {
     c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
     offset += 4 * total;
   }
 
   @Override
-  public final void readDoubles(int total, ColumnVector c, int rowId) {
+  public final void readDoubles(int total, WritableColumnVector c, int rowId) {
     c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
     offset += 8 * total;
   }
 
   @Override
-  public final void readBytes(int total, ColumnVector c, int rowId) {
+  public final void readBytes(int total, WritableColumnVector c, int rowId) {
     for (int i = 0; i < total; i++) {
       // Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
       // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
@@ -159,7 +159,7 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
   }
 
   @Override
-  public final void readBinary(int total, ColumnVector v, int rowId) {
+  public final void readBinary(int total, WritableColumnVector v, int rowId) {
     for (int i = 0; i < total; i++) {
       int len = readInteger();
       int start = offset;

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 6215738..fc7fa70 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -25,7 +25,7 @@ import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 /**
  * A values reader for Parquet's run-length encoded data. This is based off of 
the version in
@@ -177,7 +177,11 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
    *    c[rowId] = null;
    *  }
    */
-  public void readIntegers(int total, ColumnVector c, int rowId, int level,
+  public void readIntegers(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
       VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
@@ -208,8 +212,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   // TODO: can this code duplication be removed without a perf penalty?
-  public void readBooleans(int total, ColumnVector c,
-                        int rowId, int level, VectorizedValuesReader data) {
+  public void readBooleans(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -238,8 +246,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
     }
   }
 
-  public void readBytes(int total, ColumnVector c,
-                        int rowId, int level, VectorizedValuesReader data) {
+  public void readBytes(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -268,8 +280,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
     }
   }
 
-  public void readShorts(int total, ColumnVector c,
-                        int rowId, int level, VectorizedValuesReader data) {
+  public void readShorts(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -300,8 +316,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
     }
   }
 
-  public void readLongs(int total, ColumnVector c, int rowId, int level,
-                        VectorizedValuesReader data) {
+  public void readLongs(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -330,8 +350,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
     }
   }
 
-  public void readFloats(int total, ColumnVector c, int rowId, int level,
-                        VectorizedValuesReader data) {
+  public void readFloats(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -360,8 +384,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
     }
   }
 
-  public void readDoubles(int total, ColumnVector c, int rowId, int level,
-                         VectorizedValuesReader data) {
+  public void readDoubles(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -390,8 +418,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
     }
   }
 
-  public void readBinarys(int total, ColumnVector c, int rowId, int level,
-                        VectorizedValuesReader data) {
+  public void readBinarys(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -424,8 +456,13 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
    * Decoding for dictionary ids. The IDs are populated into `values` and the 
nullability is
    * populated into `nulls`.
    */
-  public void readIntegers(int total, ColumnVector values, ColumnVector nulls, 
int rowId, int level,
-                           VectorizedValuesReader data) {
+  public void readIntegers(
+      int total,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -459,7 +496,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   // IDs. This is different than the above APIs that decodes definitions 
levels along with values.
   // Since this is only used to decode dictionary IDs, only decoding integers 
is supported.
   @Override
-  public void readIntegers(int total, ColumnVector c, int rowId) {
+  public void readIntegers(int total, WritableColumnVector c, int rowId) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -485,32 +522,32 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   @Override
-  public void readBytes(int total, ColumnVector c, int rowId) {
+  public void readBytes(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readLongs(int total, ColumnVector c, int rowId) {
+  public void readLongs(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readBinary(int total, ColumnVector c, int rowId) {
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readBooleans(int total, ColumnVector c, int rowId) {
+  public void readBooleans(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readFloats(int total, ColumnVector c, int rowId) {
+  public void readFloats(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readDoubles(int total, ColumnVector c, int rowId) {
+  public void readDoubles(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 88418ca..57d92ae 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import org.apache.parquet.io.api.Binary;
 
@@ -37,11 +37,11 @@ public interface VectorizedValuesReader {
   /*
    * Reads `total` values into `c` start at `c[rowId]`
    */
-  void readBooleans(int total, ColumnVector c, int rowId);
-  void readBytes(int total, ColumnVector c, int rowId);
-  void readIntegers(int total, ColumnVector c, int rowId);
-  void readLongs(int total, ColumnVector c, int rowId);
-  void readFloats(int total, ColumnVector c, int rowId);
-  void readDoubles(int total, ColumnVector c, int rowId);
-  void readBinary(int total, ColumnVector c, int rowId);
+  void readBooleans(int total, WritableColumnVector c, int rowId);
+  void readBytes(int total, WritableColumnVector c, int rowId);
+  void readIntegers(int total, WritableColumnVector c, int rowId);
+  void readLongs(int total, WritableColumnVector c, int rowId);
+  void readFloats(int total, WritableColumnVector c, int rowId);
+  void readDoubles(int total, WritableColumnVector c, int rowId);
+  void readBinary(int total, WritableColumnVector c, int rowId);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
index 25a565d..1c94f70 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
@@ -41,6 +41,7 @@ import static org.apache.spark.sql.types.DataTypes.LongType;
  */
 public class AggregateHashMap {
 
+  private OnHeapColumnVector[] columnVectors;
   private ColumnarBatch batch;
   private int[] buckets;
   private int numBuckets;
@@ -62,7 +63,8 @@ public class AggregateHashMap {
 
     this.maxSteps = maxSteps;
     numBuckets = (int) (capacity / loadFactor);
-    batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, capacity);
+    columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
+    batch = new ColumnarBatch(schema, columnVectors, capacity);
     buckets = new int[numBuckets];
     Arrays.fill(buckets, -1);
   }
@@ -74,8 +76,8 @@ public class AggregateHashMap {
   public ColumnarBatch.Row findOrInsert(long key) {
     int idx = find(key);
     if (idx != -1 && buckets[idx] == -1) {
-      batch.column(0).putLong(numRows, key);
-      batch.column(1).putLong(numRows, 0);
+      columnVectors[0].putLong(numRows, key);
+      columnVectors[1].putLong(numRows, 0);
       buckets[idx] = numRows++;
     }
     return batch.getRow(buckets[idx]);
@@ -105,6 +107,6 @@ public class AggregateHashMap {
   }
 
   private boolean equals(int idx, long key1) {
-    return batch.column(0).getLong(buckets[idx]) == key1;
+    return columnVectors[0].getLong(buckets[idx]) == key1;
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index 59d66c5..be2a9c2 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -29,12 +29,13 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * A column vector backed by Apache Arrow.
  */
-public final class ArrowColumnVector extends ReadOnlyColumnVector {
+public final class ArrowColumnVector extends ColumnVector {
 
   private final ArrowVectorAccessor accessor;
-  private final int valueCount;
+  private ArrowColumnVector[] childColumns;
 
   private void ensureAccessible(int index) {
+    int valueCount = accessor.getValueCount();
     if (index < 0 || index >= valueCount) {
       throw new IndexOutOfBoundsException(
         String.format("index: %d, valueCount: %d", index, valueCount));
@@ -42,6 +43,7 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
   }
 
   private void ensureAccessible(int index, int count) {
+    int valueCount = accessor.getValueCount();
     if (index < 0 || index + count > valueCount) {
       throw new IndexOutOfBoundsException(
         String.format("index range: [%d, %d), valueCount: %d", index, index + 
count, valueCount));
@@ -49,6 +51,16 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
   }
 
   @Override
+  public int numNulls() {
+    return accessor.getNullCount();
+  }
+
+  @Override
+  public boolean anyNullsSet() {
+    return numNulls() > 0;
+  }
+
+  @Override
   public long nullsNativeAddress() {
     throw new RuntimeException("Cannot get native address for arrow column");
   }
@@ -274,9 +286,20 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
     return accessor.getBinary(rowId);
   }
 
+  /**
+   * Returns the data for the underlying array.
+   */
+  @Override
+  public ArrowColumnVector arrayData() { return childColumns[0]; }
+
+  /**
+   * Returns the ordinal's child data column.
+   */
+  @Override
+  public ArrowColumnVector getChildColumn(int ordinal) { return 
childColumns[ordinal]; }
+
   public ArrowColumnVector(ValueVector vector) {
-    super(vector.getValueCapacity(), 
ArrowUtils.fromArrowField(vector.getField()),
-      MemoryMode.OFF_HEAP);
+    super(ArrowUtils.fromArrowField(vector.getField()));
 
     if (vector instanceof NullableBitVector) {
       accessor = new BooleanAccessor((NullableBitVector) vector);
@@ -302,7 +325,7 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
 
-      childColumns = new ColumnVector[1];
+      childColumns = new ArrowColumnVector[1];
       childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
       resultArray = new ColumnVector.Array(childColumns[0]);
     } else if (vector instanceof MapVector) {
@@ -317,9 +340,6 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
     } else {
       throw new UnsupportedOperationException();
     }
-    valueCount = accessor.getValueCount();
-    numNulls = accessor.getNullCount();
-    anyNullsSet = numNulls > 0;
   }
 
   private abstract static class ArrowVectorAccessor {
@@ -327,14 +347,9 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
     private final ValueVector vector;
     private final ValueVector.Accessor nulls;
 
-    private final int valueCount;
-    private final int nullCount;
-
     ArrowVectorAccessor(ValueVector vector) {
       this.vector = vector;
       this.nulls = vector.getAccessor();
-      this.valueCount = nulls.getValueCount();
-      this.nullCount = nulls.getNullCount();
     }
 
     final boolean isNullAt(int rowId) {
@@ -342,11 +357,11 @@ public final class ArrowColumnVector extends 
ReadOnlyColumnVector {
     }
 
     final int getValueCount() {
-      return valueCount;
+      return nulls.getValueCount();
     }
 
     final int getNullCount() {
-      return nullCount;
+      return nulls.getNullCount();
     }
 
     final void close() {

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 7796638..a69dd97 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -16,23 +16,16 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * This class represents a column of values and provides the main APIs to 
access the data
- * values. It supports all the types and contains get/put APIs as well as 
their batched versions.
+ * values. It supports all the types and contains get APIs as well as their 
batched versions.
  * The batched versions are preferable whenever possible.
  *
  * To handle nested schemas, ColumnVector has two types: Arrays and Structs. 
In both cases these
@@ -40,34 +33,15 @@ import org.apache.spark.unsafe.types.UTF8String;
  * contains nullability, and in the case of Arrays, the lengths and offsets 
into the child column.
  * Lengths and offsets are encoded identically to INTs.
  * Maps are just a special case of a two field struct.
- * Strings are handled as an Array of ByteType.
- *
- * Capacity: The data stored is dense but the arrays are not fixed capacity. 
It is the
- * responsibility of the caller to call reserve() to ensure there is enough 
room before adding
- * elements. This means that the put() APIs do not check as in common cases 
(i.e. flat schemas),
- * the lengths are known up front.
  *
  * Most of the APIs take the rowId as a parameter. This is the batch local 
0-based row id for values
  * in the current RowBatch.
  *
- * A ColumnVector should be considered immutable once originally created. In 
other words, it is not
- * valid to call put APIs after reads until reset() is called.
+ * A ColumnVector should be considered immutable once originally created.
  *
  * ColumnVectors are intended to be reused.
  */
 public abstract class ColumnVector implements AutoCloseable {
-  /**
-   * Allocates a column to store elements of `type` on or off heap.
-   * Capacity is the initial capacity of the vector and it will grow as 
necessary. Capacity is
-   * in number of elements, not number of bytes.
-   */
-  public static ColumnVector allocate(int capacity, DataType type, MemoryMode 
mode) {
-    if (mode == MemoryMode.OFF_HEAP) {
-      return new OffHeapColumnVector(capacity, type);
-    } else {
-      return new OnHeapColumnVector(capacity, type);
-    }
-  }
 
   /**
    * Holder object to return an array. This object is intended to be reused. 
Callers should
@@ -279,74 +253,21 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public final DataType dataType() { return type; }
 
   /**
-   * Resets this column for writing. The currently stored values are no longer 
accessible.
-   */
-  public void reset() {
-    if (isConstant) return;
-
-    if (childColumns != null) {
-      for (ColumnVector c: childColumns) {
-        c.reset();
-      }
-    }
-    numNulls = 0;
-    elementsAppended = 0;
-    if (anyNullsSet) {
-      putNotNulls(0, capacity);
-      anyNullsSet = false;
-    }
-  }
-
-  /**
    * Cleans up memory for this column. The column is not usable after this.
    * TODO: this should probably have ref-counted semantics.
    */
   public abstract void close();
 
-  public void reserve(int requiredCapacity) {
-    if (requiredCapacity > capacity) {
-      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
-      if (requiredCapacity <= newCapacity) {
-        try {
-          reserveInternal(newCapacity);
-        } catch (OutOfMemoryError outOfMemoryError) {
-          throwUnsupportedException(requiredCapacity, outOfMemoryError);
-        }
-      } else {
-        throwUnsupportedException(requiredCapacity, null);
-      }
-    }
-  }
-
-  private void throwUnsupportedException(int requiredCapacity, Throwable 
cause) {
-    String message = "Cannot reserve additional contiguous bytes in the 
vectorized reader " +
-        "(requested = " + requiredCapacity + " bytes). As a workaround, you 
can disable the " +
-        "vectorized reader by setting " + 
SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
-        " to false.";
-
-    if (cause != null) {
-      throw new RuntimeException(message, cause);
-    } else {
-      throw new RuntimeException(message);
-    }
-  }
-
-  /**
-   * Ensures that there is enough storage to store capacity elements. That is, 
the put() APIs
-   * must work for all rowIds < capacity.
-   */
-  protected abstract void reserveInternal(int capacity);
-
   /**
    * Returns the number of nulls in this column.
    */
-  public final int numNulls() { return numNulls; }
+  public abstract int numNulls();
 
   /**
    * Returns true if any of the nulls indicator are set for this column. This 
can be used
    * as an optimization to prevent setting nulls.
    */
-  public final boolean anyNullsSet() { return anyNullsSet; }
+  public abstract boolean anyNullsSet();
 
   /**
    * Returns the off heap ptr for the arrays backing the NULLs and values 
buffer. Only valid
@@ -356,33 +277,11 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract long valuesNativeAddress();
 
   /**
-   * Sets the value at rowId to null/not null.
-   */
-  public abstract void putNotNull(int rowId);
-  public abstract void putNull(int rowId);
-
-  /**
-   * Sets the values from [rowId, rowId + count) to null/not null.
-   */
-  public abstract void putNulls(int rowId, int count);
-  public abstract void putNotNulls(int rowId, int count);
-
-  /**
    * Returns whether the value at rowId is NULL.
    */
   public abstract boolean isNullAt(int rowId);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putBoolean(int rowId, boolean value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putBooleans(int rowId, int count, boolean value);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract boolean getBoolean(int rowId);
@@ -393,21 +292,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract boolean[] getBooleans(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putByte(int rowId, byte value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putBytes(int rowId, int count, byte value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
-   */
-  public abstract void putBytes(int rowId, int count, byte[] src, int 
srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract byte getByte(int rowId);
@@ -418,21 +302,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract byte[] getBytes(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putShort(int rowId, short value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putShorts(int rowId, int count, short value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
-   */
-  public abstract void putShorts(int rowId, int count, short[] src, int 
srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract short getShort(int rowId);
@@ -443,27 +312,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract short[] getShorts(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putInt(int rowId, int value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putInts(int rowId, int count, int value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
-   */
-  public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
-   * The data in src must be 4-byte little endian ints.
-   */
-  public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract int getInt(int rowId);
@@ -481,27 +329,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract int getDictId(int rowId);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putLong(int rowId, long value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putLongs(int rowId, int count, long value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
-   */
-  public abstract void putLongs(int rowId, int count, long[] src, int 
srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
-   * The data in src must be 8-byte little endian longs.
-   */
-  public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract long getLong(int rowId);
@@ -512,27 +339,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract long[] getLongs(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putFloat(int rowId, float value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putFloats(int rowId, int count, float value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
-   */
-  public abstract void putFloats(int rowId, int count, float[] src, int 
srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
-   * The data in src must be ieee formatted floats.
-   */
-  public abstract void putFloats(int rowId, int count, byte[] src, int 
srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract float getFloat(int rowId);
@@ -543,27 +349,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract float[] getFloats(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putDouble(int rowId, double value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putDoubles(int rowId, int count, double value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
-   */
-  public abstract void putDoubles(int rowId, int count, double[] src, int 
srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
-   * The data in src must be ieee formatted doubles.
-   */
-  public abstract void putDoubles(int rowId, int count, byte[] src, int 
srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract double getDouble(int rowId);
@@ -574,11 +359,6 @@ public abstract class ColumnVector implements 
AutoCloseable {
   public abstract double[] getDoubles(int rowId, int count);
 
   /**
-   * Puts a byte array that already exists in this column.
-   */
-  public abstract void putArray(int rowId, int offset, int length);
-
-  /**
    * Returns the length of the array at rowid.
    */
   public abstract int getArrayLength(int rowId);
@@ -608,7 +388,7 @@ public abstract class ColumnVector implements AutoCloseable 
{
   /**
    * Returns the array at rowid.
    */
-  public final Array getArray(int rowId) {
+  public final ColumnVector.Array getArray(int rowId) {
     resultArray.length = getArrayLength(rowId);
     resultArray.offset = getArrayOffset(rowId);
     return resultArray;
@@ -617,24 +397,7 @@ public abstract class ColumnVector implements 
AutoCloseable {
   /**
    * Loads the data into array.byteArray.
    */
-  public abstract void loadBytes(Array array);
-
-  /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract int putByteArray(int rowId, byte[] value, int offset, int 
count);
-  public final int putByteArray(int rowId, byte[] value) {
-    return putByteArray(rowId, value, 0, value.length);
-  }
-
-  /**
-   * Returns the value for rowId.
-   */
-  private Array getByteArray(int rowId) {
-    Array array = getArray(rowId);
-    array.data.loadBytes(array);
-    return array;
-  }
+  public abstract void loadBytes(ColumnVector.Array array);
 
   /**
    * Returns the value for rowId.
@@ -646,291 +409,27 @@ public abstract class ColumnVector implements 
AutoCloseable {
   /**
    * Returns the decimal for rowId.
    */
-  public Decimal getDecimal(int rowId, int precision, int scale) {
-    if (precision <= Decimal.MAX_INT_DIGITS()) {
-      return Decimal.createUnsafe(getInt(rowId), precision, scale);
-    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
-      return Decimal.createUnsafe(getLong(rowId), precision, scale);
-    } else {
-      // TODO: best perf?
-      byte[] bytes = getBinary(rowId);
-      BigInteger bigInteger = new BigInteger(bytes);
-      BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
-      return Decimal.apply(javaDecimal, precision, scale);
-    }
-  }
-
-
-  public void putDecimal(int rowId, Decimal value, int precision) {
-    if (precision <= Decimal.MAX_INT_DIGITS()) {
-      putInt(rowId, (int) value.toUnscaledLong());
-    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
-      putLong(rowId, value.toUnscaledLong());
-    } else {
-      BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
-      putByteArray(rowId, bigInteger.toByteArray());
-    }
-  }
+  public abstract Decimal getDecimal(int rowId, int precision, int scale);
 
   /**
    * Returns the UTF8String for rowId.
    */
-  public UTF8String getUTF8String(int rowId) {
-    if (dictionary == null) {
-      ColumnVector.Array a = getByteArray(rowId);
-      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
-    } else {
-      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(bytes);
-    }
-  }
+  public abstract UTF8String getUTF8String(int rowId);
 
   /**
    * Returns the byte array for rowId.
    */
-  public byte[] getBinary(int rowId) {
-    if (dictionary == null) {
-      ColumnVector.Array array = getByteArray(rowId);
-      byte[] bytes = new byte[array.length];
-      System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, 
bytes.length);
-      return bytes;
-    } else {
-      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-    }
-  }
-
-  /**
-   * Append APIs. These APIs all behave similarly and will append data to the 
current vector.  It
-   * is not valid to mix the put and append APIs. The append APIs are slower 
and should only be
-   * used if the sizes are not known up front.
-   * In all these cases, the return value is the rowId for the first appended 
element.
-   */
-  public final int appendNull() {
-    assert (!(dataType() instanceof StructType)); // Use appendStruct()
-    reserve(elementsAppended + 1);
-    putNull(elementsAppended);
-    return elementsAppended++;
-  }
-
-  public final int appendNotNull() {
-    reserve(elementsAppended + 1);
-    putNotNull(elementsAppended);
-    return elementsAppended++;
-  }
-
-  public final int appendNulls(int count) {
-    assert (!(dataType() instanceof StructType));
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putNulls(elementsAppended, count);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendNotNulls(int count) {
-    assert (!(dataType() instanceof StructType));
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putNotNulls(elementsAppended, count);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendBoolean(boolean v) {
-    reserve(elementsAppended + 1);
-    putBoolean(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendBooleans(int count, boolean v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putBooleans(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendByte(byte v) {
-    reserve(elementsAppended + 1);
-    putByte(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendBytes(int count, byte v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putBytes(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendBytes(int length, byte[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putBytes(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendShort(short v) {
-    reserve(elementsAppended + 1);
-    putShort(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendShorts(int count, short v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putShorts(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendShorts(int length, short[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putShorts(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendInt(int v) {
-    reserve(elementsAppended + 1);
-    putInt(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendInts(int count, int v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putInts(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendInts(int length, int[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putInts(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendLong(long v) {
-    reserve(elementsAppended + 1);
-    putLong(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendLongs(int count, long v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putLongs(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendLongs(int length, long[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putLongs(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendFloat(float v) {
-    reserve(elementsAppended + 1);
-    putFloat(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendFloats(int count, float v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putFloats(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendFloats(int length, float[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putFloats(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendDouble(double v) {
-    reserve(elementsAppended + 1);
-    putDouble(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendDoubles(int count, double v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putDoubles(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendDoubles(int length, double[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putDoubles(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendByteArray(byte[] value, int offset, int length) {
-    int copiedOffset = arrayData().appendBytes(length, value, offset);
-    reserve(elementsAppended + 1);
-    putArray(elementsAppended, copiedOffset, length);
-    return elementsAppended++;
-  }
-
-  public final int appendArray(int length) {
-    reserve(elementsAppended + 1);
-    putArray(elementsAppended, arrayData().elementsAppended, length);
-    return elementsAppended++;
-  }
-
-  /**
-   * Appends a NULL struct. This *has* to be used for structs instead of 
appendNull() as this
-   * recursively appends a NULL to its children.
-   * We don't have this logic as the general appendNull implementation to 
optimize the more
-   * common non-struct case.
-   */
-  public final int appendStruct(boolean isNull) {
-    if (isNull) {
-      appendNull();
-      for (ColumnVector c: childColumns) {
-        if (c.type instanceof StructType) {
-          c.appendStruct(true);
-        } else {
-          c.appendNull();
-        }
-      }
-    } else {
-      appendNotNull();
-    }
-    return elementsAppended;
-  }
+  public abstract byte[] getBinary(int rowId);
 
   /**
    * Returns the data for the underlying array.
    */
-  public final ColumnVector arrayData() { return childColumns[0]; }
+  public abstract ColumnVector arrayData();
 
   /**
    * Returns the ordinal's child data column.
    */
-  public final ColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
-
-  /**
-   * Returns the elements appended.
-   */
-  public final int getElementsAppended() { return elementsAppended; }
+  public abstract ColumnVector getChildColumn(int ordinal);
 
   /**
    * Returns true if this column is an array.
@@ -938,62 +437,14 @@ public abstract class ColumnVector implements AutoCloseable {
   public final boolean isArray() { return resultArray != null; }
 
   /**
-   * Marks this column as being constant.
-   */
-  public final void setIsConstant() { isConstant = true; }
-
-  /**
-   * Maximum number of rows that can be stored in this column.
-   */
-  protected int capacity;
-
-  /**
-   * Upper limit for the maximum capacity for this column.
-   */
-  @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE;
-
-  /**
    * Data type for this column.
    */
   protected DataType type;
 
   /**
-   * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
-   */
-  protected int numNulls;
-
-  /**
-   * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
-   * having to clear NULL bits.
-   */
-  protected boolean anyNullsSet;
-
-  /**
-   * True if this column's values are fixed. This means the column values never change, even
-   * across resets.
-   */
-  protected boolean isConstant;
-
-  /**
-   * Default size of each array length value. This grows as necessary.
-   */
-  protected static final int DEFAULT_ARRAY_LENGTH = 4;
-
-  /**
-   * Current write cursor (row index) when appending data.
-   */
-  protected int elementsAppended;
-
-  /**
-   * If this is a nested type (array or struct), the column for the child data.
-   */
-  protected ColumnVector[] childColumns;
-
-  /**
    * Reusable Array holder for getArray().
    */
-  protected Array resultArray;
+  protected ColumnVector.Array resultArray;
 
   /**
    * Reusable Struct holder for getStruct().
@@ -1013,32 +464,11 @@ public abstract class ColumnVector implements AutoCloseable {
   protected ColumnVector dictionaryIds;
 
   /**
-   * Update the dictionary.
-   */
-  public void setDictionary(Dictionary dictionary) {
-    this.dictionary = dictionary;
-  }
-
-  /**
    * Returns true if this column has a dictionary.
    */
   public boolean hasDictionary() { return this.dictionary != null; }
 
   /**
-   * Reserve a integer column for ids of dictionary.
-   */
-  public ColumnVector reserveDictionaryIds(int capacity) {
-    if (dictionaryIds == null) {
-      dictionaryIds = allocate(capacity, DataTypes.IntegerType,
-        this instanceof OnHeapColumnVector ? MemoryMode.ON_HEAP : MemoryMode.OFF_HEAP);
-    } else {
-      dictionaryIds.reset();
-      dictionaryIds.reserve(capacity);
-    }
-    return dictionaryIds;
-  }
-
-  /**
    * Returns the underlying integer column for ids of dictionary.
    */
   public ColumnVector getDictionaryIds() {
@@ -1049,43 +479,7 @@ public abstract class ColumnVector implements AutoCloseable {
    * Sets up the common state and also handles creating the child columns if this is a nested
    * type.
    */
-  protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
-    this.capacity = capacity;
+  protected ColumnVector(DataType type) {
     this.type = type;
-
-    if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
-        || DecimalType.isByteArrayDecimalType(type)) {
-      DataType childType;
-      int childCapacity = capacity;
-      if (type instanceof ArrayType) {
-        childType = ((ArrayType)type).elementType();
-      } else {
-        childType = DataTypes.ByteType;
-        childCapacity *= DEFAULT_ARRAY_LENGTH;
-      }
-      this.childColumns = new ColumnVector[1];
-      this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode);
-      this.resultArray = new Array(this.childColumns[0]);
-      this.resultStruct = null;
-    } else if (type instanceof StructType) {
-      StructType st = (StructType)type;
-      this.childColumns = new ColumnVector[st.fields().length];
-      for (int i = 0; i < childColumns.length; ++i) {
-        this.childColumns[i] = ColumnVector.allocate(capacity, st.fields()[i].dataType(), memMode);
-      }
-      this.resultArray = null;
-      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
-    } else if (type instanceof CalendarIntervalType) {
-      // Two columns. Months as int. Microseconds as Long.
-      this.childColumns = new ColumnVector[2];
-      this.childColumns[0] = ColumnVector.allocate(capacity, DataTypes.IntegerType, memMode);
-      this.childColumns[1] = ColumnVector.allocate(capacity, DataTypes.LongType, memMode);
-      this.resultArray = null;
-      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
-    } else {
-      this.childColumns = null;
-      this.resultArray = null;
-      this.resultStruct = null;
-    }
   }
 }
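
With the put*/append* methods removed above, ColumnVector is left as a pure read-only base type and writing becomes the job of the new WritableColumnVector subclasses. A minimal sketch of how the split looks for a caller (not part of this commit; it assumes the usual imports from org.apache.spark.sql.execution.vectorized and org.apache.spark.sql.types.DataTypes, and the scaffolding method name is illustrative):

    static int roundTrip() {
      // Writes go through the new writable hierarchy (OnHeapColumnVector's
      // constructor is public after this patch)...
      WritableColumnVector writable = new OnHeapColumnVector(16, DataTypes.IntegerType);
      writable.putInt(0, 42);
      // ...while readers only need the read-only ColumnVector surface.
      ColumnVector readOnly = writable;
      return readOnly.getInt(0);    // 42
    }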

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 900d7c4..adb859e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -40,7 +40,7 @@ public class ColumnVectorUtils {
   /**
    * Populates the entire `col` with `row[fieldIdx]`
    */
-  public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
+  public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
     int capacity = col.capacity;
     DataType t = col.dataType();
 
@@ -115,7 +115,7 @@ public class ColumnVectorUtils {
     }
   }
 
-  private static void appendValue(ColumnVector dst, DataType t, Object o) {
+  private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
     if (o == null) {
       if (t instanceof CalendarIntervalType) {
         dst.appendStruct(true);
@@ -165,7 +165,7 @@ public class ColumnVectorUtils {
     }
   }
 
-  private static void appendValue(ColumnVector dst, DataType t, Row src, int fieldIdx) {
+  private static void appendValue(WritableColumnVector dst, DataType t, Row src, int fieldIdx) {
     if (t instanceof ArrayType) {
       ArrayType at = (ArrayType)t;
       if (src.isNullAt(fieldIdx)) {
@@ -198,15 +198,23 @@ public class ColumnVectorUtils {
    */
   public static ColumnarBatch toBatch(
       StructType schema, MemoryMode memMode, Iterator<Row> row) {
-    ColumnarBatch batch = ColumnarBatch.allocate(schema, memMode);
+    int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
+    WritableColumnVector[] columnVectors;
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
+    }
+
     int n = 0;
     while (row.hasNext()) {
       Row r = row.next();
       for (int i = 0; i < schema.fields().length; i++) {
-        appendValue(batch.column(i), schema.fields()[i].dataType(), r, i);
+        appendValue(columnVectors[i], schema.fields()[i].dataType(), r, i);
       }
       n++;
     }
+    ColumnarBatch batch = new ColumnarBatch(schema, columnVectors, capacity);
     batch.setNumRows(n);
     return batch;
   }
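
The reworked toBatch above is also the pattern callers can follow now that ColumnarBatch.allocate is gone: pick a concrete WritableColumnVector implementation per memory mode, then hand the vectors to the new public ColumnarBatch constructor. A hedged sketch of the same pattern factored into a helper (the helper name is illustrative, not part of this commit):

    static ColumnarBatch emptyBatch(StructType schema, MemoryMode memMode) {
      int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;   // now public, per the hunk below
      WritableColumnVector[] vectors;
      if (memMode == MemoryMode.OFF_HEAP) {
        vectors = OffHeapColumnVector.allocateColumns(capacity, schema);
      } else {
        vectors = OnHeapColumnVector.allocateColumns(capacity, schema);
      }
      ColumnarBatch batch = new ColumnarBatch(schema, vectors, capacity);
      batch.setNumRows(0);
      return batch;
    }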

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 34dc3af..e782756 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.util.*;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -44,8 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  *  - Compaction: The batch and columns should be able to compact based on a selection vector.
  */
 public final class ColumnarBatch {
-  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
-  private static MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  public static final int DEFAULT_BATCH_SIZE = 4 * 1024;
 
   private final StructType schema;
   private final int capacity;
@@ -64,18 +62,6 @@ public final class ColumnarBatch {
   // Staging row returned from getRow.
   final Row row;
 
-  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
-  }
-
-  public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
-  }
-
-  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
-  }
-
   /**
    * Called to close all the columns in this batch. It is not valid to access the data after
    * calling this. This must be called at the end to clean up memory allocations.
@@ -95,12 +81,19 @@ public final class ColumnarBatch {
     private final ColumnarBatch parent;
     private final int fixedLenRowSize;
     private final ColumnVector[] columns;
+    private final WritableColumnVector[] writableColumns;
 
     // Ctor used if this is a top level row.
     private Row(ColumnarBatch parent) {
       this.parent = parent;
       this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols());
       this.columns = parent.columns;
+      this.writableColumns = new WritableColumnVector[this.columns.length];
+      for (int i = 0; i < this.columns.length; i++) {
+        if (this.columns[i] instanceof WritableColumnVector) {
+          this.writableColumns[i] = (WritableColumnVector) this.columns[i];
+        }
+      }
     }
 
     // Ctor used if this is a struct.
@@ -108,6 +101,12 @@ public final class ColumnarBatch {
       this.parent = null;
       this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(columns.length);
       this.columns = columns;
+      this.writableColumns = new WritableColumnVector[this.columns.length];
+      for (int i = 0; i < this.columns.length; i++) {
+        if (this.columns[i] instanceof WritableColumnVector) {
+          this.writableColumns[i] = (WritableColumnVector) this.columns[i];
+        }
+      }
     }
 
     /**
@@ -307,64 +306,69 @@ public final class ColumnarBatch {
 
     @Override
     public void setNullAt(int ordinal) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNull(rowId);
+      getWritableColumn(ordinal).putNull(rowId);
     }
 
     @Override
     public void setBoolean(int ordinal, boolean value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putBoolean(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putBoolean(rowId, value);
     }
 
     @Override
     public void setByte(int ordinal, byte value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putByte(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putByte(rowId, value);
     }
 
     @Override
     public void setShort(int ordinal, short value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putShort(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putShort(rowId, value);
     }
 
     @Override
     public void setInt(int ordinal, int value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putInt(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putInt(rowId, value);
     }
 
     @Override
     public void setLong(int ordinal, long value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putLong(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putLong(rowId, value);
     }
 
     @Override
     public void setFloat(int ordinal, float value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putFloat(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putFloat(rowId, value);
     }
 
     @Override
     public void setDouble(int ordinal, double value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putDouble(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putDouble(rowId, value);
     }
 
     @Override
     public void setDecimal(int ordinal, Decimal value, int precision) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putDecimal(rowId, value, precision);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putDecimal(rowId, value, precision);
+    }
+
+    private WritableColumnVector getWritableColumn(int ordinal) {
+      WritableColumnVector column = writableColumns[ordinal];
+      assert (!column.isConstant);
+      return column;
     }
   }
 
@@ -409,7 +413,9 @@ public final class ColumnarBatch {
    */
   public void reset() {
     for (int i = 0; i < numCols(); ++i) {
-      columns[i].reset();
+      if (columns[i] instanceof WritableColumnVector) {
+        ((WritableColumnVector) columns[i]).reset();
+      }
     }
     if (this.numRowsFiltered > 0) {
       Arrays.fill(filteredRows, false);
@@ -427,7 +433,7 @@ public final class ColumnarBatch {
     this.numRows = numRows;
 
     for (int ordinal : nullFilteredColumns) {
-      if (columns[ordinal].numNulls != 0) {
+      if (columns[ordinal].numNulls() != 0) {
         for (int rowId = 0; rowId < numRows; rowId++) {
           if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
             filteredRows[rowId] = true;
@@ -505,18 +511,12 @@ public final class ColumnarBatch {
     nullFilteredColumns.add(ordinal);
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
+  public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
     this.schema = schema;
-    this.capacity = maxRows;
-    this.columns = new ColumnVector[schema.size()];
+    this.columns = columns;
+    this.capacity = capacity;
     this.nullFilteredColumns = new HashSet<>();
-    this.filteredRows = new boolean[maxRows];
-
-    for (int i = 0; i < schema.fields().length; ++i) {
-      StructField field = schema.fields()[i];
-      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
-    }
-
+    this.filteredRows = new boolean[capacity];
     this.row = new Row(this);
   }
 }
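
The mutable setters on the staging Row now resolve their target through getWritableColumn, so they keep working for batches built on on-heap or off-heap vectors but are no longer part of the read-only ColumnVector contract. A short hedged sketch of what a caller sees (column layout and values are illustrative; it assumes `batch` wraps WritableColumnVectors and has had setNumRows called):

    ColumnarBatch.Row row = batch.getRow(0);   // the staging row mentioned above
    row.setInt(0, 7);     // resolved via getWritableColumn(0): putNotNull + putInt
    row.setNullAt(1);     // resolved via getWritableColumn(1): putNull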

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2d1f3da..3568275 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -19,18 +19,39 @@ package org.apache.spark.sql.execution.vectorized;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 
 /**
  * Column data backed using offheap memory.
  */
-public final class OffHeapColumnVector extends ColumnVector {
+public final class OffHeapColumnVector extends WritableColumnVector {
 
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
@@ -40,8 +61,8 @@ public final class OffHeapColumnVector extends ColumnVector {
   private long lengthData;
   private long offsetData;
 
-  protected OffHeapColumnVector(int capacity, DataType type) {
-    super(capacity, type, MemoryMode.OFF_HEAP);
+  public OffHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
 
     nulls = 0;
     data = 0;
@@ -519,4 +540,9 @@ public final class OffHeapColumnVector extends ColumnVector {
     Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity);
     capacity = newCapacity;
   }
+
+  @Override
+  protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OffHeapColumnVector(capacity, type);
+  }
 }
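
A hedged sketch of the new off-heap factory in use; since these vectors own native allocations (the nulls/data/lengthData/offsetData buffers above), releasing them stays the caller's job. The schema, values, and helper name are illustrative:

    static void writeOffHeap() {
      StructType schema = new StructType()
          .add("key", DataTypes.LongType)
          .add("name", DataTypes.StringType);
      OffHeapColumnVector[] vectors = OffHeapColumnVector.allocateColumns(4 * 1024, schema);
      try {
        vectors[0].appendLong(1L);
        vectors[1].appendByteArray(new byte[] { (byte) 'a' }, 0, 1);
      } finally {
        for (OffHeapColumnVector v : vectors) {
          v.close();   // frees the off-heap allocations
        }
      }
    }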

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 5064343..96a4529 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 
@@ -28,11 +27,33 @@ import org.apache.spark.unsafe.Platform;
  * A column backed by an in memory JVM array. This stores the NULLs as a byte per value
  * and a java array for the values.
  */
-public final class OnHeapColumnVector extends ColumnVector {
+public final class OnHeapColumnVector extends WritableColumnVector {
 
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
 
@@ -51,8 +72,9 @@ public final class OnHeapColumnVector extends ColumnVector {
   private int[] arrayLengths;
   private int[] arrayOffsets;
 
-  protected OnHeapColumnVector(int capacity, DataType type) {
-    super(capacity, type, MemoryMode.ON_HEAP);
+  public OnHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
+
     reserveInternal(capacity);
     reset();
   }
@@ -529,4 +551,9 @@ public final class OnHeapColumnVector extends ColumnVector {
 
     capacity = newCapacity;
   }
+
+  @Override
+  protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OnHeapColumnVector(capacity, type);
+  }
 }
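
The reserveNewColumn override added to both concrete vectors is what lets the shared base allocate helper vectors in its own memory mode, replacing the `this instanceof OnHeapColumnVector` branch that the removed ColumnVector.reserveDictionaryIds needed. The new WritableColumnVector body is not shown in this email, so the following is only a sketch of the shape such a helper can take, under the assumption that the base declares reserveNewColumn as returning WritableColumnVector:

    // Assumed sketch, not quoted from WritableColumnVector: dictionary ids get a
    // sibling vector of the same memory mode via the reserveNewColumn hook.
    public WritableColumnVector reserveDictionaryIds(int capacity) {
      WritableColumnVector ids = (WritableColumnVector) this.dictionaryIds;
      if (ids == null) {
        ids = reserveNewColumn(capacity, DataTypes.IntegerType);
        this.dictionaryIds = ids;
      } else {
        ids.reset();
        ids.reserve(capacity);
      }
      return ids;
    }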

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
deleted file mode 100644
index e9f6e7c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.vectorized;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.types.*;
-
-/**
- * An abstract class for read-only column vector.
- */
-public abstract class ReadOnlyColumnVector extends ColumnVector {
-
-  protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
-    super(capacity, DataTypes.NullType, memMode);
-    this.type = type;
-    isConstant = true;
-  }
-
-  //
-  // APIs dealing with nulls
-  //
-
-  @Override
-  public final void putNotNull(int rowId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putNull(int rowId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putNulls(int rowId, int count) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putNotNulls(int rowId, int count) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Booleans
-  //
-
-  @Override
-  public final void putBoolean(int rowId, boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putBooleans(int rowId, int count, boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Bytes
-  //
-
-  @Override
-  public final void putByte(int rowId, byte value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putBytes(int rowId, int count, byte value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Shorts
-  //
-
-  @Override
-  public final void putShort(int rowId, short value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putShorts(int rowId, int count, short value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Ints
-  //
-
-  @Override
-  public final void putInt(int rowId, int value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putInts(int rowId, int count, int value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Longs
-  //
-
-  @Override
-  public final void putLong(int rowId, long value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putLongs(int rowId, int count, long value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with floats
-  //
-
-  @Override
-  public final void putFloat(int rowId, float value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putFloats(int rowId, int count, float value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with doubles
-  //
-
-  @Override
-  public final void putDouble(int rowId, double value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putDoubles(int rowId, int count, double value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Arrays
-  //
-
-  @Override
-  public final void putArray(int rowId, int offset, int length) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Byte Arrays
-  //
-
-  @Override
-  public final int putByteArray(int rowId, byte[] value, int offset, int count) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Decimals
-  //
-
-  @Override
-  public final void putDecimal(int rowId, Decimal value, int precision) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // Other APIs
-  //
-
-  @Override
-  public final void setDictionary(Dictionary dictionary) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final ColumnVector reserveDictionaryIds(int capacity) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected final void reserveInternal(int newCapacity) {
-    throw new UnsupportedOperationException();
-  }
-}
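
With the write methods gone from ColumnVector, this blanket throw-on-every-write adapter is no longer needed: read-only behaviour is simply what the base class provides, and mutation becomes a compile-time property of WritableColumnVector rather than a runtime failure. A small hedged illustration (the helper name is made up):

    // Only compiles against the writable subclass; passing a plain read-only
    // ColumnVector, such as ArrowColumnVector, is now rejected by the compiler
    // instead of throwing UnsupportedOperationException at runtime.
    static void zeroFill(WritableColumnVector col, int numRows) {
      col.putInts(0, numRows, 0);
    }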


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
