Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240151214

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -17,51 +17,39 @@ package org.apache.spark.sql.execution
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.StructType
 
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
+      outputSchema: StructType): RDD[InternalRow] = {
+    val converters = ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
     data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r.productElement(i))
-          i += 1
-        }
-
-        mutableRow
+        converters.toRow(r)
       }
     }
   }
 
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
+    val converters = RowEncoder(outputSchema)
--- End diff --

I checked each case. Every case looks fine except one:

https://github.com/apache/spark/blob/fa0d4bf69929c5acd676d602e758a969713d19d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L289

Looks like we're going to drop support for converting `Char` as `StringType`. I think it's trivial, and it was rather a mistake that we supported this in the first place. I don't feel strongly about documenting it in the migration guide, but if anyone does, we'd better do that.
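To make the dropped case concrete, here's a minimal sketch (not from the PR) contrasting the two conversion paths; it assumes a Spark build from around this change, where `ExpressionEncoder.toRow` still exists, and it is declared under `org.apache.spark.sql` because the converter factory is `private[sql]`. The object name and commented-out failing call are illustrative:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CharAsStringTypeSketch {
  def main(args: Array[String]): Unit = {
    // Old path: the StringType converter has an explicit `case chr: Char`
    // branch (the CatalystTypeConverters.scala#L289 case linked above), so a
    // Scala Char is silently converted to a Catalyst UTF8String.
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(StringType)
    println(toCatalyst('a'))  // prints: a (as a UTF8String)

    // New path: RowEncoder's serializer validates the external type for a
    // StringType field and does not accept Char, so the same value now
    // fails at runtime instead of being converted.
    val schema = StructType(Seq(StructField("c", StringType)))
    val encoder = RowEncoder(schema)
    // encoder.toRow(Row('a'))  // assumed to throw at runtime: Char is no
    //                          // longer accepted as an external StringType
  }
}
```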