Repository: spark
Updated Branches:
  refs/heads/master d54670192 -> 64df08b64


[SPARK-20783][SQL] Create ColumnVector to abstract existing compressed column 
(batch method)

## What changes were proposed in this pull request?

This PR abstracts data compressed by `CompressibleColumnAccessor` behind 
`ColumnVector`, using a batch method. When `ColumnAccessor.decompress` is called, 
the `ColumnVector` is filled with the uncompressed data. This batch decompression 
avoids going through `InternalRow`, which reduces the number of memory accesses.

As a first step of this implementation, this JIRA supports primitive data types. 
Another PR will add support for arrays and other data types.

This implementation decompresses data in batch into an uncompressed column batch, as 
rxin suggested 
[here](https://github.com/apache/spark/pull/18468#issuecomment-316914076). 
Another implementation uses an adapter approach, [as cloud-fan 
suggested](https://github.com/apache/spark/pull/18468).

## How was this patch tested?

Added test suites

Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>

Closes #18704 from kiszk/SPARK-20783a.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64df08b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64df08b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64df08b6

Branch: refs/heads/master
Commit: 64df08b64779bab629a8a90a3797d8bd70f61703
Parents: d546701
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Authored: Wed Oct 4 15:06:44 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Oct 4 15:06:44 2017 +0800

----------------------------------------------------------------------
 .../execution/columnar/ColumnDictionary.java    |  58 ++++
 .../vectorized/OffHeapColumnVector.java         |  18 +
 .../vectorized/OnHeapColumnVector.java          |  18 +
 .../vectorized/WritableColumnVector.java        |  76 +++--
 .../sql/execution/columnar/ColumnAccessor.scala |  16 +-
 .../sql/execution/columnar/ColumnType.scala     |  33 ++
 .../CompressibleColumnAccessor.scala            |   4 +
 .../compression/CompressionScheme.scala         |   3 +
 .../compression/compressionSchemes.scala        | 340 ++++++++++++++++++-
 .../compression/BooleanBitSetSuite.scala        |  52 +++
 .../compression/DictionaryEncodingSuite.scala   |  72 +++-
 .../compression/IntegralDeltaSuite.scala        |  72 ++++
 .../compression/PassThroughEncodingSuite.scala  | 189 +++++++++++
 .../compression/RunLengthEncodingSuite.scala    |  89 ++++-
 .../TestCompressibleColumnBuilder.scala         |   9 +-
 .../vectorized/ColumnVectorSuite.scala          | 183 +++++++++-
 .../vectorized/ColumnarBatchSuite.scala         |   4 +-
 17 files changed, 1192 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
new file mode 100644
index 0000000..f178585
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ColumnDictionary implements Dictionary {
+  private int[] intDictionary;
+  private long[] longDictionary;
+
+  public ColumnDictionary(int[] dictionary) {
+    this.intDictionary = dictionary;
+  }
+
+  public ColumnDictionary(long[] dictionary) {
+    this.longDictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+    return intDictionary[id];
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+    return longDictionary[id];
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not 
support float");
+  }
+
+  @Override
+  public double decodeToDouble(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not 
support double");
+  }
+
+  @Override
+  public byte[] decodeToBinary(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not 
support String");
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 8cbc895..a7522eb 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -229,6 +229,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+      null, data + rowId * 2, count * 2);
+  }
+
+  @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
       return Platform.getShort(null, data + 2 * rowId);
@@ -269,6 +275,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putInts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+      null, data + rowId * 4, count * 4);
+  }
+
+  @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
@@ -335,6 +347,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+      null, data + rowId * 8, count * 8);
+  }
+
+  @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 2725a29..166a39e 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -234,6 +234,12 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, shortData,
+      Platform.SHORT_ARRAY_OFFSET + rowId * 2, count * 2);
+  }
+
+  @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
       return shortData[rowId];
@@ -273,6 +279,12 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putInts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, intData,
+      Platform.INT_ARRAY_OFFSET + rowId * 4, count * 4);
+  }
+
+  @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
     for (int i = 0; i < count; ++i, srcOffset += 4) {
@@ -333,6 +345,12 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, longData,
+      Platform.LONG_ARRAY_OFFSET + rowId * 8, count * 8);
+  }
+
+  @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
     for (int i = 0; i < count; ++i, srcOffset += 8) {

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 163f251..da72954 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -113,138 +113,156 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   protected abstract void reserveInternal(int capacity);
 
   /**
-   * Sets the value at rowId to null/not null.
+   * Sets null/not null to the value at rowId.
    */
   public abstract void putNotNull(int rowId);
   public abstract void putNull(int rowId);
 
   /**
-   * Sets the values from [rowId, rowId + count) to null/not null.
+   * Sets null/not null to the values at [rowId, rowId + count).
    */
   public abstract void putNulls(int rowId, int count);
   public abstract void putNotNulls(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putBoolean(int rowId, boolean value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putBooleans(int rowId, int count, boolean value);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putByte(int rowId, byte value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putBytes(int rowId, int count, byte value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId 
+ count)
    */
   public abstract void putBytes(int rowId, int count, byte[] src, int 
srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putShort(int rowId, short value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putShorts(int rowId, int count, short value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId 
+ count)
    */
   public abstract void putShorts(int rowId, int count, short[] src, int 
srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets values from [src[srcIndex], src[srcIndex + count * 2]) to [rowId, 
rowId + count)
+   * The data in src must be 2-byte platform native endian shorts.
+   */
+  public abstract void putShorts(int rowId, int count, byte[] src, int 
srcIndex);
+
+  /**
+   * Sets `value` to the value at rowId.
    */
   public abstract void putInt(int rowId, int value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putInts(int rowId, int count, int value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId 
+ count)
    */
   public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, 
rowId + count)
+   * The data in src must be 4-byte platform native endian ints.
+   */
+  public abstract void putInts(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, 
rowId + count)
    * The data in src must be 4-byte little endian ints.
    */
   public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putLong(int rowId, long value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putLongs(int rowId, int count, long value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId 
+ count)
    */
   public abstract void putLongs(int rowId, int count, long[] src, int 
srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, 
rowId + count)
+   * The data in src must be 8-byte platform native endian longs.
+   */
+  public abstract void putLongs(int rowId, int count, byte[] src, int 
srcIndex);
+
+  /**
+   * Sets values from [src + srcIndex, src + srcIndex + count * 8) to [rowId, 
rowId + count)
    * The data in src must be 8-byte little endian longs.
    */
   public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putFloat(int rowId, float value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putFloats(int rowId, int count, float value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId 
+ count)
    */
   public abstract void putFloats(int rowId, int count, float[] src, int 
srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
-   * The data in src must be ieee formatted floats.
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, 
rowId + count)
+   * The data in src must be ieee formatted floats in platform native endian.
    */
   public abstract void putFloats(int rowId, int count, byte[] src, int 
srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putDouble(int rowId, double value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putDoubles(int rowId, int count, double value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + 
srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId 
+ count)
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int 
srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + 
count])
-   * The data in src must be ieee formatted doubles.
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, 
rowId + count)
+   * The data in src must be ieee formatted doubles in platform native endian.
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int 
srcIndex);
 
@@ -254,7 +272,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   public abstract void putArray(int rowId, int offset, int length);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets values from [value + offset, value + offset + count) to the values 
at rowId.
    */
   public abstract int putByteArray(int rowId, byte[] value, int offset, int 
count);
   public final int putByteArray(int rowId, byte[] value) {

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 6241b79..24c8ac8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -24,6 +24,7 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeMapData, UnsafeRow}
 import 
org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types._
 
 /**
@@ -62,6 +63,9 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
   }
 
   protected def underlyingBuffer = buffer
+
+  def getByteBuffer: ByteBuffer =
+    buffer.duplicate.order(ByteOrder.nativeOrder())
 }
 
 private[columnar] class NullColumnAccessor(buffer: ByteBuffer)
@@ -122,7 +126,7 @@ private[columnar] class MapColumnAccessor(buffer: 
ByteBuffer, dataType: MapType)
   extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType))
   with NullableColumnAccessor
 
-private[columnar] object ColumnAccessor {
+private[sql] object ColumnAccessor {
   @tailrec
   def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
     val buf = buffer.order(ByteOrder.nativeOrder)
@@ -149,4 +153,14 @@ private[columnar] object ColumnAccessor {
         throw new Exception(s"not support type: $other")
     }
   }
+
+  def decompress(columnAccessor: ColumnAccessor, columnVector: 
WritableColumnVector, numRows: Int):
+      Unit = {
+    if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) {
+      val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]]
+      nativeAccessor.decompress(columnVector, numRows)
+    } else {
+      throw new RuntimeException("Not support non-primitive type now")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index 5cfb003..e9b150f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -43,6 +43,12 @@ import org.apache.spark.unsafe.types.UTF8String
  * WARNING: This only works with HeapByteBuffer
  */
 private[columnar] object ByteBufferHelper {
+  def getShort(buffer: ByteBuffer): Short = {
+    val pos = buffer.position()
+    buffer.position(pos + 2)
+    Platform.getShort(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos)
+  }
+
   def getInt(buffer: ByteBuffer): Int = {
     val pos = buffer.position()
     buffer.position(pos + 4)
@@ -66,6 +72,33 @@ private[columnar] object ByteBufferHelper {
     buffer.position(pos + 8)
     Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos)
   }
+
+  def putShort(buffer: ByteBuffer, value: Short): Unit = {
+    val pos = buffer.position()
+    buffer.position(pos + 2)
+    Platform.putShort(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos, value)
+  }
+
+  def putInt(buffer: ByteBuffer, value: Int): Unit = {
+    val pos = buffer.position()
+    buffer.position(pos + 4)
+    Platform.putInt(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos, value)
+  }
+
+  def putLong(buffer: ByteBuffer, value: Long): Unit = {
+    val pos = buffer.position()
+    buffer.position(pos + 8)
+    Platform.putLong(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos, value)
+  }
+
+  def copyMemory(src: ByteBuffer, dst: ByteBuffer, len: Int): Unit = {
+    val srcPos = src.position()
+    val dstPos = dst.position()
+    src.position(srcPos + len)
+    dst.position(dstPos + len)
+    Platform.copyMemory(src.array(), Platform.BYTE_ARRAY_OFFSET + srcPos,
+      dst.array(), Platform.BYTE_ARRAY_OFFSET + dstPos, len)
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
index e1d13ad..774011f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, 
NativeColumnAccessor}
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types.AtomicType
 
 private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends 
ColumnAccessor {
@@ -36,4 +37,7 @@ private[columnar] trait CompressibleColumnAccessor[T <: 
AtomicType] extends Colu
   override def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     decoder.next(row, ordinal)
   }
+
+  def decompress(columnVector: WritableColumnVector, capacity: Int): Unit =
+    decoder.decompress(columnVector, capacity)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
index 6e4f1c5..f8aeba4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -21,6 +21,7 @@ import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types.AtomicType
 
 private[columnar] trait Encoder[T <: AtomicType] {
@@ -41,6 +42,8 @@ private[columnar] trait Decoder[T <: AtomicType] {
   def next(row: InternalRow, ordinal: Int): Unit
 
   def hasNext: Boolean
+
+  def decompress(columnVector: WritableColumnVector, capacity: Int): Unit
 }
 
 private[columnar] trait CompressionScheme {

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index ee99c90..bf00ad9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql.execution.columnar.compression
 
 import java.nio.ByteBuffer
+import java.nio.ByteOrder
 
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types._
 
 
@@ -61,6 +63,101 @@ private[columnar] case object PassThrough extends 
CompressionScheme {
     }
 
     override def hasNext: Boolean = buffer.hasRemaining
+
+    private def putBooleans(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      for (i <- 0 until len) {
+        columnVector.putBoolean(pos + i, (buffer.get(bufferPos + i) != 0))
+      }
+    }
+
+    private def putBytes(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      columnVector.putBytes(pos, len, buffer.array, bufferPos)
+    }
+
+    private def putShorts(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      columnVector.putShorts(pos, len, buffer.array, bufferPos)
+    }
+
+    private def putInts(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      columnVector.putInts(pos, len, buffer.array, bufferPos)
+    }
+
+    private def putLongs(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      columnVector.putLongs(pos, len, buffer.array, bufferPos)
+    }
+
+    private def putFloats(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      columnVector.putFloats(pos, len, buffer.array, bufferPos)
+    }
+
+    private def putDoubles(
+        columnVector: WritableColumnVector, pos: Int, bufferPos: Int, len: 
Int): Unit = {
+      columnVector.putDoubles(pos, len, buffer.array, bufferPos)
+    }
+
+    private def decompress0(
+        columnVector: WritableColumnVector,
+        capacity: Int,
+        unitSize: Int,
+        putFunction: (WritableColumnVector, Int, Int, Int) => Unit): Unit = {
+      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+      nullsBuffer.rewind()
+      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+      var nextNullIndex = if (nullCount > 0) 
ByteBufferHelper.getInt(nullsBuffer) else capacity
+      var pos = 0
+      var seenNulls = 0
+      var bufferPos = buffer.position
+      while (pos < capacity) {
+        if (pos != nextNullIndex) {
+          val len = nextNullIndex - pos
+          assert(len * unitSize < Int.MaxValue)
+          putFunction(columnVector, pos, bufferPos, len)
+          bufferPos += len * unitSize
+          pos += len
+        } else {
+          seenNulls += 1
+          nextNullIndex = if (seenNulls < nullCount) {
+            ByteBufferHelper.getInt(nullsBuffer)
+          } else {
+            capacity
+          }
+          columnVector.putNull(pos)
+          pos += 1
+        }
+      }
+    }
+
+    override def decompress(columnVector: WritableColumnVector, capacity: 
Int): Unit = {
+      columnType.dataType match {
+        case _: BooleanType =>
+          val unitSize = 1
+          decompress0(columnVector, capacity, unitSize, putBooleans)
+        case _: ByteType =>
+          val unitSize = 1
+          decompress0(columnVector, capacity, unitSize, putBytes)
+        case _: ShortType =>
+          val unitSize = 2
+          decompress0(columnVector, capacity, unitSize, putShorts)
+        case _: IntegerType =>
+          val unitSize = 4
+          decompress0(columnVector, capacity, unitSize, putInts)
+        case _: LongType =>
+          val unitSize = 8
+          decompress0(columnVector, capacity, unitSize, putLongs)
+        case _: FloatType =>
+          val unitSize = 4
+          decompress0(columnVector, capacity, unitSize, putFloats)
+        case _: DoubleType =>
+          val unitSize = 8
+          decompress0(columnVector, capacity, unitSize, putDoubles)
+      }
+    }
   }
 }
 
@@ -169,6 +266,94 @@ private[columnar] case object RunLengthEncoding extends 
CompressionScheme {
     }
 
     override def hasNext: Boolean = valueCount < run || buffer.hasRemaining
+
+    private def putBoolean(columnVector: WritableColumnVector, pos: Int, 
value: Long): Unit = {
+      columnVector.putBoolean(pos, value == 1)
+    }
+
+    private def getByte(buffer: ByteBuffer): Long = {
+      buffer.get().toLong
+    }
+
+    private def putByte(columnVector: WritableColumnVector, pos: Int, value: 
Long): Unit = {
+      columnVector.putByte(pos, value.toByte)
+    }
+
+    private def getShort(buffer: ByteBuffer): Long = {
+      buffer.getShort().toLong
+    }
+
+    private def putShort(columnVector: WritableColumnVector, pos: Int, value: 
Long): Unit = {
+      columnVector.putShort(pos, value.toShort)
+    }
+
+    private def getInt(buffer: ByteBuffer): Long = {
+      buffer.getInt().toLong
+    }
+
+    private def putInt(columnVector: WritableColumnVector, pos: Int, value: 
Long): Unit = {
+      columnVector.putInt(pos, value.toInt)
+    }
+
+    private def getLong(buffer: ByteBuffer): Long = {
+      buffer.getLong()
+    }
+
+    private def putLong(columnVector: WritableColumnVector, pos: Int, value: 
Long): Unit = {
+      columnVector.putLong(pos, value)
+    }
+
+    private def decompress0(
+        columnVector: WritableColumnVector,
+        capacity: Int,
+        getFunction: (ByteBuffer) => Long,
+        putFunction: (WritableColumnVector, Int, Long) => Unit): Unit = {
+      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+      nullsBuffer.rewind()
+      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+      var nextNullIndex = if (nullCount > 0) 
ByteBufferHelper.getInt(nullsBuffer) else -1
+      var pos = 0
+      var seenNulls = 0
+      var runLocal = 0
+      var valueCountLocal = 0
+      var currentValueLocal: Long = 0
+
+      while (valueCountLocal < runLocal || (pos < capacity)) {
+        if (pos != nextNullIndex) {
+          if (valueCountLocal == runLocal) {
+            currentValueLocal = getFunction(buffer)
+            runLocal = ByteBufferHelper.getInt(buffer)
+            valueCountLocal = 1
+          } else {
+            valueCountLocal += 1
+          }
+          putFunction(columnVector, pos, currentValueLocal)
+        } else {
+          seenNulls += 1
+          if (seenNulls < nullCount) {
+            nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+          }
+          columnVector.putNull(pos)
+        }
+        pos += 1
+      }
+    }
+
+    override def decompress(columnVector: WritableColumnVector, capacity: 
Int): Unit = {
+      columnType.dataType match {
+        case _: BooleanType =>
+          decompress0(columnVector, capacity, getByte, putBoolean)
+        case _: ByteType =>
+          decompress0(columnVector, capacity, getByte, putByte)
+        case _: ShortType =>
+          decompress0(columnVector, capacity, getShort, putShort)
+        case _: IntegerType =>
+          decompress0(columnVector, capacity, getInt, putInt)
+        case _: LongType =>
+          decompress0(columnVector, capacity, getLong, putLong)
+        case _ => throw new IllegalStateException("Not supported type in 
RunLengthEncoding.")
+      }
+    }
   }
 }
 
@@ -266,11 +451,32 @@ private[columnar] case object DictionaryEncoding extends 
CompressionScheme {
   }
 
   class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: 
NativeColumnType[T])
-    extends compression.Decoder[T] {
-
-    private val dictionary: Array[Any] = {
-      val elementNum = ByteBufferHelper.getInt(buffer)
-      Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
+      extends compression.Decoder[T] {
+    val elementNum = ByteBufferHelper.getInt(buffer) // NOTE(review): public member; used as the dictionary size - confirm visibility
+    private val dictionary: Array[Any] = new Array[Any](elementNum)
+    private var intDictionary: Array[Int] = null // allocated only in the IntegerType case below
+    private var longDictionary: Array[Long] = null // allocated only in the LongType case below
+
+    columnType.dataType match { // NOTE(review): no default case, so any other type raises MatchError
+      case _: IntegerType =>
+        intDictionary = new Array[Int](elementNum)
+        for (i <- 0 until elementNum) {
+          val v = columnType.extract(buffer).asInstanceOf[Int]
+          intDictionary(i) = v // unboxed copy, handed to ColumnDictionary in decompress()
+          dictionary(i) = v
+        }
+      case _: LongType =>
+        longDictionary = new Array[Long](elementNum)
+        for (i <- 0 until elementNum) {
+          val v = columnType.extract(buffer).asInstanceOf[Long]
+          longDictionary(i) = v // unboxed copy, handed to ColumnDictionary in decompress()
+          dictionary(i) = v
+        }
+      case _: StringType =>
+        for (i <- 0 until elementNum) { // strings fill only the generic dictionary
+          val v = columnType.extract(buffer).asInstanceOf[Any]
+          dictionary(i) = v
+        }
     }
 
     override def next(row: InternalRow, ordinal: Int): Unit = {
@@ -278,6 +484,46 @@ private[columnar] case object DictionaryEncoding extends 
CompressionScheme {
     }
 
     override def hasNext: Boolean = buffer.hasRemaining
+
+    /**
+     * Decodes the whole column into `columnVector` in one pass.
+     *
+     * The vector receives the decoder's dictionary together with one dictionary id per
+     * non-null row (ids are read from `buffer` as shorts); null rows are marked with
+     * `putNull`. Null positions are read through a duplicate of `buffer` rewound to its
+     * start, so the value cursor of `buffer` itself is not disturbed.
+     *
+     * The Int and Long cases differ only in the dictionary they install, so the
+     * dictionary is chosen first and a single loop serves both.
+     *
+     * @param columnVector destination vector
+     * @param capacity number of rows (nulls included) to decode
+     * @throws IllegalStateException for column types other than IntegerType and LongType
+     */
+    override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
+      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+      nullsBuffer.rewind()
+      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+      // -1 never equals a row index, so it stands for "no null ahead".
+      var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+      var pos = 0
+      var seenNulls = 0
+
+      val columnDictionary = columnType.dataType match {
+        case _: IntegerType => new ColumnDictionary(intDictionary)
+        case _: LongType => new ColumnDictionary(longDictionary)
+        case _ => throw new IllegalStateException("Not supported type in DictionaryEncoding.")
+      }
+      val dictionaryIds = columnVector.reserveDictionaryIds(capacity)
+      columnVector.setDictionary(columnDictionary)
+
+      while (pos < capacity) {
+        if (pos != nextNullIndex) {
+          dictionaryIds.putInt(pos, buffer.getShort())
+        } else {
+          seenNulls += 1
+          // Read another null position only while some remain in the header.
+          if (seenNulls < nullCount) {
+            nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+          }
+          columnVector.putNull(pos)
+        }
+        pos += 1
+      }
+    }
   }
 }
 
@@ -368,6 +614,38 @@ private[columnar] case object BooleanBitSet extends 
CompressionScheme {
     }
 
     override def hasNext: Boolean = visited < count
+
+    /**
+     * Decodes the bit-packed booleans into `columnVector` in one pass.
+     *
+     * Null positions are read through a duplicate of `buffer` rewound to its start.
+     * Values are read from `buffer` one Long word at a time; bit
+     * `i % BITS_PER_LONG` of the current word is the i-th stored (non-null) value.
+     *
+     * The loop continues while stored values remain, and also while the current row is
+     * a null position. Without the second condition the nulls that follow the last
+     * stored value would never be marked in the vector. It stops as soon as `capacity`
+     * rows have been written, and it never reads a word once all `count` values are
+     * consumed.
+     *
+     * @param columnVector destination vector
+     * @param capacity number of rows (nulls included) to decode; never exceeded
+     */
+    override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
+      val countLocal = count
+      var currentWordLocal: Long = 0
+      var visitedLocal: Int = 0
+      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+      nullsBuffer.rewind()
+      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+      // -1 never equals a row index, so it stands for "no null ahead".
+      var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+      var pos = 0
+      var seenNulls = 0
+
+      while (pos < capacity && (visitedLocal < countLocal || pos == nextNullIndex)) {
+        if (pos != nextNullIndex) {
+          val bit = visitedLocal % BITS_PER_LONG
+
+          visitedLocal += 1
+          // A new word starts every BITS_PER_LONG stored values.
+          if (bit == 0) {
+            currentWordLocal = ByteBufferHelper.getLong(buffer)
+          }
+
+          columnVector.putBoolean(pos, ((currentWordLocal >> bit) & 1) != 0)
+        } else {
+          seenNulls += 1
+          // After the last null the stale index stays behind `pos` and can no longer match.
+          if (seenNulls < nullCount) {
+            nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+          }
+          columnVector.putNull(pos)
+        }
+        pos += 1
+      }
+    }
   }
 }
 
@@ -448,6 +726,32 @@ private[columnar] case object IntDelta extends 
CompressionScheme {
       prev = if (delta > Byte.MinValue) prev + delta else 
ByteBufferHelper.getInt(buffer)
       row.setInt(ordinal, prev)
     }
+
+    override def decompress(columnVector: WritableColumnVector, capacity: 
Int): Unit = { // batch-decodes the delta-encoded ints in `buffer` into columnVector
+      var prevLocal: Int = 0 // last decoded value; each delta is added to it
+      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) // separate cursor
+      nullsBuffer.rewind() // null count and null positions are read from the buffer start
+      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+      var nextNullIndex = if (nullCount > 0) 
ByteBufferHelper.getInt(nullsBuffer) else -1
+      var pos = 0 // index of the row being written
+      var seenNulls = 0
+
+      while (pos < capacity) {
+        if (pos != nextNullIndex) {
+          val delta = buffer.get // one byte: a delta, or Byte.MinValue as an escape marker
+          prevLocal = if (delta > Byte.MinValue) { prevLocal + delta } else
+          { ByteBufferHelper.getInt(buffer) } // escape: the full int value follows
+          columnVector.putInt(pos, prevLocal)
+        } else {
+          seenNulls += 1
+          if (seenNulls < nullCount) { // fetch another null position only while some remain
+            nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+          }
+          columnVector.putNull(pos)
+        }
+        pos += 1
+      }
+    }
   }
 }
 
@@ -528,5 +832,31 @@ private[columnar] case object LongDelta extends 
CompressionScheme {
       prev = if (delta > Byte.MinValue) prev + delta else 
ByteBufferHelper.getLong(buffer)
       row.setLong(ordinal, prev)
     }
+
+    /**
+     * Decodes the delta-encoded longs in `buffer` into `columnVector` in one pass.
+     *
+     * Each non-null row starts with one byte. A byte greater than `Byte.MinValue` is a
+     * delta added to the previously decoded value; `Byte.MinValue` itself is an escape
+     * marker meaning the full long value follows. Null positions are read through a
+     * duplicate of `buffer` rewound to its start.
+     *
+     * @param columnVector destination vector
+     * @param capacity number of rows (nulls included) to decode
+     */
+    override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
+      var prevLocal: Long = 0
+      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+      // Side-effecting call: written with parentheses like the sibling decoders.
+      nullsBuffer.rewind()
+      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+      // -1 never equals a row index, so it stands for "no null ahead".
+      var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+      var pos = 0
+      var seenNulls = 0
+
+      while (pos < capacity) {
+        if (pos != nextNullIndex) {
+          val delta = buffer.get()
+          prevLocal = if (delta > Byte.MinValue) {
+            prevLocal + delta
+          } else {
+            ByteBufferHelper.getLong(buffer)
+          }
+          columnVector.putLong(pos, prevLocal)
+        } else {
+          seenNulls += 1
+          // Read another null position only while some remain in the header.
+          if (seenNulls < nullCount) {
+            nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+          }
+          columnVector.putNull(pos)
+        }
+        pos += 1
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index d01bf91..2d71a42 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types.BooleanType
 
 class BooleanBitSetSuite extends SparkFunSuite {
   import BooleanBitSet._
@@ -85,6 +87,36 @@ class BooleanBitSetSuite extends SparkFunSuite {
     assert(!decoder.hasNext)
   }
 
+  def skeletonForDecompress(count: Int) { // round-trips `count` random boolean rows via decompress()
+    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, 
BooleanBitSet)
+    val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN))
+    val values = rows.map(_.getBoolean(0)) // expected values, in insertion order
+
+    rows.foreach(builder.appendFrom(_, 0))
+    val buffer = builder.build()
+
+    // ----------------
+    // Tests decompress
+    // ----------------
+
+    // Positions past the column header, then reads the compression scheme ID (one int)
+    val headerSize = CompressionScheme.columnHeaderSize(buffer)
+    buffer.position(headerSize)
+    assertResult(BooleanBitSet.typeId, "Wrong compression scheme 
ID")(buffer.getInt())
+
+    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+    val columnVector = new OnHeapColumnVector(values.length, BooleanType)
+    decoder.decompress(columnVector, values.length) // capacity passed is the number of rows
+
+    if (values.nonEmpty) {
+      values.zipWithIndex.foreach { case (b: Boolean, index: Int) =>
+        assertResult(b, s"Wrong ${index}-th decoded boolean value") {
+          columnVector.getBoolean(index)
+        }
+      }
+    }
+  }
+
   test(s"$BooleanBitSet: empty") {
     skeleton(0)
   }
@@ -104,4 +136,24 @@ class BooleanBitSetSuite extends SparkFunSuite {
   test(s"$BooleanBitSet: multiple words and 1 more bit") {
     skeleton(BITS_PER_LONG * 2 + 1)
   }
+
+  test(s"$BooleanBitSet: empty for decompression()") {
+    skeletonForDecompress(0) // zero rows
+  }
+
+  test(s"$BooleanBitSet: less than 1 word for decompression()") {
+    skeletonForDecompress(BITS_PER_LONG - 1) // one row short of a full word
+  }
+
+  test(s"$BooleanBitSet: exactly 1 word for decompression()") {
+    skeletonForDecompress(BITS_PER_LONG)
+  }
+
+  test(s"$BooleanBitSet: multiple whole words for decompression()") {
+    skeletonForDecompress(BITS_PER_LONG * 2)
+  }
+
+  test(s"$BooleanBitSet: multiple words and 1 more bit for decompression()") {
+    skeletonForDecompress(BITS_PER_LONG * 2 + 1) // two full words plus one extra row
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
index 67139b1..28950b7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -23,16 +23,19 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.types.AtomicType
 
 class DictionaryEncodingSuite extends SparkFunSuite {
+  val nullValue = -1
   testDictionaryEncoding(new IntColumnStats, INT)
   testDictionaryEncoding(new LongColumnStats, LONG)
-  testDictionaryEncoding(new StringColumnStats, STRING)
+  testDictionaryEncoding(new StringColumnStats, STRING, false)
 
   def testDictionaryEncoding[T <: AtomicType](
       columnStats: ColumnStats,
-      columnType: NativeColumnType[T]) {
+      columnType: NativeColumnType[T],
+      testDecompress: Boolean = true) {
 
     val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
 
@@ -113,6 +116,58 @@ class DictionaryEncodingSuite extends SparkFunSuite {
       }
     }
 
+    /**
+     * Builds a dictionary-encoded column from `inputSeq` and checks that the batch
+     * `decompress()` path restores every row.
+     *
+     * Each element of `inputSeq` is either an index into the generated unique rows or
+     * `nullValue`, which appends a null row. The check is skipped entirely when
+     * `testDecompress` is false.
+     *
+     * @param uniqueValueCount number of distinct values to generate
+     * @param inputSeq row-by-row description of the column (value index or `nullValue`)
+     */
+    def skeletonForDecompress(uniqueValueCount: Int, inputSeq: Seq[Int]) {
+      if (!testDecompress) return
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+
+      val nullRow = new GenericInternalRow(1)
+      nullRow.setNullAt(0)
+      inputSeq.foreach { i =>
+        if (i == nullValue) {
+          builder.appendFrom(nullRow, 0)
+        } else {
+          builder.appendFrom(rows(i), 0)
+        }
+      }
+      val buffer = builder.build()
+
+      // ----------------
+      // Tests decompress
+      // ----------------
+      // Positions past the column header, then reads the compression scheme ID (one int)
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      buffer.position(headerSize)
+      assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      val decoder = DictionaryEncoding.decoder(buffer, columnType)
+      val columnVector = new OnHeapColumnVector(inputSeq.length, columnType.dataType)
+      decoder.decompress(columnVector, inputSeq.length)
+
+      if (inputSeq.nonEmpty) {
+        inputSeq.zipWithIndex.foreach { case (i: Any, index: Int) =>
+          if (i == nullValue) {
+            assertResult(true, s"Wrong null ${index}-th position") {
+              columnVector.isNullAt(index)
+            }
+          } else {
+            columnType match {
+              case INT =>
+                assertResult(values(i), s"Wrong ${index}-th decoded int value") {
+                  columnVector.getInt(index)
+                }
+              case LONG =>
+                assertResult(values(i), s"Wrong ${index}-th decoded long value") {
+                  columnVector.getLong(index)
+                }
+              case _ => fail("Unsupported type")
+            }
+          }
+        }
+      }
+    }
+
     test(s"$DictionaryEncoding with $typeName: empty") {
       skeleton(0, Seq.empty)
     }
@@ -124,5 +179,18 @@ class DictionaryEncodingSuite extends SparkFunSuite {
     test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
       skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to 
DictionaryEncoding.MAX_DICT_SIZE)
     }
+
+    test(s"$DictionaryEncoding with $typeName: empty for decompress()") {
+      skeletonForDecompress(0, Seq.empty) // no rows at all
+    }
+
+    test(s"$DictionaryEncoding with $typeName: simple case for decompress()") {
+      skeletonForDecompress(2, Seq(0, nullValue, 0, nullValue)) // values and nulls alternate
+    }
+
+    test(s"$DictionaryEncoding with $typeName: dictionary overflow for 
decompress()") {
+      skeletonForDecompress(DictionaryEncoding.MAX_DICT_SIZE + 2, // NOTE(review): rows use only indices 0..MAX_DICT_SIZE-1 - confirm this overflows
+        Seq(nullValue) ++ (0 to DictionaryEncoding.MAX_DICT_SIZE - 1) ++ 
Seq(nullValue))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index 411d31f..0d9f1fb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -21,9 +21,11 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.types.IntegralType
 
 class IntegralDeltaSuite extends SparkFunSuite {
+  val nullValue = -1
   testIntegralDelta(new IntColumnStats, INT, IntDelta)
   testIntegralDelta(new LongColumnStats, LONG, LongDelta)
 
@@ -109,6 +111,53 @@ class IntegralDeltaSuite extends SparkFunSuite {
       assert(!decoder.hasNext)
     }
 
+    /**
+     * Builds a delta-encoded column from `input` and checks that the batch
+     * `decompress()` path restores every row.
+     *
+     * An element equal to `nullValue` is appended as a null row and is expected to
+     * come back as null; every other element is expected to come back unchanged.
+     *
+     * @param input row-by-row values of the column
+     */
+    def skeletonForDecompress(input: Seq[I#InternalType]) {
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme)
+      val row = new GenericInternalRow(1)
+      val nullRow = new GenericInternalRow(1)
+      nullRow.setNullAt(0)
+      // foreach, not map: the rows are appended for their side effect only.
+      input.foreach { value =>
+        if (value == nullValue) {
+          builder.appendFrom(nullRow, 0)
+        } else {
+          columnType.setField(row, 0, value)
+          builder.appendFrom(row, 0)
+        }
+      }
+      val buffer = builder.build()
+
+      // ----------------
+      // Tests decompress
+      // ----------------
+      // Positions past the column header, then reads the compression scheme ID (one int)
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      buffer.position(headerSize)
+      assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      val decoder = scheme.decoder(buffer, columnType)
+      val columnVector = new OnHeapColumnVector(input.length, columnType.dataType)
+      decoder.decompress(columnVector, input.length)
+
+      if (input.nonEmpty) {
+        input.zipWithIndex.foreach {
+          case (expected: Any, index: Int) if expected == nullValue =>
+            assertResult(true, s"Wrong null ${index}th-position") {
+              columnVector.isNullAt(index)
+            }
+          case (expected: Int, index: Int) =>
+            assertResult(expected, s"Wrong ${index}-th decoded int value") {
+              columnVector.getInt(index)
+            }
+          case (expected: Long, index: Int) =>
+            assertResult(expected, s"Wrong ${index}-th decoded long value") {
+              columnVector.getLong(index)
+            }
+          case _ =>
+            fail("Unsupported type")
+        }
+      }
+    }
+
     test(s"$scheme: empty column") {
       skeleton(Seq.empty)
     }
@@ -127,5 +176,28 @@ class IntegralDeltaSuite extends SparkFunSuite {
       val input = Array.fill[Any](10000)(makeRandomValue(columnType))
       skeleton(input.map(_.asInstanceOf[I#InternalType]))
     }
+
+
+    test(s"$scheme: empty column for decompress()") {
+      skeletonForDecompress(Seq.empty)
+    }
+
+    test(s"$scheme: simple case for decompress()") {
+      val input = columnType match { // NOTE(review): only INT and LONG are matched; no default case
+        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
+      }
+
+      skeletonForDecompress(input.map(_.asInstanceOf[I#InternalType]))
+    }
+
+    test(s"$scheme: simple case with null for decompress()") {
+      val input = columnType match {
+        case INT => Seq(2: Int, 1: Int, 2: Int, nullValue: Int, 5: Int) // nullValue becomes a null row
+        case LONG => Seq(2: Long, 1: Long, 2: Long, nullValue: Long, 5: Long)
+      }
+
+      skeletonForDecompress(input.map(_.asInstanceOf[I#InternalType]))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
new file mode 100644
index 0000000..b6f0b5e
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * Tests the PassThrough scheme for the fixed-width numeric column types: the encoded
+ * layout, the row-at-a-time decoder and the batch `decompress()` path.
+ */
+class PassThroughSuite extends SparkFunSuite {
+  // Sentinel for test inputs: an element equal to this value is appended as a null row.
+  val nullValue = -1
+  testPassThrough(new ByteColumnStats, BYTE)
+  testPassThrough(new ShortColumnStats, SHORT)
+  testPassThrough(new IntColumnStats, INT)
+  testPassThrough(new LongColumnStats, LONG)
+  testPassThrough(new FloatColumnStats, FLOAT)
+  testPassThrough(new DoubleColumnStats, DOUBLE)
+
+  /**
+   * Registers the PassThrough test cases for one column type.
+   *
+   * @param columnStats statistics collector handed to the test column builder
+   * @param columnType column type under test
+   */
+  def testPassThrough[T <: AtomicType](
+      columnStats: ColumnStats,
+      columnType: NativeColumnType[T]) {
+
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    /** Encodes `input` (no nulls), then checks the raw layout and the row-wise decoder. */
+    def skeleton(input: Seq[T#InternalType]) {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough)
+
+      // foreach, not map: the rows are appended for their side effect only.
+      input.foreach { value =>
+        val row = new GenericInternalRow(1)
+        columnType.setField(row, 0, value)
+        builder.appendFrom(row, 0)
+      }
+
+      val buffer = builder.build()
+      // Column type ID + null count + null positions
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+      // Compression scheme ID + compressed contents
+      val compressedSize = 4 + input.size * columnType.defaultSize
+
+      // 4 extra bytes for compression scheme type ID
+      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+      buffer.position(headerSize)
+      assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      input.foreach { value =>
+        assertResult(value, "Wrong value")(columnType.extract(buffer))
+      }
+
+      // -------------
+      // Tests decoder
+      // -------------
+
+      // Rewinds, skips column header and 4 more bytes for compression scheme ID
+      buffer.rewind().position(headerSize + 4)
+
+      val decoder = PassThrough.decoder(buffer, columnType)
+      val mutableRow = new GenericInternalRow(1)
+
+      // The lambda parameter is named explicitly so that hasNext is asserted before every
+      // element. With the `_` placeholder form the block's leading statement runs only
+      // once, when the function value is created.
+      input.foreach { expected =>
+        assert(decoder.hasNext)
+        assertResult(expected, "Wrong decoded value") {
+          decoder.next(mutableRow, 0)
+          columnType.getField(mutableRow, 0)
+        }
+      }
+      assert(!decoder.hasNext)
+    }
+
+    /**
+     * Encodes `input`, treating elements equal to `nullValue` as null rows, then checks
+     * that the batch `decompress()` path restores every row.
+     */
+    def skeletonForDecompress(input: Seq[T#InternalType]) {
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough)
+      val row = new GenericInternalRow(1)
+      val nullRow = new GenericInternalRow(1)
+      nullRow.setNullAt(0)
+      // foreach, not map: the rows are appended for their side effect only.
+      input.foreach { value =>
+        if (value == nullValue) {
+          builder.appendFrom(nullRow, 0)
+        } else {
+          columnType.setField(row, 0, value)
+          builder.appendFrom(row, 0)
+        }
+      }
+      val buffer = builder.build()
+
+      // ----------------
+      // Tests decompress
+      // ----------------
+      // Positions past the column header, then reads the compression scheme ID (one int)
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      buffer.position(headerSize)
+      assertResult(PassThrough.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      val decoder = PassThrough.decoder(buffer, columnType)
+      val columnVector = new OnHeapColumnVector(input.length, columnType.dataType)
+      decoder.decompress(columnVector, input.length)
+
+      input.zipWithIndex.foreach {
+        case (expected: Any, index: Int) if expected == nullValue =>
+          assertResult(true, s"Wrong null ${index}th-position") {
+            columnVector.isNullAt(index)
+          }
+        case (expected: Byte, index: Int) =>
+          assertResult(expected, s"Wrong ${index}-th decoded byte value") {
+            columnVector.getByte(index)
+          }
+        case (expected: Short, index: Int) =>
+          assertResult(expected, s"Wrong ${index}-th decoded short value") {
+            columnVector.getShort(index)
+          }
+        case (expected: Int, index: Int) =>
+          assertResult(expected, s"Wrong ${index}-th decoded int value") {
+            columnVector.getInt(index)
+          }
+        case (expected: Long, index: Int) =>
+          assertResult(expected, s"Wrong ${index}-th decoded long value") {
+            columnVector.getLong(index)
+          }
+        case (expected: Float, index: Int) =>
+          assertResult(expected, s"Wrong ${index}-th decoded float value") {
+            columnVector.getFloat(index)
+          }
+        case (expected: Double, index: Int) =>
+          assertResult(expected, s"Wrong ${index}-th decoded double value") {
+            columnVector.getDouble(index)
+          }
+        case _ => fail("Unsupported type")
+      }
+    }
+
+    test(s"$PassThrough with $typeName: empty column") {
+      skeleton(Seq.empty)
+    }
+
+    test(s"$PassThrough with $typeName: long random series") {
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[T#InternalType]))
+    }
+
+    test(s"$PassThrough with $typeName: empty column for decompress()") {
+      skeletonForDecompress(Seq.empty)
+    }
+
+    test(s"$PassThrough with $typeName: long random series for decompress()") {
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeletonForDecompress(input.map(_.asInstanceOf[T#InternalType]))
+    }
+
+    test(s"$PassThrough with $typeName: simple case with null for decompress()") {
+      val input = columnType match {
+        case BYTE => Seq(2: Byte, 1: Byte, 2: Byte, nullValue.toByte: Byte, 5: Byte)
+        case SHORT => Seq(2: Short, 1: Short, 2: Short, nullValue.toShort: Short, 5: Short)
+        case INT => Seq(2: Int, 1: Int, 2: Int, nullValue: Int, 5: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, nullValue: Long, 5: Long)
+        case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue: Float, 5: Float)
+        case DOUBLE => Seq(2: Double, 1: Double, 2: Double, nullValue: Double, 5: Double)
+      }
+
+      skeletonForDecompress(input.map(_.asInstanceOf[T#InternalType]))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
index dffa9b3..eb1cdd9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -21,19 +21,22 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.types.AtomicType
 
 class RunLengthEncodingSuite extends SparkFunSuite {
+  val nullValue = -1
   testRunLengthEncoding(new NoopColumnStats, BOOLEAN)
   testRunLengthEncoding(new ByteColumnStats, BYTE)
   testRunLengthEncoding(new ShortColumnStats, SHORT)
   testRunLengthEncoding(new IntColumnStats, INT)
   testRunLengthEncoding(new LongColumnStats, LONG)
-  testRunLengthEncoding(new StringColumnStats, STRING)
+  testRunLengthEncoding(new StringColumnStats, STRING, false)
 
   def testRunLengthEncoding[T <: AtomicType](
       columnStats: ColumnStats,
-      columnType: NativeColumnType[T]) {
+      columnType: NativeColumnType[T],
+      testDecompress: Boolean = true) {
 
     val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
 
@@ -95,6 +98,72 @@ class RunLengthEncodingSuite extends SparkFunSuite {
       assert(!decoder.hasNext)
     }
 
+    def skeletonForDecompress(uniqueValueCount: Int, inputRuns: Seq[(Int, 
Int)]) { // builds a column from (value index, run length) pairs and checks decompress()
+      if (!testDecompress) return
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, 
RunLengthEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 
uniqueValueCount)
+      val inputSeq = inputRuns.flatMap { case (index, run) =>
+        Seq.fill(run)(index) // expand each run into `run` copies of its value index
+      }
+
+      val nullRow = new GenericInternalRow(1)
+      nullRow.setNullAt(0)
+      inputSeq.foreach { i =>
+        if (i == nullValue) { // the sentinel index appends a null row
+          builder.appendFrom(nullRow, 0)
+        } else {
+          builder.appendFrom(rows(i), 0)
+        }
+      }
+      val buffer = builder.build()
+
+      // ----------------
+      // Tests decompress
+      // ----------------
+      // Rewinds, skips column header and 4 more bytes for compression scheme 
ID
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      buffer.position(headerSize)
+      assertResult(RunLengthEncoding.typeId, "Wrong compression scheme 
ID")(buffer.getInt())
+
+      val decoder = RunLengthEncoding.decoder(buffer, columnType)
+      val columnVector = new OnHeapColumnVector(inputSeq.length, 
columnType.dataType)
+      decoder.decompress(columnVector, inputSeq.length) // capacity passed is the number of rows
+
+      if (inputSeq.nonEmpty) {
+        inputSeq.zipWithIndex.foreach {
+          case (expected: Any, index: Int) if expected == nullValue =>
+            assertResult(true, s"Wrong null ${index}th-position") {
+              columnVector.isNullAt(index)
+            }
+          case (i: Int, index: Int) =>
+            columnType match { // read back with the getter that matches the column type
+              case BOOLEAN =>
+                assertResult(values(i), s"Wrong ${index}-th decoded boolean 
value") {
+                  columnVector.getBoolean(index)
+                }
+              case BYTE =>
+                assertResult(values(i), s"Wrong ${index}-th decoded byte 
value") {
+                  columnVector.getByte(index)
+                }
+              case SHORT =>
+                assertResult(values(i), s"Wrong ${index}-th decoded short 
value") {
+                  columnVector.getShort(index)
+                }
+              case INT =>
+                assertResult(values(i), s"Wrong ${index}-th decoded int 
value") {
+                  columnVector.getInt(index)
+                }
+              case LONG =>
+                assertResult(values(i), s"Wrong ${index}-th decoded long 
value") {
+                  columnVector.getLong(index)
+                }
+              case _ => fail("Unsupported type")
+            }
+          case _ => fail("Unsupported type")
+        }
+      }
+    }
+
     test(s"$RunLengthEncoding with $typeName: empty column") {
       skeleton(0, Seq.empty)
     }
@@ -110,5 +179,21 @@ class RunLengthEncodingSuite extends SparkFunSuite {
     test(s"$RunLengthEncoding with $typeName: single long run") {
       skeleton(1, Seq(0 -> 1000))
     }
+
+    test(s"$RunLengthEncoding with $typeName: empty column for decompress()") {
+      skeletonForDecompress(0, Seq.empty)
+    }
+
+    test(s"$RunLengthEncoding with $typeName: simple case for decompress()") {
+      skeletonForDecompress(2, Seq(0 -> 2, 1 -> 2))
+    }
+
+    test(s"$RunLengthEncoding with $typeName: single long run for decompress()") {
+      skeletonForDecompress(1, Seq(0 -> 1000))
+    }
+
+    test(s"$RunLengthEncoding with $typeName: single case with null for decompress()") {
+      skeletonForDecompress(2, Seq(0 -> 2, nullValue -> 2))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala
index 5e078f2..310cb0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.sql.execution.columnar._
-import org.apache.spark.sql.types.AtomicType
+import org.apache.spark.sql.types.{AtomicType, DataType}
 
 class TestCompressibleColumnBuilder[T <: AtomicType](
     override val columnStats: ColumnStats,
@@ -42,3 +42,10 @@ object TestCompressibleColumnBuilder {
     builder
   }
 }
+
+object ColumnBuilderHelper {
+  def apply(
+      dataType: DataType, batchSize: Int, name: String, useCompression: Boolean): ColumnBuilder = {
+    ColumnBuilder(dataType, batchSize, name, useCompression)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 85da827..c5c8ae3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -20,7 +20,10 @@ package org.apache.spark.sql.execution.vectorized
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.columnar.ColumnAccessor
+import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -31,14 +34,21 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     try block(vector) finally vector.close()
   }
 
+  private def withVectors(
+      size: Int,
+      dt: DataType)(
+      block: WritableColumnVector => Unit): Unit = {
+    withVector(new OnHeapColumnVector(size, dt))(block)
+    withVector(new OffHeapColumnVector(size, dt))(block)
+  }
+
   private def testVectors(
       name: String,
       size: Int,
       dt: DataType)(
       block: WritableColumnVector => Unit): Unit = {
     test(name) {
-      withVector(new OnHeapColumnVector(size, dt))(block)
-      withVector(new OffHeapColumnVector(size, dt))(block)
+      withVectors(size, dt)(block)
     }
   }
 
@@ -218,4 +228,173 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
       (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
     }
   }
+
+  test("CachedBatch boolean Apis") {
+    val dataType = BooleanType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setBoolean(0, i % 2 == 0)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getBoolean(i) == (i % 2 == 0))
+      }
+    }
+  }
+
+  test("CachedBatch byte Apis") {
+    val dataType = ByteType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setByte(0, i.toByte)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getByte(i) == i)
+      }
+    }
+  }
+
+  test("CachedBatch short Apis") {
+    val dataType = ShortType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setShort(0, i.toShort)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getShort(i) == i)
+      }
+    }
+  }
+
+  test("CachedBatch int Apis") {
+    val dataType = IntegerType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setInt(0, i)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getInt(i) == i)
+      }
+    }
+  }
+
+  test("CachedBatch long Apis") {
+    val dataType = LongType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setLong(0, i.toLong)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getLong(i) == i.toLong)
+      }
+    }
+  }
+
+  test("CachedBatch float Apis") {
+    val dataType = FloatType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setFloat(0, i.toFloat)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getFloat(i) == i.toFloat)
+      }
+    }
+  }
+
+  test("CachedBatch double Apis") {
+    val dataType = DoubleType
+    val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true)
+    val row = new SpecificInternalRow(Array(dataType))
+
+    row.setNullAt(0)
+    columnBuilder.appendFrom(row, 0)
+    for (i <- 1 until 16) {
+      row.setDouble(0, i.toDouble)
+      columnBuilder.appendFrom(row, 0)
+    }
+
+    withVectors(16, dataType) { testVector =>
+      val columnAccessor = ColumnAccessor(dataType, columnBuilder.build)
+      ColumnAccessor.decompress(columnAccessor, testVector, 16)
+
+      assert(testVector.isNullAt(0) == true)
+      for (i <- 1 until 16) {
+        assert(testVector.isNullAt(i) == false)
+        assert(testVector.getDouble(i) == i.toDouble)
+      }
+    }
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 983eb10..0b179aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -413,7 +413,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
-            " Seed = " + seed + " MemMode=" + memMode)
+          " Seed = " + seed + " MemMode=" + memMode)
         if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.valuesNativeAddress()
           assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
@@ -1120,7 +1120,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
       batch.close()
     }
-  }}
+    }}
 
   /**
    * This test generates a random schema data, serializes it to column batches and verifies the


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to