Repository: spark
Updated Branches:
  refs/heads/master dc5d34d8d -> 9e33954dd


http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
new file mode 100644
index 0000000..b4f753c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is enough room before adding
+ * elements. This means that the put() APIs do not check this, as in common cases
+ * (i.e. flat schemas) the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. In other words, it is
+ * not valid to call put APIs after reads until reset() is called.
+ */
+public abstract class WritableColumnVector extends ColumnVector {
+
+  /**
+   * Resets this column for writing. The currently stored values are no longer accessible.
+   */
+  public void reset() {
+    if (isConstant) return;
+
+    if (childColumns != null) {
+      for (ColumnVector c: childColumns) {
+        ((WritableColumnVector) c).reset();
+      }
+    }
+    numNulls = 0;
+    elementsAppended = 0;
+    if (anyNullsSet) {
+      putNotNulls(0, capacity);
+      anyNullsSet = false;
+    }
+  }
+
+  public void reserve(int requiredCapacity) {
+    if (requiredCapacity > capacity) {
+      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      if (requiredCapacity <= newCapacity) {
+        try {
+          reserveInternal(newCapacity);
+        } catch (OutOfMemoryError outOfMemoryError) {
+          throwUnsupportedException(requiredCapacity, outOfMemoryError);
+        }
+      } else {
+        throwUnsupportedException(requiredCapacity, null);
+      }
+    }
+  }
+
+  private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
+    String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
+        "(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
+        "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+        " to false.";
+    throw new RuntimeException(message, cause);
+  }
+
+  @Override
+  public int numNulls() { return numNulls; }
+
+  @Override
+  public boolean anyNullsSet() { return anyNullsSet; }
+
+  /**
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
+   */
+  protected abstract void reserveInternal(int capacity);
+
+  /**
+   * Sets the value at rowId to null/not null.
+   */
+  public abstract void putNotNull(int rowId);
+  public abstract void putNull(int rowId);
+
+  /**
+   * Sets the values from [rowId, rowId + count) to null/not null.
+   */
+  public abstract void putNulls(int rowId, int count);
+  public abstract void putNotNulls(int rowId, int count);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putBoolean(int rowId, boolean value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBooleans(int rowId, int count, boolean value);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putByte(int rowId, byte value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBytes(int rowId, int count, byte value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putShort(int rowId, short value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putShorts(int rowId, int count, short value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putInt(int rowId, int value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putInts(int rowId, int count, int value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be 4-byte little endian ints.
+   */
+  public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putLong(int rowId, long value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putLongs(int rowId, int count, long value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be 8-byte little endian longs.
+   */
+  public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putFloat(int rowId, float value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putFloats(int rowId, int count, float value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be IEEE formatted floats.
+   */
+  public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putDouble(int rowId, double value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putDoubles(int rowId, int count, double value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be IEEE formatted doubles.
+   */
+  public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Puts a byte array that already exists in this column.
+   */
+  public abstract void putArray(int rowId, int offset, int length);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
+  public final int putByteArray(int rowId, byte[] value) {
+    return putByteArray(rowId, value, 0, value.length);
+  }
+
+  /**
+   * Returns the value for rowId.
+   */
+  private ColumnVector.Array getByteArray(int rowId) {
+    ColumnVector.Array array = getArray(rowId);
+    array.data.loadBytes(array);
+    return array;
+  }
+
+  /**
+   * Returns the decimal for rowId.
+   */
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    if (precision <= Decimal.MAX_INT_DIGITS()) {
+      return Decimal.createUnsafe(getInt(rowId), precision, scale);
+    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      return Decimal.createUnsafe(getLong(rowId), precision, scale);
+    } else {
+      // TODO: best perf?
+      byte[] bytes = getBinary(rowId);
+      BigInteger bigInteger = new BigInteger(bytes);
+      BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
+      return Decimal.apply(javaDecimal, precision, scale);
+    }
+  }
+
+  public void putDecimal(int rowId, Decimal value, int precision) {
+    if (precision <= Decimal.MAX_INT_DIGITS()) {
+      putInt(rowId, (int) value.toUnscaledLong());
+    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      putLong(rowId, value.toUnscaledLong());
+    } else {
+      BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
+      putByteArray(rowId, bigInteger.toByteArray());
+    }
+  }
+
+  /**
+   * Returns the UTF8String for rowId.
+   */
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    if (dictionary == null) {
+      ColumnVector.Array a = getByteArray(rowId);
+      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
+    } else {
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
+    }
+  }
+
+  /**
+   * Returns the byte array for rowId.
+   */
+  @Override
+  public byte[] getBinary(int rowId) {
+    if (dictionary == null) {
+      ColumnVector.Array array = getByteArray(rowId);
+      byte[] bytes = new byte[array.length];
+      System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
+      return bytes;
+    } else {
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+    }
+  }
+
+  /**
+   * Append APIs. These APIs all behave similarly and will append data to the current vector. It
+   * is not valid to mix the put and append APIs. The append APIs are slower and should only be
+   * used if the sizes are not known up front.
+   * In all these cases, the return value is the rowId for the first appended element.
+   */
+  public final int appendNull() {
+    assert (!(dataType() instanceof StructType)); // Use appendStruct()
+    reserve(elementsAppended + 1);
+    putNull(elementsAppended);
+    return elementsAppended++;
+  }
+
+  public final int appendNotNull() {
+    reserve(elementsAppended + 1);
+    putNotNull(elementsAppended);
+    return elementsAppended++;
+  }
+
+  public final int appendNulls(int count) {
+    assert (!(dataType() instanceof StructType));
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putNulls(elementsAppended, count);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendNotNulls(int count) {
+    assert (!(dataType() instanceof StructType));
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putNotNulls(elementsAppended, count);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendBoolean(boolean v) {
+    reserve(elementsAppended + 1);
+    putBoolean(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendBooleans(int count, boolean v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putBooleans(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendByte(byte v) {
+    reserve(elementsAppended + 1);
+    putByte(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendBytes(int count, byte v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putBytes(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendBytes(int length, byte[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putBytes(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendShort(short v) {
+    reserve(elementsAppended + 1);
+    putShort(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendShorts(int count, short v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putShorts(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendShorts(int length, short[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putShorts(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendInt(int v) {
+    reserve(elementsAppended + 1);
+    putInt(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendInts(int count, int v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putInts(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendInts(int length, int[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putInts(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendLong(long v) {
+    reserve(elementsAppended + 1);
+    putLong(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendLongs(int count, long v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putLongs(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendLongs(int length, long[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putLongs(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendFloat(float v) {
+    reserve(elementsAppended + 1);
+    putFloat(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendFloats(int count, float v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putFloats(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendFloats(int length, float[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putFloats(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendDouble(double v) {
+    reserve(elementsAppended + 1);
+    putDouble(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendDoubles(int count, double v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putDoubles(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendDoubles(int length, double[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putDoubles(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendByteArray(byte[] value, int offset, int length) {
+    int copiedOffset = arrayData().appendBytes(length, value, offset);
+    reserve(elementsAppended + 1);
+    putArray(elementsAppended, copiedOffset, length);
+    return elementsAppended++;
+  }
+
+  public final int appendArray(int length) {
+    reserve(elementsAppended + 1);
+    putArray(elementsAppended, arrayData().elementsAppended, length);
+    return elementsAppended++;
+  }
+
+  /**
+   * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this
+   * recursively appends a NULL to its children.
+   * We keep this logic out of the general appendNull() implementation to optimize the more
+   * common non-struct case.
+   */
+  public final int appendStruct(boolean isNull) {
+    if (isNull) {
+      appendNull();
+      for (ColumnVector c: childColumns) {
+        if (c.type instanceof StructType) {
+          ((WritableColumnVector) c).appendStruct(true);
+        } else {
+          ((WritableColumnVector) c).appendNull();
+        }
+      }
+    } else {
+      appendNotNull();
+    }
+    return elementsAppended;
+  }
+
+  /**
+   * Returns the data for the underlying array.
+   */
+  @Override
+  public WritableColumnVector arrayData() { return childColumns[0]; }
+
+  /**
+   * Returns the ordinal's child data column.
+   */
+  @Override
+  public WritableColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
+
+  /**
+   * Returns the elements appended.
+   */
+  public final int getElementsAppended() { return elementsAppended; }
+
+  /**
+   * Marks this column as being constant.
+   */
+  public final void setIsConstant() { isConstant = true; }
+
+  /**
+   * Maximum number of rows that can be stored in this column.
+   */
+  protected int capacity;
+
+  /**
+   * Upper limit for the maximum capacity for this column.
+   */
+  @VisibleForTesting
+  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+
+  /**
+   * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
+   */
+  protected int numNulls;
+
+  /**
+   * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
+   * having to clear NULL bits.
+   */
+  protected boolean anyNullsSet;
+
+  /**
+   * True if this column's values are fixed. This means the column values never change, even
+   * across resets.
+   */
+  protected boolean isConstant;
+
+  /**
+   * Default size of each array length value. This grows as necessary.
+   */
+  protected static final int DEFAULT_ARRAY_LENGTH = 4;
+
+  /**
+   * Current write cursor (row index) when appending data.
+   */
+  protected int elementsAppended;
+
+  /**
+   * If this is a nested type (array or struct), the column for the child data.
+   */
+  protected WritableColumnVector[] childColumns;
+
+  /**
+   * Update the dictionary.
+   */
+  public void setDictionary(Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  /**
+   * Reserve a integer column for ids of dictionary.
+   */
+  public WritableColumnVector reserveDictionaryIds(int capacity) {
+    WritableColumnVector dictionaryIds = (WritableColumnVector) this.dictionaryIds;
+    if (dictionaryIds == null) {
+      dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.dictionaryIds = dictionaryIds;
+    } else {
+      dictionaryIds.reset();
+      dictionaryIds.reserve(capacity);
+    }
+    return dictionaryIds;
+  }
+
+  /**
+   * Returns the underlying integer column for dictionary ids.
+   */
+  @Override
+  public WritableColumnVector getDictionaryIds() {
+    return (WritableColumnVector) dictionaryIds;
+  }
+
+  /**
+   * Reserve a new column.
+   */
+  protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
+
+  /**
+   * Sets up the common state and also handles creating the child columns if this is a nested
+   * type.
+   */
+  protected WritableColumnVector(int capacity, DataType type) {
+    super(type);
+    this.capacity = capacity;
+
+    if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
+        || DecimalType.isByteArrayDecimalType(type)) {
+      DataType childType;
+      int childCapacity = capacity;
+      if (type instanceof ArrayType) {
+        childType = ((ArrayType)type).elementType();
+      } else {
+        childType = DataTypes.ByteType;
+        childCapacity *= DEFAULT_ARRAY_LENGTH;
+      }
+      this.childColumns = new WritableColumnVector[1];
+      this.childColumns[0] = reserveNewColumn(childCapacity, childType);
+      this.resultArray = new ColumnVector.Array(this.childColumns[0]);
+      this.resultStruct = null;
+    } else if (type instanceof StructType) {
+      StructType st = (StructType)type;
+      this.childColumns = new WritableColumnVector[st.fields().length];
+      for (int i = 0; i < childColumns.length; ++i) {
+        this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
+      }
+      this.resultArray = null;
+      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
+    } else if (type instanceof CalendarIntervalType) {
+      // Two columns. Months as int. Microseconds as Long.
+      this.childColumns = new WritableColumnVector[2];
+      this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.childColumns[1] = reserveNewColumn(capacity, DataTypes.LongType);
+      this.resultArray = null;
+      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
+    } else {
+      this.childColumns = null;
+      this.resultArray = null;
+      this.resultStruct = null;
+    }
+  }
+}
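
The class above is only the write-side API; the following is a minimal usage sketch of the
append/read/reset contract it documents. It is not part of this patch and assumes the
OnHeapColumnVector(capacity, dataType) constructor used by the updated tests below:

    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import org.apache.spark.sql.types.DataTypes;

    public class WritableColumnVectorSketch {
      public static void main(String[] args) {
        // Capacity is only a starting point; the append APIs call reserve() and grow as needed.
        WritableColumnVector col = new OnHeapColumnVector(4, DataTypes.IntegerType);
        int firstRow = col.appendInt(42);   // append APIs return the rowId that was written
        col.appendNull();                   // NULL for row 1
        assert col.getInt(firstRow) == 42;  // reads are valid once writing is done
        assert col.isNullAt(1);
        col.reset();                        // required before the vector is written again
      }
    }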

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 0c40417..13f7927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -76,6 +76,8 @@ class VectorizedHashMapGenerator(
         }.mkString("\n").concat(";")
 
     s"""
+       |  private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors;
+       |  private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors;
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
        |  private int[] buckets;
@@ -89,14 +91,19 @@ class VectorizedHashMapGenerator(
        |    $generatedAggBufferSchema
        |
        |  public $generatedClassName() {
-       |    batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-       |      org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-       |    // TODO: Possibly generate this projection in HashAggregate directly
-       |    aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-       |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-       |    for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
-       |       aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
+       |    batchVectors = org.apache.spark.sql.execution.vectorized
+       |      .OnHeapColumnVector.allocateColumns(capacity, schema);
+       |    batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
+       |      schema, batchVectors, capacity);
+       |
+       |    bufferVectors = new org.apache.spark.sql.execution.vectorized
+       |      .OnHeapColumnVector[aggregateBufferSchema.fields().length];
+       |    for (int i = 0; i < aggregateBufferSchema.fields().length; i++) {
+       |      bufferVectors[i] = batchVectors[i + ${groupingKeys.length}];
        |    }
+       |    // TODO: Possibly generate this projection in HashAggregate directly
+       |    aggregateBufferBatch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
+       |      aggregateBufferSchema, bufferVectors, capacity);
        |
        |    buckets = new int[numBuckets];
        |    java.util.Arrays.fill(buckets, -1);
@@ -112,8 +119,8 @@ class VectorizedHashMapGenerator(
    *
    * {{{
    * private boolean equals(int idx, long agg_key, long agg_key1) {
-   *   return batch.column(0).getLong(buckets[idx]) == agg_key &&
-   *     batch.column(1).getLong(buckets[idx]) == agg_key1;
+   *   return batchVectors[0].getLong(buckets[idx]) == agg_key &&
+   *     batchVectors[1].getLong(buckets[idx]) == agg_key1;
    * }
    * }}}
    */
@@ -121,8 +128,8 @@ class VectorizedHashMapGenerator(
 
     def genEqualsForKeys(groupingKeys: Seq[Buffer]): String = {
       groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-        s"""(${ctx.genEqual(key.dataType, ctx.getValue("batch", "buckets[idx]",
-          key.dataType, ordinal), key.name)})"""
+        s"""(${ctx.genEqual(key.dataType, 
ctx.getValue(s"batchVectors[$ordinal]", "buckets[idx]",
+          key.dataType), key.name)})"""
       }.mkString(" && ")
     }
 
@@ -150,9 +157,9 @@ class VectorizedHashMapGenerator(
    *   while (step < maxSteps) {
   *     // Return bucket index if it's either an empty slot or already contains the key
    *     if (buckets[idx] == -1) {
-   *       batch.column(0).putLong(numRows, agg_key);
-   *       batch.column(1).putLong(numRows, agg_key1);
-   *       batch.column(2).putLong(numRows, 0);
+   *       batchVectors[0].putLong(numRows, agg_key);
+   *       batchVectors[1].putLong(numRows, agg_key1);
+   *       batchVectors[2].putLong(numRows, 0);
    *       buckets[idx] = numRows++;
    *       return batch.getRow(buckets[idx]);
    *     } else if (equals(idx, agg_key, agg_key1)) {
@@ -170,13 +177,13 @@ class VectorizedHashMapGenerator(
 
     def genCodeToSetKeys(groupingKeys: Seq[Buffer]): Seq[String] = {
       groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-        ctx.setValue("batch", "numRows", key.dataType, ordinal, key.name)
+        ctx.setValue(s"batchVectors[$ordinal]", "numRows", key.dataType, 
key.name)
       }
     }
 
     def genCodeToSetAggBuffers(bufferValues: Seq[Buffer]): Seq[String] = {
       bufferValues.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-        ctx.updateColumn("batch", "numRows", key.dataType, groupingKeys.length 
+ ordinal,
+        ctx.updateColumn(s"batchVectors[${groupingKeys.length + ordinal}]", 
"numRows", key.dataType,
           buffVars(ordinal), nullable = true)
       }
     }
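
For readability, here is a hand-written Java equivalent of the allocation code the generator now
emits (illustrative only; the variable names mirror the generated fields above, and the wrapper
method is hypothetical):

    import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.types.StructType;

    class GeneratedMapAllocationSketch {
      // The key + buffer batch is allocated as explicit vectors, and the
      // aggregate-buffer batch reuses the columns that follow the grouping keys.
      static ColumnarBatch[] allocate(StructType schema, StructType aggregateBufferSchema,
          int numGroupingKeys, int capacity) {
        OnHeapColumnVector[] batchVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
        ColumnarBatch batch = new ColumnarBatch(schema, batchVectors, capacity);

        OnHeapColumnVector[] bufferVectors =
            new OnHeapColumnVector[aggregateBufferSchema.fields().length];
        for (int i = 0; i < bufferVectors.length; i++) {
          bufferVectors[i] = batchVectors[i + numGroupingKeys];
        }
        ColumnarBatch aggregateBufferBatch =
            new ColumnarBatch(aggregateBufferSchema, bufferVectors, capacity);
        return new ColumnarBatch[] { batch, aggregateBufferBatch };
      }
    }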

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index 67b3d98..1331f15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -24,7 +24,10 @@ import scala.util.Random
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.vectorized.ColumnVector
-import org.apache.spark.sql.types.{BinaryType, IntegerType}
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.collection.BitSet
@@ -34,6 +37,14 @@ import org.apache.spark.util.collection.BitSet
  */
 object ColumnarBatchBenchmark {
 
+  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
+    if (memMode == MemoryMode.OFF_HEAP) {
+      new OffHeapColumnVector(capacity, dt)
+    } else {
+      new OnHeapColumnVector(capacity, dt)
+    }
+  }
+
   // This benchmark reads and writes an array of ints.
   // TODO: there is a big (2x) penalty for a random access API for off heap.
   // Note: carefully if modifying this code. It's hard to reason about the JIT.
@@ -140,7 +151,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with on heap memory
     val columnOnHeap = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -159,7 +170,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -178,7 +189,7 @@ object ColumnarBatchBenchmark {
 
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()
@@ -244,7 +255,7 @@ object ColumnarBatchBenchmark {
 
     // Adding values by appending, instead of putting.
     val onHeapAppend = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -362,7 +373,7 @@ object ColumnarBatchBenchmark {
       .map(_.getBytes(StandardCharsets.UTF_8)).toArray
 
     def column(memoryMode: MemoryMode) = { i: Int =>
-      val column = ColumnVector.allocate(count, BinaryType, memoryMode)
+      val column = allocate(count, BinaryType, memoryMode)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index c8461dc..08ccbd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -34,11 +34,20 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class ColumnarBatchSuite extends SparkFunSuite {
+
+  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
+    if (memMode == MemoryMode.OFF_HEAP) {
+      new OffHeapColumnVector(capacity, dt)
+    } else {
+      new OnHeapColumnVector(capacity, dt)
+    }
+  }
+
   test("Null Apis") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[Boolean]
 
-      val column = ColumnVector.allocate(1024, IntegerType, memMode)
+      val column = allocate(1024, IntegerType, memMode)
       var idx = 0
       assert(column.anyNullsSet() == false)
       assert(column.numNulls() == 0)
@@ -109,7 +118,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[Byte]
 
-      val column = ColumnVector.allocate(1024, ByteType, memMode)
+      val column = allocate(1024, ByteType, memMode)
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
       column.appendBytes(2, values, 0)
@@ -167,7 +176,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Short]
 
-      val column = ColumnVector.allocate(1024, ShortType, memMode)
+      val column = allocate(1024, ShortType, memMode)
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray
       column.appendShorts(2, values, 0)
@@ -247,7 +256,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
 
-      val column = ColumnVector.allocate(1024, IntegerType, memMode)
+      val column = allocate(1024, IntegerType, memMode)
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray
       column.appendInts(2, values, 0)
@@ -332,7 +341,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Long]
 
-      val column = ColumnVector.allocate(1024, LongType, memMode)
+      val column = allocate(1024, LongType, memMode)
 
       var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray
       column.appendLongs(2, values, 0)
@@ -419,7 +428,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Float]
 
-      val column = ColumnVector.allocate(1024, FloatType, memMode)
+      val column = allocate(1024, FloatType, memMode)
 
       var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray
       column.appendFloats(2, values, 0)
@@ -510,7 +519,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
 
-      val column = ColumnVector.allocate(1024, DoubleType, memMode)
+      val column = allocate(1024, DoubleType, memMode)
 
       var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray
       column.appendDoubles(2, values, 0)
@@ -599,7 +608,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[String]
 
-      val column = ColumnVector.allocate(6, BinaryType, memMode)
+      val column = allocate(6, BinaryType, memMode)
       assert(column.arrayData().elementsAppended == 0)
 
       val str = "string"
@@ -656,7 +665,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   test("Int Array") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-      val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
+      val column = allocate(10, new ArrayType(IntegerType, true), memMode)
 
       // Fill the underlying data with all the arrays back to back.
       val data = column.arrayData();
@@ -714,43 +723,43 @@ class ColumnarBatchSuite extends SparkFunSuite {
     (MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
       val len = 4
 
-      val columnBool = ColumnVector.allocate(len, new ArrayType(BooleanType, false), memMode)
+      val columnBool = allocate(len, new ArrayType(BooleanType, false), memMode)
       val boolArray = Array(false, true, false, true)
       boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
       columnBool.putArray(0, 0, len)
       assert(columnBool.getArray(0).toBooleanArray === boolArray)
 
-      val columnByte = ColumnVector.allocate(len, new ArrayType(ByteType, false), memMode)
+      val columnByte = allocate(len, new ArrayType(ByteType, false), memMode)
       val byteArray = Array[Byte](0, 1, 2, 3)
       byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) }
       columnByte.putArray(0, 0, len)
       assert(columnByte.getArray(0).toByteArray === byteArray)
 
-      val columnShort = ColumnVector.allocate(len, new ArrayType(ShortType, false), memMode)
+      val columnShort = allocate(len, new ArrayType(ShortType, false), memMode)
       val shortArray = Array[Short](0, 1, 2, 3)
       shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) }
       columnShort.putArray(0, 0, len)
       assert(columnShort.getArray(0).toShortArray === shortArray)
 
-      val columnInt = ColumnVector.allocate(len, new ArrayType(IntegerType, false), memMode)
+      val columnInt = allocate(len, new ArrayType(IntegerType, false), memMode)
       val intArray = Array(0, 1, 2, 3)
       intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) }
       columnInt.putArray(0, 0, len)
       assert(columnInt.getArray(0).toIntArray === intArray)
 
-      val columnLong = ColumnVector.allocate(len, new ArrayType(LongType, false), memMode)
+      val columnLong = allocate(len, new ArrayType(LongType, false), memMode)
       val longArray = Array[Long](0, 1, 2, 3)
       longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) }
       columnLong.putArray(0, 0, len)
       assert(columnLong.getArray(0).toLongArray === longArray)
 
-      val columnFloat = ColumnVector.allocate(len, new ArrayType(FloatType, false), memMode)
+      val columnFloat = allocate(len, new ArrayType(FloatType, false), memMode)
       val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F)
       floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
       columnFloat.putArray(0, 0, len)
       assert(columnFloat.getArray(0).toFloatArray === floatArray)
 
-      val columnDouble = ColumnVector.allocate(len, new ArrayType(DoubleType, false), memMode)
+      val columnDouble = allocate(len, new ArrayType(DoubleType, false), memMode)
       val doubleArray = Array(0.0, 1.1, 2.2, 3.3)
       doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
       columnDouble.putArray(0, 0, len)
@@ -761,7 +770,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   test("Struct Column") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
-      val column = ColumnVector.allocate(1024, schema, memMode)
+      val column = allocate(1024, schema, memMode)
 
       val c1 = column.getChildColumn(0)
       val c2 = column.getChildColumn(1)
@@ -790,7 +799,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   test("Nest Array in Array.") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val column = ColumnVector.allocate(10, new ArrayType(new ArrayType(IntegerType, true), true),
+      val column = allocate(10, new ArrayType(new ArrayType(IntegerType, true), true),
         memMode)
       val childColumn = column.arrayData()
       val data = column.arrayData().arrayData()
@@ -823,7 +832,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   test("Nest Struct in Array.") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
       val schema = new StructType().add("int", IntegerType).add("long", LongType)
-      val column = ColumnVector.allocate(10, new ArrayType(schema, true), memMode)
+      val column = allocate(10, new ArrayType(schema, true), memMode)
       val data = column.arrayData()
       val c0 = data.getChildColumn(0)
       val c1 = data.getChildColumn(1)
@@ -853,7 +862,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val schema = new StructType()
         .add("int", IntegerType)
         .add("array", new ArrayType(IntegerType, true))
-      val column = ColumnVector.allocate(10, schema, memMode)
+      val column = allocate(10, schema, memMode)
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -885,7 +894,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val schema = new StructType()
         .add("int", IntegerType)
         .add("struct", subSchema)
-      val column = ColumnVector.allocate(10, schema, memMode)
+      val column = allocate(10, schema, memMode)
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -918,7 +927,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
         .add("intCol2", IntegerType)
         .add("string", BinaryType)
 
-      val batch = ColumnarBatch.allocate(schema, memMode)
+      val capacity = ColumnarBatch.DEFAULT_BATCH_SIZE
+      val columns = schema.fields.map { field =>
+        allocate(capacity, field.dataType, memMode)
+      }
+      val batch = new ColumnarBatch(schema, columns.toArray, ColumnarBatch.DEFAULT_BATCH_SIZE)
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)
@@ -926,10 +939,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == false)
 
       // Add a row [1, 1.1, NULL]
-      batch.column(0).putInt(0, 1)
-      batch.column(1).putDouble(0, 1.1)
-      batch.column(2).putNull(0)
-      batch.column(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8))
+      columns(0).putInt(0, 1)
+      columns(1).putDouble(0, 1.1)
+      columns(2).putNull(0)
+      columns(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8))
       batch.setNumRows(1)
 
       // Verify the results of the row.
@@ -939,12 +952,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == true)
       assert(batch.rowIterator().hasNext == true)
 
-      assert(batch.column(0).getInt(0) == 1)
-      assert(batch.column(0).isNullAt(0) == false)
-      assert(batch.column(1).getDouble(0) == 1.1)
-      assert(batch.column(1).isNullAt(0) == false)
-      assert(batch.column(2).isNullAt(0) == true)
-      assert(batch.column(3).getUTF8String(0).toString == "Hello")
+      assert(columns(0).getInt(0) == 1)
+      assert(columns(0).isNullAt(0) == false)
+      assert(columns(1).getDouble(0) == 1.1)
+      assert(columns(1).isNullAt(0) == false)
+      assert(columns(2).isNullAt(0) == true)
+      assert(columns(3).getUTF8String(0).toString == "Hello")
 
       // Verify the iterator works correctly.
       val it = batch.rowIterator()
@@ -955,7 +968,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(row.getDouble(1) == 1.1)
       assert(row.isNullAt(1) == false)
       assert(row.isNullAt(2) == true)
-      assert(batch.column(3).getUTF8String(0).toString == "Hello")
+      assert(columns(3).getUTF8String(0).toString == "Hello")
       assert(it.hasNext == false)
       assert(it.hasNext == false)
 
@@ -972,20 +985,20 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == false)
 
       // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world]
-      batch.column(0).putNull(0)
-      batch.column(1).putDouble(0, 2.2)
-      batch.column(2).putInt(0, 2)
-      batch.column(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8))
-
-      batch.column(0).putInt(1, 3)
-      batch.column(1).putNull(1)
-      batch.column(2).putInt(1, 3)
-      batch.column(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8))
-
-      batch.column(0).putInt(2, 4)
-      batch.column(1).putDouble(2, 4.4)
-      batch.column(2).putInt(2, 4)
-      batch.column(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8))
+      columns(0).putNull(0)
+      columns(1).putDouble(0, 2.2)
+      columns(2).putInt(0, 2)
+      columns(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8))
+
+      columns(0).putInt(1, 3)
+      columns(1).putNull(1)
+      columns(2).putInt(1, 3)
+      columns(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8))
+
+      columns(0).putInt(2, 4)
+      columns(1).putDouble(2, 4.4)
+      columns(2).putInt(2, 4)
+      columns(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8))
       batch.setNumRows(3)
 
       def rowEquals(x: InternalRow, y: Row): Unit = {
@@ -1232,7 +1245,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   test("exceeding maximum capacity should throw an error") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val column = ColumnVector.allocate(1, ByteType, memMode)
+      val column = allocate(1, ByteType, memMode)
       column.MAX_CAPACITY = 15
       column.appendBytes(5, 0.toByte)
       // Successfully allocate twice the requested capacity
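
The capacity test above exercises the failure path in WritableColumnVector.reserve(): a request
is doubled up to MAX_CAPACITY, and anything beyond that throws. A hand-written sketch of the same
behaviour (illustrative only; it assumes same-package access to the @VisibleForTesting
MAX_CAPACITY field, just as the Scala test relies on):

    package org.apache.spark.sql.execution.vectorized;

    import org.apache.spark.sql.types.DataTypes;

    public class MaxCapacitySketch {
      public static void main(String[] args) {
        OnHeapColumnVector col = new OnHeapColumnVector(1, DataTypes.ByteType);
        col.MAX_CAPACITY = 15;            // cap growth for the test
        col.appendBytes(5, (byte) 0);     // reserve(5) doubles to 10, which is <= 15: ok
        try {
          col.appendBytes(11, (byte) 0);  // needs 16 > MAX_CAPACITY, so reserve() throws
        } catch (RuntimeException e) {
          System.out.println("capacity limit enforced: " + e.getMessage());
        }
      }
    }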


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
