Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18747#discussion_r145602871
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -23,21 +23,72 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.QueryPlan
     import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
    -import org.apache.spark.sql.execution.LeafExecNode
    -import org.apache.spark.sql.execution.metric.SQLMetrics
    -import org.apache.spark.sql.types.UserDefinedType
    +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
    +import org.apache.spark.sql.execution.vectorized._
    +import org.apache.spark.sql.types._
     
     
     case class InMemoryTableScanExec(
         attributes: Seq[Attribute],
         predicates: Seq[Expression],
         @transient relation: InMemoryRelation)
    -  extends LeafExecNode {
    +  extends LeafExecNode with ColumnarBatchScan {
     
       override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
     
    -  override lazy val metrics = Map(
    -    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
    +  override def vectorTypes: Option[Seq[String]] =
    +    Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
    +
    +  /**
    +   * If true, get data from ColumnVector in ColumnarBatch, which is generally faster.
    +   * If false, get data from UnsafeRow built from ColumnVector.
    +   */
    +  override val supportCodegen: Boolean = {
    +    // In the initial implementation, for ease of review, support only primitive
    +    // data types and schemas with fewer than wholeStageMaxNumFields fields
    +    val schema = StructType.fromAttributes(relation.output)
    +    schema.fields.find(f => f.dataType match {
    +      case BooleanType | ByteType | ShortType | IntegerType | LongType |
    +           FloatType | DoubleType => false
    +      case _ => true
    +    }).isEmpty &&
    +      !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) &&
    +      children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty
    +  }
    +
    +  private val columnIndices =
    +    attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
    +
    +  private val relationSchema = relation.schema.toArray
    +
    +  private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
    +
    +  private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
    +    val rowCount = cachedColumnarBatch.numRows
    +    val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
    --- End diff ---
    
    I see. Let us make a follow-up PR in the future.
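    
    For context, the quoted diff cuts off inside createAndDecompressColumn.
    Below is a minimal sketch of how the method could continue, assuming the
    ColumnAccessor.decompress(buffer, vector, dataType, numRows) helper, the
    CachedBatch.buffers byte-array field, and the schema-taking ColumnarBatch
    constructor from this era of the codebase; it is a sketch of the idea,
    not the verbatim merged code:
    
        private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
          val rowCount = cachedColumnarBatch.numRows
          // Allocate on-heap vectors only for the attributes this scan reads.
          val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
          val columnarBatch = new ColumnarBatch(
            columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
          columnarBatch.setNumRows(rowCount)
          // Decompress each compressed column buffer into its writable vector,
          // mapping scan attributes to relation output positions via columnIndices.
          for (i <- attributes.indices) {
            ColumnAccessor.decompress(
              cachedColumnarBatch.buffers(columnIndices(i)),
              columnVectors(i),
              columnarBatchSchema.fields(i).dataType,
              rowCount)
          }
          columnarBatch
        }
    
    Decompressing straight into OnHeapColumnVectors lets generated code read
    columns in place instead of materializing each row as an UnsafeRow, which
    is what the supportCodegen flag above enables.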


---
