Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18747#discussion_r144609180 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala --- @@ -23,21 +23,37 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} -import org.apache.spark.sql.execution.LeafExecNode -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.types.UserDefinedType case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], @transient relation: InMemoryRelation) - extends LeafExecNode { + extends LeafExecNode with ColumnarBatchScan { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren - override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def vectorTypes: Option[Seq[String]] = + Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName)) + + override val columnIndexes = + attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray + + override val supportCodegen: Boolean = relation.useColumnarBatches + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + if (supportCodegen) { + val buffers = relation.cachedColumnBuffers + // HACK ALERT: This is actually an RDD[CachedBatch]. + // We're taking advantage of Scala's type erasure here to pass these batches along. 
+ Seq(buffers.asInstanceOf[RDD[InternalRow]]) --- End diff -- Yes, I broke that assumption ([`RDD[CachedBatch]`](https://github.com/apache/spark/pull/18747/files#diff-2654ea5e4013b9ad5f43d64b68413225R115)), since we have to create a `ColumnarBatch` when the batch is read. Should we convert `CachedBatch` to `ColumnarBatch` here instead?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org