This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c341de8  [SPARK-27945][SQL] Minimal changes to support columnar processing
c341de8 is described below

commit c341de8b3e1f1d3327bd4ae3b0d2ec048f64d306
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Fri Jun 28 14:00:12 2019 -0500

    [SPARK-27945][SQL] Minimal changes to support columnar processing
    
    ## What changes were proposed in this pull request?
    
    This is the first part of [SPARK-27396](https://issues.apache.org/jira/browse/SPARK-27396).
    It is the minimum set of changes necessary to support a pluggable back end for columnar
    processing. Follow-on JIRAs will cover removing some of the duplication between functionality
    in this patch and functionality currently covered by things like ColumnarBatchScan.
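
    To illustrate the shape of the new hook, here is a minimal sketch (not part of this patch;
    `MyColumnarRule` and its no-op transforms are placeholders):

    ```scala
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

    // Hypothetical rule: a real back end would rewrite operators here.
    case class MyColumnarRule() extends ColumnarRule {
      // Runs before Spark inserts row-to-columnar / columnar-to-row transitions;
      // replace operators with versions whose supportsColumnar returns true.
      override def preColumnarTransitions: Rule[SparkPlan] = plan => plan
      // Runs after the transitions are inserted; swap in custom transitions or
      // clean up the plan (e.g. coalesce batches, move data to an accelerator).
      override def postColumnarTransitions: Rule[SparkPlan] = plan => plan
    }
    ```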
    
    ## How was this patch tested?
    
    I added a new unit test to cover new code that is not really covered elsewhere.
    
    I also did manual testing by implementing two plugins/extensions that take advantage of
    the new APIs to allow columnar processing for some simple queries. One version runs on
    the [CPU](https://gist.github.com/revans2/c3cad77075c4fa5d9d271308ee2f1b1d). The other
    version runs on a GPU, but because it has unreleased dependencies I will not include a
    link to it yet.
    
    I expect to add the CPU version as an example, along with other documentation, in a
    follow-on JIRA.
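
    For reference, such a rule would be registered through the new `injectColumnar` hook on
    `SparkSessionExtensions`; again only a sketch, with placeholder session settings and the
    hypothetical `MyColumnarRule` from above:

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[1]")
      .withExtensions(extensions =>
        // ColumnarRuleBuilder is SparkSession => ColumnarRule
        extensions.injectColumnar(session => MyColumnarRule()))
      .getOrCreate()
    ```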
    
    This is contributed on behalf of NVIDIA Corporation.
    
    Closes #24795 from revans2/columnar-basic.
    
    Authored-by: Robert (Bobby) Evans <bo...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/sql/vectorized/ColumnVector.java  |   2 +-
 .../apache/spark/sql/vectorized/ColumnarBatch.java |  13 +-
 .../execution/vectorized/WritableColumnVector.java |   5 +-
 .../apache/spark/sql/SparkSessionExtensions.scala  |  19 +
 .../org/apache/spark/sql/execution/Columnar.scala  | 534 +++++++++++++++++++++
 .../spark/sql/execution/ColumnarBatchScan.scala    |   2 -
 .../spark/sql/execution/QueryExecution.scala       |   2 +
 .../org/apache/spark/sql/execution/SparkPlan.scala |  36 ++
 .../sql/execution/WholeStageCodegenExec.scala      |  97 +++-
 .../sql/internal/BaseSessionStateBuilder.scala     |   9 +-
 .../apache/spark/sql/internal/SessionState.scala   |   3 +-
 .../spark/sql/SparkSessionExtensionSuite.scala     | 409 +++++++++++++++-
 .../python/BatchEvalPythonExecSuite.scala          |   8 +-
 .../execution/vectorized/ColumnarBatchSuite.scala  | 210 +++++++-
 14 files changed, 1311 insertions(+), 38 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 14caaea..f18d003 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -287,7 +287,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * @return child [[ColumnVector]] at the given ordinal.
    */
-  protected abstract ColumnVector getChild(int ordinal);
+  public abstract ColumnVector getChild(int ordinal);
 
   /**
    * Data type for this column.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9f917ea..a2feac8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  * the entire data loading process.
  */
 @Evolving
-public final class ColumnarBatch {
+public final class ColumnarBatch implements AutoCloseable {
   private int numRows;
   private final ColumnVector[] columns;
 
@@ -42,6 +42,7 @@ public final class ColumnarBatch {
    * Called to close all the columns in this batch. It is not valid to access the data after
    * calling this. This must be called at the end to clean up memory allocations.
    */
+  @Override
   public void close() {
     for (ColumnVector c: columns) {
       c.close();
@@ -110,7 +111,17 @@ public final class ColumnarBatch {
   }
 
   public ColumnarBatch(ColumnVector[] columns) {
+    this(columns, 0);
+  }
+
+  /**
+   * Create a new batch from existing column vectors.
+   * @param columns The columns of this batch
+   * @param numRows The number of rows in this batch
+   */
+  public ColumnarBatch(ColumnVector[] columns, int numRows) {
     this.columns = columns;
+    this.numRows = numRows;
     this.row = new ColumnarBatchRow(columns);
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 4f5e72c..14fac72 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -604,7 +604,10 @@ public abstract class WritableColumnVector extends ColumnVector {
    */
   public final int appendStruct(boolean isNull) {
     if (isNull) {
-      appendNull();
+      // This is the same as appendNull but without the assertion for struct types
+      reserve(elementsAppended + 1);
+      putNull(elementsAppended);
+      elementsAppended++;
       for (WritableColumnVector c: childColumns) {
         if (c.type instanceof StructType) {
           c.appendStruct(true);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
index 66becf3..1c2bf9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.ColumnarRule
 
 /**
  * :: Experimental ::
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * <li>Planning Strategies.</li>
  * <li>Customized Parser.</li>
  * <li>(External) Catalog listeners.</li>
+ * <li>Columnar Rules.</li>
  * </ul>
  *
  * The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@@ -93,6 +95,23 @@ class SparkSessionExtensions {
   type StrategyBuilder = SparkSession => Strategy
   type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
   type FunctionDescription = (FunctionIdentifier, ExpressionInfo, 
FunctionBuilder)
+  type ColumnarRuleBuilder = SparkSession => ColumnarRule
+
+  private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
+
+  /**
+   * Build the override rules for columnar execution.
+   */
+  private[sql] def buildColumnarRules(session: SparkSession): Seq[ColumnarRule] = {
+    columnarRuleBuilders.map(_.apply(session))
+  }
+
+  /**
+   * Inject a rule that can override the columnar execution of an executor.
+   */
+  def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
+    columnarRuleBuilders += builder
+  }
 
   private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
new file mode 100644
index 0000000..315eba6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{broadcast, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, 
SpecializedGetters, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Holds a user defined rule that can be used to inject columnar implementations of various
+ * operators in the plan. The [[preColumnarTransitions]] [[Rule]] can be used to replace
+ * [[SparkPlan]] instances with versions that support a columnar implementation. After this
+ * Spark will insert any transitions necessary. This includes transitions from row to columnar
+ * [[RowToColumnarExec]] and from columnar to row [[ColumnarToRowExec]]. At this point the
+ * [[postColumnarTransitions]] [[Rule]] is called to allow replacing any of the implementations
+ * of the transitions or doing cleanup of the plan, like inserting stages to build larger batches
+ * for more efficient processing, or stages that transition the data to/from an accelerator's
+ * memory.
+ */
+class ColumnarRule {
+  def preColumnarTransitions: Rule[SparkPlan] = plan => plan
+  def postColumnarTransitions: Rule[SparkPlan] = plan => plan
+}
+
+/**
+ * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] 
into an [[RDD]] of
+ * [[InternalRow]]. This is inserted whenever such a transition is determined 
to be needed.
+ *
+ * The implementation is based off of similar implementations in 
[[ColumnarBatchScan]],
+ * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
+ * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those 
implementations.
+ */
+case class ColumnarToRowExec(child: SparkPlan)
+  extends UnaryExecNode with CodegenSupport {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
+    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+  )
+
+  override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    val scanTime = longMetric("scanTime")
+    // UnsafeProjection is not serializable so do it on the executor side, 
which is why it is lazy
+    @transient lazy val outputProject = UnsafeProjection.create(output, output)
+    val batches = child.executeColumnar()
+    batches.flatMap(batch => {
+      val batchStartNs = System.nanoTime()
+      numInputBatches += 1
+      // In order to match the numOutputRows metric in the generated code we 
update
+      // numOutputRows for each batch. This is less accurate than doing it at 
output
+      // because it will over count the number of rows output in the case of a 
limit,
+      // but it is more efficient.
+      numOutputRows += batch.numRows()
+      val ret = batch.rowIterator().asScala
+      scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
+      ret.map(outputProject)
+    })
+  }
+
+  /**
+   * Generate [[ColumnVector]] expressions for our parent to consume as rows.
+   * This is called once per [[ColumnVector]] in the batch.
+   *
+   * This code came unchanged from [[ColumnarBatchScan]] and will hopefully 
replace it
+   * at some point.
+   */
+  private def genCodeColumnVector(
+      ctx: CodegenContext,
+      columnVar: String,
+      ordinal: String,
+      dataType: DataType,
+      nullable: Boolean): ExprCode = {
+    val javaType = CodeGenerator.javaType(dataType)
+    val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) {
+      JavaCode.isNullVariable(ctx.freshName("isNull"))
+    } else {
+      FalseLiteral
+    }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = code"${ctx.registerComment(str)}" + (if (nullable) {
+      code"""
+        boolean $isNullVar = $columnVar.isNullAt($ordinal);
+        $javaType $valueVar = $isNullVar ? 
${CodeGenerator.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      code"$javaType $valueVar = $value;"
+    })
+    ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType))
+  }
+
+  /**
+   * Produce code to process the input iterator as [[ColumnarBatch]]es.
+   * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] 
for each row in
+   * each batch.
+   *
+   * This code came almost completely unchanged from [[ColumnarBatchScan]] and 
will
+   * hopefully replace it at some point.
+   */
+  override protected def doProduce(ctx: CodegenContext): String = {
+    // PhysicalRDD always just has one input
+    val input = ctx.addMutableState("scala.collection.Iterator", "input",
+      v => s"$v = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val numInputBatches = metricTerm(ctx, "numInputBatches")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs =
+      ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as 
scanTime = 0
+
+    val columnarBatchClz = classOf[ColumnarBatch].getName
+    val batch = ctx.addMutableState(columnarBatchClz, "batch")
+
+    val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init 
as batchIdx = 0
+    val columnVectorClzs = child.vectorTypes.getOrElse(
+      Seq.fill(output.indices.size)(classOf[ColumnVector].getName))
+    val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map {
+      case (columnVectorClz, i) =>
+        val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
+        (name, s"$name = ($columnVectorClz) $batch.column($i);")
+    }.unzip
+
+    val nextBatch = ctx.freshName("nextBatch")
+    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |    ${numInputBatches}.add(1);
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    val localIdx = ctx.freshName("localIdx")
+    val localEnd = ctx.freshName("localEnd")
+    val numRows = ctx.freshName("numRows")
+    val shouldStop = if (parent.needStopCheck) {
+      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
+    } else {
+      "// shouldStop check is eliminated"
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatchFuncName();
+       |}
+       |while ($batch != null) {
+       |  int $numRows = $batch.numRows();
+       |  int $localEnd = $numRows - $idx;
+       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+       |    int $rowidx = $idx + $localIdx;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    $shouldStop
+       |  }
+       |  $idx = $numRows;
+       |  $batch = null;
+       |  $nextBatchFuncName();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+}
+
+/**
+ * Provides an optimized set of APIs to append row based data to an array of
+ * [[WritableColumnVector]].
+ */
+private[execution] class RowToColumnConverter(schema: StructType) extends 
Serializable {
+  private val converters = schema.fields.map {
+    f => RowToColumnConverter.getConverterForType(f.dataType, f.nullable)
+  }
+
+  final def convert(row: InternalRow, vectors: Array[WritableColumnVector]): 
Unit = {
+    var idx = 0
+    while (idx < row.numFields) {
+      converters(idx).append(row, idx, vectors(idx))
+      idx += 1
+    }
+  }
+}
+
+/**
+ * Provides an optimized set of APIs to extract a column from a row and append 
it to a
+ * [[WritableColumnVector]].
+ */
+private object RowToColumnConverter {
+  private abstract class TypeConverter extends Serializable {
+    def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit
+  }
+
+  private final case class BasicNullableTypeConverter(base: TypeConverter) 
extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      if (row.isNullAt(column)) {
+        cv.appendNull
+      } else {
+        base.append(row, column, cv)
+      }
+    }
+  }
+
+  private final case class StructNullableTypeConverter(base: TypeConverter) 
extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      if (row.isNullAt(column)) {
+        cv.appendStruct(true)
+      } else {
+        base.append(row, column, cv)
+      }
+    }
+  }
+
+  private def getConverterForType(dataType: DataType, nullable: Boolean): 
TypeConverter = {
+    val core = dataType match {
+      case BooleanType => BooleanConverter
+      case ByteType => ByteConverter
+      case ShortType => ShortConverter
+      case IntegerType | DateType => IntConverter
+      case FloatType => FloatConverter
+      case LongType | TimestampType => LongConverter
+      case DoubleType => DoubleConverter
+      case StringType => StringConverter
+      case CalendarIntervalType => CalendarConverter
+      case at: ArrayType => new 
ArrayConverter(getConverterForType(at.elementType, nullable))
+      case st: StructType => new StructConverter(st.fields.map(
+        (f) => getConverterForType(f.dataType, f.nullable)))
+      case dt: DecimalType => new DecimalConverter(dt)
+      case mt: MapType => new MapConverter(getConverterForType(mt.keyType, 
nullable),
+        getConverterForType(mt.valueType, nullable))
+      case unknown => throw new UnsupportedOperationException(
+        s"Type $unknown not supported")
+    }
+
+    if (nullable) {
+      dataType match {
+        case CalendarIntervalType => new StructNullableTypeConverter(core)
+        case st: StructType => new StructNullableTypeConverter(core)
+        case _ => new BasicNullableTypeConverter(core)
+      }
+    } else {
+      core
+    }
+  }
+
+  private object BooleanConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendBoolean(row.getBoolean(column))
+  }
+
+  private object ByteConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendByte(row.getByte(column))
+  }
+
+  private object ShortConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendShort(row.getShort(column))
+  }
+
+  private object IntConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendInt(row.getInt(column))
+  }
+
+  private object FloatConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendFloat(row.getFloat(column))
+  }
+
+  private object LongConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendLong(row.getLong(column))
+  }
+
+  private object DoubleConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit =
+      cv.appendDouble(row.getDouble(column))
+  }
+
+  private object StringConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      val data = row.getUTF8String(column).getBytes
+      cv.appendByteArray(data, 0, data.length)
+    }
+  }
+
+  private object CalendarConverter extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      val c = row.getInterval(column)
+      cv.appendStruct(false)
+      cv.getChild(0).appendInt(c.months)
+      cv.getChild(1).appendLong(c.microseconds)
+    }
+  }
+
+  private case class ArrayConverter(childConverter: TypeConverter) extends 
TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      val values = row.getArray(column)
+      val numElements = values.numElements()
+      cv.appendArray(numElements)
+      val arrData = cv.arrayData()
+      for (i <- 0 until numElements) {
+        childConverter.append(values, i, arrData)
+      }
+    }
+  }
+
+  private case class StructConverter(childConverters: Array[TypeConverter]) 
extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      cv.appendStruct(false)
+      val data = row.getStruct(column, childConverters.length)
+      for (i <- 0 until childConverters.length) {
+        childConverters(i).append(data, i, cv.getChild(i))
+      }
+    }
+  }
+
+  private case class DecimalConverter(dt: DecimalType) extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      val d = row.getDecimal(column, dt.precision, dt.scale)
+      if (dt.precision <= Decimal.MAX_INT_DIGITS) {
+        cv.appendInt(d.toUnscaledLong.toInt)
+      } else if (dt.precision <= Decimal.MAX_LONG_DIGITS) {
+        cv.appendLong(d.toUnscaledLong)
+      } else {
+        val integer = d.toJavaBigDecimal.unscaledValue
+        val bytes = integer.toByteArray
+        cv.appendByteArray(bytes, 0, bytes.length)
+      }
+    }
+  }
+
+  private case class MapConverter(keyConverter: TypeConverter, valueConverter: 
TypeConverter)
+    extends TypeConverter {
+    override def append(row: SpecializedGetters, column: Int, cv: 
WritableColumnVector): Unit = {
+      val m = row.getMap(column)
+      val keys = cv.getChild(0)
+      val values = cv.getChild(1)
+      val numElements = m.numElements()
+      cv.appendArray(numElements)
+
+      val srcKeys = m.keyArray()
+      val srcValues = m.valueArray()
+
+      for (i <- 0 until numElements) {
+        keyConverter.append(srcKeys, i, keys)
+        valueConverter.append(srcValues, i, values)
+      }
+    }
+  }
+}
+
+/**
+ * Provides a common executor to translate an [[RDD]] of [[InternalRow]] into 
an [[RDD]] of
+ * [[ColumnarBatch]]. This is inserted whenever such a transition is 
determined to be needed.
+ *
+ * This is similar to some of the code in ArrowConverters.scala and
+ * [[org.apache.spark.sql.execution.arrow.ArrowWriter]]. That code is more 
specialized
+ * to convert [[InternalRow]] to Arrow formatted data, but in the future if we 
make
+ * [[OffHeapColumnVector]] internally Arrow formatted we may be able to 
replace much of that code.
+ *
+ * This is also similar to
+ * [[org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate()]] 
and
+ * [[org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.toBatch()]] 
toBatch is only ever
+ * called from tests and can probably be removed, but populate is used by both 
Orc and Parquet
+ * to initialize partition and missing columns. There is some chance that we 
could replace
+ * populate with [[RowToColumnConverter]], but the performance requirements 
are different and it
+ * would only be to reduce code.
+ */
+case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    child.doExecuteBroadcast()
+  }
+
+  override def supportsColumnar: Boolean = true
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
output batches")
+  )
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val enableOffHeapColumnVector = sqlContext.conf.offHeapColumnVectorEnabled
+    val numInputRows = longMetric("numInputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    // Instead of creating a new config we are reusing columnBatchSize. In the 
future if we do
+    // combine with some of the Arrow conversion tools we will need to unify 
some of the configs.
+    val numRows = conf.columnBatchSize
+    val converters = new RowToColumnConverter(schema)
+    val rowBased = child.execute()
+    rowBased.mapPartitions(rowIterator => {
+      new Iterator[ColumnarBatch] {
+        var cb: ColumnarBatch = null
+
+        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+          if (cb != null) {
+            cb.close()
+            cb = null
+          }
+        }
+
+        override def hasNext: Boolean = {
+          rowIterator.hasNext
+        }
+
+        override def next(): ColumnarBatch = {
+          if (cb != null) {
+            cb.close()
+            cb = null
+          }
+          val columnVectors : Array[WritableColumnVector] =
+            if (enableOffHeapColumnVector) {
+              OffHeapColumnVector.allocateColumns(numRows, schema).toArray
+            } else {
+              OnHeapColumnVector.allocateColumns(numRows, schema).toArray
+            }
+          var rowCount = 0
+          while (rowCount < numRows && rowIterator.hasNext) {
+            val row = rowIterator.next()
+            converters.convert(row, columnVectors)
+            rowCount += 1
+          }
+          cb = new ColumnarBatch(columnVectors.toArray, rowCount)
+          numInputRows += rowCount
+          numOutputBatches += 1
+          cb
+        }
+      }
+    })
+  }
+}
+
+/**
+ * Apply any user defined [[ColumnarRule]]s and find the correct place to 
insert transitions
+ * to/from columnar formatted data.
+ */
+case class ApplyColumnarRulesAndInsertTransitions(conf: SQLConf, 
columnarRules: Seq[ColumnarRule])
+  extends Rule[SparkPlan] {
+
+  /**
+   * Inserts a transition to columnar formatted data.
+   */
+  private def insertRowToColumnar(plan: SparkPlan): SparkPlan = {
+    if (!plan.supportsColumnar) {
+      // The tree feels kind of backwards
+      // Columnar Processing will start here, so transition from row to 
columnar
+      RowToColumnarExec(insertTransitions(plan))
+    } else {
+      plan.withNewChildren(plan.children.map(insertRowToColumnar))
+    }
+  }
+
+  /**
+   * Inserts RowToColumnarExecs and ColumnarToRowExecs where needed.
+   */
+  private def insertTransitions(plan: SparkPlan): SparkPlan = {
+    if (plan.supportsColumnar) {
+      // The tree feels kind of backwards
+      // This is the end of the columnar processing so go back to rows
+      ColumnarToRowExec(insertRowToColumnar(plan))
+    } else {
+      plan.withNewChildren(plan.children.map(insertTransitions))
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    var preInsertPlan: SparkPlan = plan
+    columnarRules.foreach((r : ColumnarRule) =>
+      preInsertPlan = r.preColumnarTransitions(preInsertPlan))
+    var postInsertPlan = insertTransitions(preInsertPlan)
+    columnarRules.reverse.foreach((r : ColumnarRule) =>
+      postInsertPlan = r.postColumnarTransitions(postInsertPlan))
+    postInsertPlan
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 7caff69..b2e9f76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -30,8 +30,6 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
  */
 private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
-  def vectorTypes: Option[Seq[String]] = None
-
   protected def supportsBatch: Boolean = true
 
   override lazy val metrics = Map(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6f0b489..9fcffac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -119,6 +119,8 @@ class QueryExecution(
     InsertAdaptiveSparkPlan(sparkSession),
     PlanSubqueries(sparkSession),
     EnsureRequirements(sparkSession.sessionState.conf),
+    ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
+      sparkSession.sessionState.columnarRules),
     CollapseCodegenStages(sparkSession.sessionState.conf),
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index fbe8e50..6deb90c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object SparkPlan {
   /** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. 
*/
@@ -73,6 +74,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   // whether we should fallback when hitting compilation errors caused by 
codegen
   private val codeGenFallBack = (sqlContext == null) || 
sqlContext.conf.codegenFallback
 
+  /**
+   * Return true if this stage of the plan supports columnar execution.
+   */
+  def supportsColumnar: Boolean = false
+
+  /**
+   * The exact java types of the columns that are output in columnar 
processing mode. This
+   * is a performance optimization for code generation and is optional.
+   */
+  def vectorTypes: Option[Seq[String]] = None
+
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
     if (sqlContext != null) {
@@ -182,6 +194,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
+   * Returns the result of this query as an RDD[ColumnarBatch] by delegating to `doExecuteColumnar`
+   * after preparations.
+   *
+   * Concrete implementations of SparkPlan should override `doExecuteColumnar` if `supportsColumnar`
+   * returns true.
+   */
+  final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to 
be executed.")
+    }
+    doExecuteColumnar()
+  }
+
+  /**
    * Executes a query after preparing the query and adding query plan 
information to created RDDs
    * for visualization.
    */
@@ -273,6 +299,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
+   * Produces the result of the query as an `RDD[ColumnarBatch]` if [[supportsColumnar]] returns
+   * true. By convention the executor that creates a ColumnarBatch is responsible for closing it
+   * when it is no longer needed. This allows input formats to be able to reuse batches if needed.
+   */
+  protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    throw new IllegalStateException(s"Internal Error ${this.getClass} has 
column support" +
+      s" mismatch:\n${this}")
+  }
+
+  /**
    * Packing the UnsafeRows into byte array for faster serialization.
    * The byte arrays are in the following format:
    * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 92e80dc..94a5ede 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
 /**
@@ -490,8 +491,12 @@ trait InputRDDCodegen extends CodegenSupport {
  *
  * This is the leaf node of a tree with WholeStageCodegen that is used to 
generate code
  * that consumes an RDD iterator of InternalRow.
+ *
+ * @param isChildColumnar true if the inputRDD is really columnar data hidden 
by type erasure,
+ *                        false if inputRDD is really an RDD[InternalRow]
  */
-case class InputAdapter(child: SparkPlan) extends UnaryExecNode with 
InputRDDCodegen {
+case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
+    extends UnaryExecNode with InputRDDCodegen {
 
   override def output: Seq[Attribute] = child.output
 
@@ -499,6 +504,12 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryExecNode with InputRDDCod
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def vectorTypes: Option[Seq[String]] = child.vectorTypes
+
+  // This is not strictly needed because the codegen transformation happens 
after the columnar
+  // transformation but just for consistency
+  override def supportsColumnar: Boolean = child.supportsColumnar
+
   override def doExecute(): RDD[InternalRow] = {
     child.execute()
   }
@@ -507,7 +518,17 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryExecNode with InputRDDCod
     child.doExecuteBroadcast()
   }
 
-  override def inputRDD: RDD[InternalRow] = child.execute()
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    child.executeColumnar()
+  }
+
+  override def inputRDD: RDD[InternalRow] = {
+    if (isChildColumnar) {
+      child.executeColumnar().asInstanceOf[RDD[InternalRow]] // Hack because 
of type erasure
+    } else {
+      child.execute()
+    }
+  }
 
   // This is a leaf node so the node can produce limit not reached checks.
   override protected def canCheckLimitNotReached: Boolean = true
@@ -589,6 +610,10 @@ case class WholeStageCodegenExec(child: SparkPlan)(val 
codegenStageId: Int)
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  // This is not strictly needed because the codegen transformation happens 
after the columnar
+  // transformation but just for consistency
+  override def supportsColumnar: Boolean = child.supportsColumnar
+
   override lazy val metrics = Map(
     "pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
       WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
@@ -659,6 +684,12 @@ case class WholeStageCodegenExec(child: SparkPlan)(val 
codegenStageId: Int)
     (ctx, cleanedSource)
   }
 
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    // Code generation is not currently supported for columnar output, so just 
fall back to
+    // the interpreted path
+    child.executeColumnar()
+  }
+
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
     // try to compile and fallback if it failed
@@ -689,6 +720,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val 
codegenStageId: Int)
 
     val durationMs = longMetric("pipelineTime")
 
+    // Even though rdds is an RDD[InternalRow] it may actually be an 
RDD[ColumnarBatch] with
+    // type erasure hiding that. This allows for the input to a code gen stage 
to be columnar,
+    // but the output must be rows.
     val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
     assert(rdds.size <= 2, "Up to two input RDDs can be supported")
     if (rdds.length == 1) {
@@ -840,34 +874,55 @@ case class CollapseCodegenStages(
   /**
    * Inserts an InputAdapter on top of those that do not support codegen.
    */
-  private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
-    case p if !supportCodegen(p) =>
-      // collapse them recursively
-      InputAdapter(insertWholeStageCodegen(p))
-    case j: SortMergeJoinExec =>
-      // The children of SortMergeJoin should do codegen separately.
-      j.withNewChildren(j.children.map(child => 
InputAdapter(insertWholeStageCodegen(child))))
-    case p =>
-      p.withNewChildren(p.children.map(insertInputAdapter))
+  private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean): 
SparkPlan = {
+    val isColumnar = adjustColumnar(plan, isColumnarInput)
+    plan match {
+      case p if !supportCodegen(p) =>
+        // collapse them recursively
+        InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar)
+      case j: SortMergeJoinExec =>
+        // The children of SortMergeJoin should do codegen separately.
+        j.withNewChildren(j.children.map(
+          child => InputAdapter(insertWholeStageCodegen(child, isColumnar), 
isColumnar)))
+      case p =>
+        p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar)))
+    }
   }
 
   /**
    * Inserts a WholeStageCodegen on top of those that support codegen.
    */
-  private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match 
{
-    // For operators that will output domain object, do not insert 
WholeStageCodegen for it as
-    // domain object can not be written into unsafe row.
-    case plan if plan.output.length == 1 && 
plan.output.head.dataType.isInstanceOf[ObjectType] =>
-      plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
-    case plan: CodegenSupport if supportCodegen(plan) =>
-      
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
-    case other =>
-      other.withNewChildren(other.children.map(insertWholeStageCodegen))
+  private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput: 
Boolean): SparkPlan = {
+    val isColumnar = adjustColumnar(plan, isColumnarInput)
+    plan match {
+      // For operators that will output domain object, do not insert 
WholeStageCodegen for it as
+      // domain object can not be written into unsafe row.
+      case plan if plan.output.length == 1 && 
plan.output.head.dataType.isInstanceOf[ObjectType] =>
+        plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, 
isColumnar)))
+      case plan: CodegenSupport if supportCodegen(plan) =>
+        WholeStageCodegenExec(
+          insertInputAdapter(plan, 
isColumnar))(codegenStageCounter.incrementAndGet())
+      case other =>
+        other.withNewChildren(other.children.map(insertWholeStageCodegen(_, 
isColumnar)))
+    }
+  }
+
+  /**
+   * Depending on the stage in the plan and if we currently are columnar or not
+   * return if we are still columnar or not.
+   */
+  private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean =
+    // We are walking up the plan, so columnar starts when we transition to 
rows
+    // and ends when we transition to columns
+  plan match {
+    case c2r: ColumnarToRowExec => true
+    case r2c: RowToColumnarExec => false
+    case _ => isColumnar
   }
 
   def apply(plan: SparkPlan): SparkPlan = {
     if (conf.wholeStageEnabled) {
-      insertWholeStageCodegen(plan)
+      insertWholeStageCodegen(plan, false)
     } else {
       plan
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 8d73449..8dc30ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, 
SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, 
SparkOptimizer, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.datasources._
 import 
org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, 
V2WriteSupportCheck}
 import org.apache.spark.sql.streaming.StreamingQueryManager
@@ -264,6 +264,10 @@ abstract class BaseSessionStateBuilder(
     extensions.buildPlannerStrategies(session)
   }
 
+  protected def columnarRules: Seq[ColumnarRule] = {
+    extensions.buildColumnarRules(session)
+  }
+
   /**
    * Create a query execution object.
    */
@@ -314,7 +318,8 @@ abstract class BaseSessionStateBuilder(
       listenerManager,
       () => resourceLoader,
       createQueryExecution,
-      createClone)
+      createClone,
+      columnarRules)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index b34db58..b962ab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -71,7 +71,8 @@ private[sql] class SessionState(
     val listenerManager: ExecutionListenerManager,
     resourceLoaderBuilder: () => SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
-    createClone: (SparkSession, SessionState) => SessionState) {
+    createClone: (SparkSession, SessionState) => SessionState,
+    val columnarRules: Seq[ColumnarRule]) {
 
   // The following fields are lazy to avoid creating the Hive client when 
creating SessionState.
   lazy val catalog: SessionCatalog = catalogBuilder()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 8812684..2e2e61b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -16,14 +16,19 @@
  */
 package org.apache.spark.sql
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, 
Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, 
ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, 
Metadata, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch, 
ColumnarMap, ColumnVector}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Test cases for the [[SparkSessionExtensions]].
@@ -116,6 +121,34 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
     }
   }
 
+  test("inject columnar") {
+    val extensions = create { extensions =>
+      extensions.injectColumnar(session =>
+        MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
+    }
+    withSession(extensions) { session =>
+      assert(session.sessionState.columnarRules.contains(
+        MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
+      import session.sqlContext.implicits._
+      // repartitioning avoids having the add operation pushed up into the 
LocalTableScan
+      val data = Seq((100L), (200L), (300L)).toDF("vals").repartition(1)
+      val df = data.selectExpr("vals + 1")
+      // Verify that both pre and post processing of the plan worked.
+      val found = df.queryExecution.executedPlan.collect {
+        case rep: ReplacedRowToColumnarExec => 1
+        case proj: ColumnarProjectExec => 10
+        case c2r: ColumnarToRowExec => 100
+      }.sum
+      assert(found == 111)
+
+      // Verify that we get back the expected, wrong, result
+      val result = df.collect()
+      assert(result(0).getLong(0) == 102L) // Check that broken columnar Add 
was used.
+      assert(result(1).getLong(0) == 202L)
+      assert(result(2).getLong(0) == 302L)
+    }
+  }
+
   test("use custom class for extensions") {
     val session = SparkSession.builder()
       .master("local[1]")
@@ -130,6 +163,8 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
       assert(session.sessionState.sqlParser.isInstanceOf[MyParser])
       assert(session.sessionState.functionRegistry
         .lookupFunction(MyExtensions.myFunction._1).isDefined)
+      assert(session.sessionState.columnarRules.contains(
+        MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
     } finally {
       stop(session)
     }
@@ -251,6 +286,371 @@ object MyExtensions {
     (_: Seq[Expression]) => Literal(5, IntegerType))
 }
 
+case class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
+    f: ColumnarBatch => ColumnarBatch) extends Iterator[ColumnarBatch] {
+  var cb: ColumnarBatch = null
+
+  private def closeCurrentBatch(): Unit = {
+    if (cb != null) {
+      cb.close
+      cb = null
+    }
+  }
+
+  TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => {
+    closeCurrentBatch()
+  })
+
+  override def hasNext: Boolean = {
+    closeCurrentBatch()
+    itr.hasNext
+  }
+
+  override def next(): ColumnarBatch = {
+    closeCurrentBatch()
+    cb = f(itr.next())
+    cb
+  }
+}
+
+object NoCloseColumnVector extends Logging {
+  def wrapIfNeeded(cv: ColumnVector): NoCloseColumnVector = cv match {
+    case ref: NoCloseColumnVector =>
+      ref
+    case vec => NoCloseColumnVector(vec)
+  }
+}
+
+/**
+ * Provide a ColumnVector so ColumnarExpression can close temporary values 
without
+ * having to guess what type it really is.
+ */
+case class NoCloseColumnVector(wrapped: ColumnVector) extends 
ColumnVector(wrapped.dataType) {
+  private var refCount = 1
+
+  /**
+   * Don't actually close the ColumnVector this wraps.  The producer of the 
vector will take
+   * care of that.
+   */
+  override def close(): Unit = {
+    // Empty
+  }
+
+  override def hasNull: Boolean = wrapped.hasNull
+
+  override def numNulls(): Int = wrapped.numNulls
+
+  override def isNullAt(rowId: Int): Boolean = wrapped.isNullAt(rowId)
+
+  override def getBoolean(rowId: Int): Boolean = wrapped.getBoolean(rowId)
+
+  override def getByte(rowId: Int): Byte = wrapped.getByte(rowId)
+
+  override def getShort(rowId: Int): Short = wrapped.getShort(rowId)
+
+  override def getInt(rowId: Int): Int = wrapped.getInt(rowId)
+
+  override def getLong(rowId: Int): Long = wrapped.getLong(rowId)
+
+  override def getFloat(rowId: Int): Float = wrapped.getFloat(rowId)
+
+  override def getDouble(rowId: Int): Double = wrapped.getDouble(rowId)
+
+  override def getArray(rowId: Int): ColumnarArray = wrapped.getArray(rowId)
+
+  override def getMap(ordinal: Int): ColumnarMap = wrapped.getMap(ordinal)
+
+  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal =
+    wrapped.getDecimal(rowId, precision, scale)
+
+  override def getUTF8String(rowId: Int): UTF8String = 
wrapped.getUTF8String(rowId)
+
+  override def getBinary(rowId: Int): Array[Byte] = wrapped.getBinary(rowId)
+
+  override protected def getChild(ordinal: Int): ColumnVector = 
wrapped.getChild(ordinal)
+}
+
+trait ColumnarExpression extends Expression with Serializable {
+  /**
+   * Returns true if this expression supports columnar processing through 
[[columnarEval]].
+   */
+  def supportsColumnar: Boolean = true
+
+  /**
+   * Returns the result of evaluating this expression on the entire
+   * [[org.apache.spark.sql.vectorized.ColumnarBatch]]. The result of
+   * calling this may be a single 
[[org.apache.spark.sql.vectorized.ColumnVector]] or a scalar
+   * value. Scalar values typically happen if they are a part of the 
expression i.e. col("a") + 100.
+   * In this case the 100 is a 
[[org.apache.spark.sql.catalyst.expressions.Literal]] that
+   * [[org.apache.spark.sql.catalyst.expressions.Add]] would have to be able 
to handle.
+   *
+   * By convention any [[org.apache.spark.sql.vectorized.ColumnVector]] 
returned by [[columnarEval]]
+   * is owned by the caller and will need to be closed by them. This can 
happen by putting it into
+   * a [[org.apache.spark.sql.vectorized.ColumnarBatch]] and closing the batch 
or by closing the
+   * vector directly if it is a temporary value.
+   */
+  def columnarEval(batch: ColumnarBatch): Any = {
+    throw new IllegalStateException(s"Internal Error ${this.getClass} has 
column support mismatch")
+  }
+
+  // We need to override equals because we are subclassing a case class
+  override def equals(other: Any): Boolean = {
+    if (!super.equals(other)) {
+      return false
+    }
+    return other.isInstanceOf[ColumnarExpression]
+  }
+
+  override def hashCode(): Int = super.hashCode()
+}
+
+object ColumnarBindReferences extends Logging {
+
+  // Mostly copied from BoundAttribute.scala so we can do columnar processing
+  def bindReference[A <: ColumnarExpression](
+      expression: A,
+      input: AttributeSeq,
+      allowFailures: Boolean = false): A = {
+    expression.transform { case a: AttributeReference =>
+      val ordinal = input.indexOf(a.exprId)
+      if (ordinal == -1) {
+        if (allowFailures) {
+          a
+        } else {
+          sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", 
"]")}")
+        }
+      } else {
+        new ColumnarBoundReference(ordinal, a.dataType, 
input(ordinal).nullable)
+      }
+    }.asInstanceOf[A]
+  }
+
+  /**
+   * A helper function to bind given expressions to an input schema.
+   */
+  def bindReferences[A <: ColumnarExpression](
+      expressions: Seq[A],
+      input: AttributeSeq): Seq[A] = {
+    expressions.map(ColumnarBindReferences.bindReference(_, input))
+  }
+}
+
+class ColumnarBoundReference(ordinal: Int, dataType: DataType, nullable: 
Boolean)
+  extends BoundReference(ordinal, dataType, nullable) with ColumnarExpression {
+
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    // Because of the convention that the returned ColumnVector must be closed 
by the
+    // caller we wrap this column vector so a close is a NOOP, and let the 
original source
+    // of the vector close it.
+    NoCloseColumnVector.wrapIfNeeded(batch.column(ordinal))
+  }
+}
+
+class ColumnarAlias(child: ColumnarExpression, name: String)(
+    override val exprId: ExprId = NamedExpression.newExprId,
+    override val qualifier: Seq[String] = Seq.empty,
+    override val explicitMetadata: Option[Metadata] = None)
+  extends Alias(child, name)(exprId, qualifier, explicitMetadata)
+  with ColumnarExpression {
+
+  override def columnarEval(batch: ColumnarBatch): Any = 
child.columnarEval(batch)
+}
+
+class ColumnarAttributeReference(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    override val metadata: Metadata = Metadata.empty)(
+    override val exprId: ExprId = NamedExpression.newExprId,
+    override val qualifier: Seq[String] = Seq.empty[String])
+  extends AttributeReference(name, dataType, nullable, metadata)(exprId, 
qualifier)
+  with ColumnarExpression {
+
+  // No columnar eval is needed because this must be bound before it is 
evaluated
+}
+
+class ColumnarLiteral (value: Any, dataType: DataType) extends Literal(value, 
dataType)
+  with ColumnarExpression {
+  override def columnarEval(batch: ColumnarBatch): Any = value
+}
+
+/**
+ * A version of ProjectExec that adds in columnar support.
+ */
+class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
+  extends ProjectExec(projectList, child) {
+
+  override def supportsColumnar: Boolean =
+    projectList.forall(_.asInstanceOf[ColumnarExpression].supportsColumnar)
+
+  // Disable code generation
+  override def supportCodegen: Boolean = false
+
+  override def doExecuteColumnar() : RDD[ColumnarBatch] = {
+    val boundProjectList: Seq[Any] =
+      ColumnarBindReferences.bindReferences(
+        projectList.asInstanceOf[Seq[ColumnarExpression]], child.output)
+    val rdd = child.executeColumnar()
+    rdd.mapPartitions((itr) => CloseableColumnBatchIterator(itr,
+      (cb) => {
+        val newColumns = boundProjectList.map(
+          expr => 
expr.asInstanceOf[ColumnarExpression].columnarEval(cb).asInstanceOf[ColumnVector]
+        ).toArray
+        new ColumnarBatch(newColumns, cb.numRows())
+      })
+    )
+  }
+
+  // We have to override equals because subclassing a case class like 
ProjectExec is not that clean
+  // One of the issues is that the generated equals will see 
ColumnarProjectExec and ProjectExec
+  // as being equal and this can result in the withNewChildren method not 
actually replacing
+  // anything
+  override def equals(other: Any): Boolean = {
+    if (!super.equals(other)) {
+      return false
+    }
+    return other.isInstanceOf[ColumnarProjectExec]
+  }
+
+  override def hashCode(): Int = super.hashCode()
+}
+
+/**
+ * A version of add that supports columnar processing for longs.  This version 
is broken
+ * on purpose so it adds the numbers plus 1 so that the tests can show that it 
was replaced.
+ */
+class BrokenColumnarAdd(left: ColumnarExpression, right: ColumnarExpression)
+  extends Add(left, right) with ColumnarExpression {
+
+  override def supportsColumnar(): Boolean = left.supportsColumnar && 
right.supportsColumnar
+
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    var lhs: Any = null
+    var rhs: Any = null
+    var ret: Any = null
+    try {
+      lhs = left.columnarEval(batch)
+      rhs = right.columnarEval(batch)
+
+      if (lhs == null || rhs == null) {
+        ret = null
+      } else if (lhs.isInstanceOf[ColumnVector] && rhs.isInstanceOf[ColumnVector]) {
+        val l = lhs.asInstanceOf[ColumnVector]
+        val r = rhs.asInstanceOf[ColumnVector]
+        val result = new OnHeapColumnVector(batch.numRows(), dataType)
+        ret = result
+
+        for (i <- 0 until batch.numRows()) {
+          result.appendLong(l.getLong(i) + r.getLong(i) + 1) // BUG to show we replaced Add
+        }
+      } else if (rhs.isInstanceOf[ColumnVector]) {
+        val l = lhs.asInstanceOf[Long]
+        val r = rhs.asInstanceOf[ColumnVector]
+        val result = new OnHeapColumnVector(batch.numRows(), dataType)
+        ret = result
+
+        for (i <- 0 until batch.numRows()) {
+          result.appendLong(l + r.getLong(i) + 1) // BUG to show we replaced Add
+        }
+      } else if (lhs.isInstanceOf[ColumnVector]) {
+        val l = lhs.asInstanceOf[ColumnVector]
+        val r = rhs.asInstanceOf[Long]
+        val result = new OnHeapColumnVector(batch.numRows(), dataType)
+        ret = result
+
+        for (i <- 0 until batch.numRows()) {
+          result.appendLong(l.getLong(i) + r + 1) // BUG to show we replaced Add
+        }
+      } else {
+        ret = nullSafeEval(lhs, rhs)
+      }
+    } finally {
+      if (lhs != null && lhs.isInstanceOf[ColumnVector]) {
+        lhs.asInstanceOf[ColumnVector].close()
+      }
+      if (rhs != null && rhs.isInstanceOf[ColumnVector]) {
+        rhs.asInstanceOf[ColumnVector].close()
+      }
+    }
+    ret
+  }
+}
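BrokenColumnarAdd deliberately adds 1 so the tests can detect that the rule fired; a correct columnar add over two LongType vectors would be the vector/vector branch above without that offset. A hedged sketch, with an illustrative helper name that is not part of the patch:

    // Hypothetical, non-broken counterpart of the vector/vector branch above.
    def addLongVectors(l: ColumnVector, r: ColumnVector, numRows: Int): ColumnVector = {
      val result = new OnHeapColumnVector(numRows, LongType)
      for (i <- 0 until numRows) {
        result.appendLong(l.getLong(i) + r.getLong(i))
      }
      result
    }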
+
+class CannotReplaceException(str: String) extends RuntimeException(str) {
+
+}
+
+case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
+  def replaceWithColumnarExpression(exp: Expression): ColumnarExpression = exp match {
+    case a: Alias =>
+      new ColumnarAlias(replaceWithColumnarExpression(a.child),
+        a.name)(a.exprId, a.qualifier, a.explicitMetadata)
+    case att: AttributeReference =>
+      new ColumnarAttributeReference(att.name, att.dataType, att.nullable,
+        att.metadata)(att.exprId, att.qualifier)
+    case lit: Literal =>
+      new ColumnarLiteral(lit.value, lit.dataType)
+    case add: Add if (add.dataType == LongType) &&
+      (add.left.dataType == LongType) &&
+      (add.right.dataType == LongType) =>
+      // Add only supports Longs for now.
+      new BrokenColumnarAdd(replaceWithColumnarExpression(add.left),
+        replaceWithColumnarExpression(add.right))
+    case exp =>
+      throw new CannotReplaceException(s"expression " +
+        s"${exp.getClass} ${exp} is not currently supported.")
+  }
+
+  def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan =
+    try {
+      plan match {
+        case plan: ProjectExec =>
+          new ColumnarProjectExec(plan.projectList.map((exp) =>
+            replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
+            replaceWithColumnarPlan(plan.child))
+        case p =>
+          logWarning(s"Columnar processing for ${p.getClass} is not currently 
supported.")
+          p.withNewChildren(p.children.map(replaceWithColumnarPlan))
+      }
+    } catch {
+      case exp: CannotReplaceException =>
+        logWarning(s"Columnar processing for ${plan.getClass} is not currently 
supported" +
+          s"because ${exp.getMessage}")
+        plan
+    }
+
+  override def apply(plan: SparkPlan): SparkPlan = replaceWithColumnarPlan(plan)
+}
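Because Rule[SparkPlan] is just a plan-to-plan function, the replacement above can also be exercised directly against a physical plan when debugging, without going through the extension machinery. A hedged sketch (df is an assumed, pre-existing DataFrame):

    // Hypothetical usage: apply the rule by hand to inspect the rewritten plan.
    val rule = PreRuleReplaceAddWithBrokenVersion()
    val rewritten = rule(df.queryExecution.sparkPlan)  // df is assumed to exist
    println(rewritten.treeString)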
+
+class ReplacedRowToColumnarExec(override val child: SparkPlan)
+  extends RowToColumnarExec(child) {
+
+  // We have to override equals because subclassing a case class like RowToColumnarExec is not that clean.
+  // One of the issues is that the generated equals will see ReplacedRowToColumnarExec and RowToColumnarExec
+  // as being equal, and this can result in the withNewChildren method not actually replacing
+  // anything.
+  override def equals(other: Any): Boolean = {
+    if (!super.equals(other)) {
+      return false
+    }
+    return other.isInstanceOf[ReplacedRowToColumnarExec]
+  }
+
+  override def hashCode(): Int = super.hashCode()
+}
+
+case class MyPostRule() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+    case rc: RowToColumnarExec => new ReplacedRowToColumnarExec(rc.child)
+    case plan => plan.withNewChildren(plan.children.map(apply))
+  }
+}
+
+case class MyColumarRule(pre: Rule[SparkPlan], post: Rule[SparkPlan]) extends ColumnarRule {
+  override def preColumnarTransitions: Rule[SparkPlan] = pre
+  override def postColumnarTransitions: Rule[SparkPlan] = post
+}
+
 class MyExtensions extends (SparkSessionExtensions => Unit) {
   def apply(e: SparkSessionExtensions): Unit = {
     e.injectPlannerStrategy(MySparkStrategy)
@@ -260,6 +660,7 @@ class MyExtensions extends (SparkSessionExtensions => Unit) {
     e.injectOptimizerRule(MyRule)
     e.injectParser(MyParser)
     e.injectFunction(MyExtensions.myFunction)
+    e.injectColumnar(session => MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
   }
 }
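For context, an extension like MyExtensions is attached to a session either programmatically or via configuration; the injected columnar rule then participates in query planning. A hedged usage sketch (the local master and fully-qualified class name are assumptions, not part of the patch):

    import org.apache.spark.sql.SparkSession

    // Programmatic registration of the extension for a test session.
    val spark = SparkSession.builder()
      .master("local[1]")
      .withExtensions(new MyExtensions)
      .getOrCreate()

    // Equivalent configuration-based form (class name assumed to be on the classpath):
    //   --conf spark.sql.extensions=org.apache.spark.sql.MyExtensions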
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
index 289cc66..8a18a1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
@@ -50,7 +50,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
       case f @ FilterExec(
           And(_: AttributeReference, _: AttributeReference),
-          InputAdapter(_: BatchEvalPythonExec)) => f
+          InputAdapter(_: BatchEvalPythonExec, _)) => f
       case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(FilterExec(_: In, _))) => b
     }
     assert(qualifiedPlanNodes.size == 2)
@@ -60,7 +60,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     val df = Seq(("Hello", 4)).toDF("a", "b")
       .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
     val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
-      case f @ FilterExec(_: AttributeReference, InputAdapter(_: BatchEvalPythonExec)) => f
+      case f @ FilterExec(_: AttributeReference, InputAdapter(_: BatchEvalPythonExec, _)) => f
       case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(FilterExec(_: In, _))) => b
     }
     assert(qualifiedPlanNodes.size == 2)
@@ -72,7 +72,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
       case f @ FilterExec(
           And(_: AttributeReference, _: GreaterThan),
-          InputAdapter(_: BatchEvalPythonExec)) => f
+          InputAdapter(_: BatchEvalPythonExec, _)) => f
       case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b
     }
     assert(qualifiedPlanNodes.size == 2)
@@ -85,7 +85,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
       case f @ FilterExec(
           And(_: AttributeReference, _: GreaterThan),
-          InputAdapter(_: BatchEvalPythonExec)) => f
+          InputAdapter(_: BatchEvalPythonExec, _)) => f
       case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b
     }
     assert(qualifiedPlanNodes.size == 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index bee2022..758780c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -31,11 +31,14 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapBuilder, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.execution.RowToColumnConverter
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class ColumnarBatchSuite extends SparkFunSuite {
 
@@ -1270,6 +1273,211 @@ class ColumnarBatchSuite extends SparkFunSuite {
     allocator.close()
   }
 
+  test("RowToColumnConverter") {
+    val schema = StructType(
+      StructField("str", StringType) ::
+        StructField("bool", BooleanType) ::
+        StructField("byte", ByteType) ::
+        StructField("short", ShortType) ::
+        StructField("int", IntegerType) ::
+        StructField("long", LongType) ::
+        StructField("float", FloatType) ::
+        StructField("double", DoubleType) ::
+        StructField("decimal", DecimalType(25, 5)) ::
+        StructField("date", DateType) ::
+        StructField("ts", TimestampType) ::
+        StructField("cal", CalendarIntervalType) ::
+        StructField("arr_of_int", ArrayType(IntegerType)) ::
+        StructField("int_and_int", StructType(
+          StructField("int1", IntegerType, false) ::
+            StructField("int2", IntegerType) ::
+            Nil
+        )) ::
+        StructField("int_to_int", MapType(IntegerType, IntegerType)) ::
+        Nil)
+    var mapBuilder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+    mapBuilder.put(1, 10)
+    mapBuilder.put(20, null)
+    val row1 = new GenericInternalRow(Array[Any](
+      UTF8String.fromString("a string"),
+      true,
+      1.toByte,
+      2.toShort,
+      3,
+      Long.MaxValue,
+      0.25.toFloat,
+      0.75D,
+      Decimal("1234.23456"),
+      DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+      DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")),
+      new CalendarInterval(1, 0),
+      new GenericArrayData(Array(1, 2, 3, 4, null)),
+      new GenericInternalRow(Array[Any](5.asInstanceOf[Any], 10)),
+      mapBuilder.build()
+    ))
+
+    mapBuilder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+    mapBuilder.put(30, null)
+    mapBuilder.put(40, 50)
+    val row2 = new GenericInternalRow(Array[Any](
+      UTF8String.fromString("second string"),
+      false,
+      -1.toByte,
+      17.toShort,
+      Int.MinValue,
+      987654321L,
+      Float.NaN,
+      Double.PositiveInfinity,
+      Decimal("0.01000"),
+      DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("1875-12-12")),
+      DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("1880-01-05 12:45:21.321")),
+      new CalendarInterval(-10, -100),
+      new GenericArrayData(Array(5, 10, -100)),
+      new GenericInternalRow(Array[Any](20.asInstanceOf[Any], null)),
+      mapBuilder.build()
+    ))
+
+    val row3 = new GenericInternalRow(Array[Any](
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null
+    ))
+
+    val converter = new RowToColumnConverter(schema)
+    val columns = OnHeapColumnVector.allocateColumns(3, schema)
+    val batch = new ColumnarBatch(columns.toArray, 3)
+    try {
+      converter.convert(row1, columns.toArray)
+      converter.convert(row2, columns.toArray)
+      converter.convert(row3, columns.toArray)
+
+      assert(columns(0).dataType() == StringType)
+      assert(columns(0).getUTF8String(0).toString == "a string")
+      assert(columns(0).getUTF8String(1).toString == "second string")
+      assert(columns(0).isNullAt(2))
+
+      assert(columns(1).dataType() == BooleanType)
+      assert(columns(1).getBoolean(0) == true)
+      assert(columns(1).getBoolean(1) == false)
+      assert(columns(1).isNullAt(2))
+
+      assert(columns(2).dataType() == ByteType)
+      assert(columns(2).getByte(0) == 1.toByte)
+      assert(columns(2).getByte(1) == -1.toByte)
+      assert(columns(2).isNullAt(2))
+
+      assert(columns(3).dataType() == ShortType)
+      assert(columns(3).getShort(0) == 2.toShort)
+      assert(columns(3).getShort(1) == 17.toShort)
+      assert(columns(3).isNullAt(2))
+
+      assert(columns(4).dataType() == IntegerType)
+      assert(columns(4).getInt(0) == 3)
+      assert(columns(4).getInt(1) == Int.MinValue)
+      assert(columns(4).isNullAt(2))
+
+      assert(columns(5).dataType() == LongType)
+      assert(columns(5).getLong(0) == Long.MaxValue)
+      assert(columns(5).getLong(1) == 987654321L)
+      assert(columns(5).isNullAt(2))
+
+      assert(columns(6).dataType() == FloatType)
+      assert(columns(6).getFloat(0) == 0.25.toFloat)
+      assert(columns(6).getFloat(1).isNaN)
+      assert(columns(6).isNullAt(2))
+
+      assert(columns(7).dataType() == DoubleType)
+      assert(columns(7).getDouble(0) == 0.75D)
+      assert(columns(7).getDouble(1) == Double.PositiveInfinity)
+      assert(columns(7).isNullAt(2))
+
+      assert(columns(8).dataType() == DecimalType(25, 5))
+      assert(columns(8).getDecimal(0, 25, 5) == Decimal("1234.23456"))
+      assert(columns(8).getDecimal(1, 25, 5) == Decimal("0.01000"))
+      assert(columns(8).isNullAt(2))
+
+      assert(columns(9).dataType() == DateType)
+      assert(columns(9).getInt(0) ==
+        DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")))
+      assert(columns(9).getInt(1) ==
+        DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("1875-12-12")))
+      assert(columns(9).isNullAt(2))
+
+      assert(columns(10).dataType() == TimestampType)
+      assert(columns(10).getLong(0) ==
+        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+      assert(columns(10).getLong(1) ==
+        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("1880-01-05 12:45:21.321")))
+      assert(columns(10).isNullAt(2))
+
+      assert(columns(11).dataType() == CalendarIntervalType)
+      assert(columns(11).getInterval(0) == new CalendarInterval(1, 0))
+      assert(columns(11).getInterval(1) == new CalendarInterval(-10, -100))
+      assert(columns(11).isNullAt(2))
+
+      assert(columns(12).dataType() == ArrayType(IntegerType))
+      val arr1 = columns(12).getArray(0)
+      assert(arr1.numElements() == 5)
+      assert(arr1.getInt(0) == 1)
+      assert(arr1.getInt(1) == 2)
+      assert(arr1.getInt(2) == 3)
+      assert(arr1.getInt(3) == 4)
+      assert(arr1.isNullAt(4))
+
+      val arr2 = columns(12).getArray(1)
+      assert(arr2.numElements() == 3)
+      assert(arr2.getInt(0) == 5)
+      assert(arr2.getInt(1) == 10)
+      assert(arr2.getInt(2) == -100)
+
+      assert(columns(12).isNullAt(2))
+
+      assert(columns(13).dataType() == StructType(
+        StructField("int1", IntegerType, false) ::
+          StructField("int2", IntegerType) ::
+          Nil
+      ))
+      val struct1 = columns(13).getStruct(0)
+      assert(struct1.getInt(0) == 5)
+      assert(struct1.getInt(1) == 10)
+      val struct2 = columns(13).getStruct(1)
+      assert(struct2.getInt(0) == 20)
+      assert(struct2.isNullAt(1))
+      assert(columns(13).isNullAt(2))
+
+      assert(columns(14).dataType() == MapType(IntegerType, IntegerType))
+      val map1 = columns(14).getMap(0)
+      assert(map1.numElements() == 2)
+      assert(map1.keyArray().getInt(0) == 1)
+      assert(map1.valueArray().getInt(0) == 10)
+      assert(map1.keyArray().getInt(1) == 20)
+      assert(map1.valueArray().isNullAt(1))
+
+      val map2 = columns(14).getMap(1)
+      assert(map2.numElements() == 2)
+      assert(map2.keyArray().getInt(0) == 30)
+      assert(map2.valueArray().isNullAt(0))
+      assert(map2.keyArray().getInt(1) == 40)
+      assert(map2.valueArray().getInt(1) == 50)
+
+      assert(columns(14).isNullAt(2))
+    } finally {
+      batch.close()
+    }
+  }
+
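As a side note on the test above: once RowToColumnConverter has populated the vectors, the batch can also be verified row-wise, since ColumnarBatch exposes a row iterator over the same data. A hedged sketch that assumes the test's batch and schema are in scope and the batch has not yet been closed:

    // Hypothetical read-back: iterate the converted batch as InternalRows.
    val rows = batch.rowIterator()
    while (rows.hasNext) {
      val row = rows.next()
      assert(row.numFields == schema.length)
    }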
   testVector("Decimal API", 4, DecimalType.IntDecimal) {
     column =>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
