Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240141142
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -17,51 +17,39 @@
     
     package org.apache.spark.sql.execution
     
    +import scala.reflect.runtime.universe.TypeTag
    +
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{Encoder, Row, SparkSession}
    -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
     import org.apache.spark.sql.catalyst.util.truncatedString
     import org.apache.spark.sql.execution.metric.SQLMetrics
    -import org.apache.spark.sql.types.DataType
    +import org.apache.spark.sql.types.StructType
     
     object RDDConversions {
    -  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: 
Seq[DataType]): RDD[InternalRow] = {
    +  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
    +                                              outputSchema: StructType): 
RDD[InternalRow] = {
    +    val converters = 
ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
         data.mapPartitions { iterator =>
    -      val numColumns = outputTypes.length
    -      val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
           iterator.map { r =>
    -        var i = 0
    -        while (i < numColumns) {
    -          mutableRow(i) = converters(i)(r.productElement(i))
    -          i += 1
    -        }
    -
    -        mutableRow
    +        converters.toRow(r)
           }
         }
       }
     
       /**
        * Convert the objects inside Row into the types Catalyst expected.
        */
    -  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): 
RDD[InternalRow] = {
    +  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): 
RDD[InternalRow] = {
    --- End diff --
    
    Let's remove whole object. `rowToRowRdd` looks only being used at one place 
and the code here is quite small.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to