[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240180713

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -416,7 +416,12 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
           output: Seq[Attribute],
           rdd: RDD[Row]): RDD[InternalRow] = {
         if (relation.relation.needConversion) {
    -      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
    +      val converters = RowEncoder(StructType.fromAttributes(output))
    +      rdd.mapPartitions { iterator =>
    +        iterator.map { r =>
    --- End diff --

    nit: `iterator.map(converters.toRow)`
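The nit relies on Scala's eta-expansion: a one-argument closure that only forwards its argument to a method can be replaced by the method reference itself. A minimal standalone illustration (plain Scala; `toRowLike` is a hypothetical stand-in for `converters.toRow`):

```scala
object EtaExpansionDemo extends App {
  // Hypothetical stand-in for converters.toRow: any one-argument method.
  def toRowLike(i: Int): String = s"row($i)"

  // Explicit closure, as written in the diff under review.
  val explicit = Iterator(1, 2, 3).map { r => toRowLike(r) }.toList

  // Equivalent method reference, as the nit suggests.
  val viaEta = Iterator(1, 2, 3).map(toRowLike).toList

  assert(explicit == viaEta)  // both: List(row(1), row(2), row(3))
}
```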
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240151214

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -17,51 +17,39 @@
     package org.apache.spark.sql.execution
     
    +import scala.reflect.runtime.universe.TypeTag
    +
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{Encoder, Row, SparkSession}
    -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
     import org.apache.spark.sql.catalyst.util.truncatedString
     import org.apache.spark.sql.execution.metric.SQLMetrics
    -import org.apache.spark.sql.types.DataType
    +import org.apache.spark.sql.types.StructType
     
     object RDDConversions {
    -  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    +  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
    +      outputSchema: StructType): RDD[InternalRow] = {
    +    val converters = ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
         data.mapPartitions { iterator =>
    -      val numColumns = outputTypes.length
    -      val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
           iterator.map { r =>
    -        var i = 0
    -        while (i < numColumns) {
    -          mutableRow(i) = converters(i)(r.productElement(i))
    -          i += 1
    -        }
    -
    -        mutableRow
    +        converters.toRow(r)
           }
         }
       }
     
       /**
        * Convert the objects inside Row into the types Catalyst expected.
        */
    -  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    +  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
    +    val converters = RowEncoder(outputSchema)
    --- End diff --

    I checked each case. Every case looks fine except one: https://github.com/apache/spark/blob/fa0d4bf69929c5acd676d602e758a969713d19d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L289

    It looks like we're going to drop support for `Char` as `StringType`. I think it's trivial, and supporting it in the first place was rather a mistake. I don't feel strongly about documenting it in the migration guide, but if anyone does, we'd better do that.
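For context, a hedged sketch of the behavior difference being flagged here (this assumes the linked `CatalystTypeConverters` line behaves as the comment describes; it is an illustration, not code from the PR):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.StringType

object CharAsStringDemo extends App {
  // Old path: the CatalystTypeConverters-based converter accepts a Scala
  // Char where a StringType column is expected, yielding UTF8String("x").
  val toCatalyst = CatalystTypeConverters.createToCatalystConverter(StringType)
  println(toCatalyst('x'))  // prints: x

  // New path: RowEncoder's serializer expects a java.lang.String for
  // StringType, so a Char in a Row would fail at runtime instead of
  // being converted; that is the drop in support discussed above.
}
```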
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240141142

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -17,51 +17,39 @@
     package org.apache.spark.sql.execution
     
    +import scala.reflect.runtime.universe.TypeTag
    +
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{Encoder, Row, SparkSession}
    -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
     import org.apache.spark.sql.catalyst.util.truncatedString
     import org.apache.spark.sql.execution.metric.SQLMetrics
    -import org.apache.spark.sql.types.DataType
    +import org.apache.spark.sql.types.StructType
     
     object RDDConversions {
    -  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    +  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
    +      outputSchema: StructType): RDD[InternalRow] = {
    +    val converters = ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
         data.mapPartitions { iterator =>
    -      val numColumns = outputTypes.length
    -      val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
           iterator.map { r =>
    -        var i = 0
    -        while (i < numColumns) {
    -          mutableRow(i) = converters(i)(r.productElement(i))
    -          i += 1
    -        }
    -
    -        mutableRow
    +        converters.toRow(r)
           }
         }
       }
     
       /**
        * Convert the objects inside Row into the types Catalyst expected.
        */
    -  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    +  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
    --- End diff --

    Let's remove the whole object. `rowToRowRdd` looks like it's only used in one place, and the code here is quite small.
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240134191

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -17,51 +17,39 @@
     package org.apache.spark.sql.execution
     
    +import scala.reflect.runtime.universe.TypeTag
    +
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{Encoder, Row, SparkSession}
    -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
     import org.apache.spark.sql.catalyst.util.truncatedString
     import org.apache.spark.sql.execution.metric.SQLMetrics
    -import org.apache.spark.sql.types.DataType
    +import org.apache.spark.sql.types.StructType
     
     object RDDConversions {
    -  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    +  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
    +      outputSchema: StructType): RDD[InternalRow] = {
    --- End diff --

    nit: indent
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240135694

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -17,51 +17,39 @@
     package org.apache.spark.sql.execution
     
    +import scala.reflect.runtime.universe.TypeTag
    +
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{Encoder, Row, SparkSession}
    -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
     import org.apache.spark.sql.catalyst.util.truncatedString
     import org.apache.spark.sql.execution.metric.SQLMetrics
    -import org.apache.spark.sql.types.DataType
    +import org.apache.spark.sql.types.StructType
     
     object RDDConversions {
    -  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    +  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
    +      outputSchema: StructType): RDD[InternalRow] = {
    --- End diff --

    Well, it seems like this is never actually used... shall we remove it instead, if that's the case?
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user eatoncys commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240114106

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -53,7 +53,7 @@ object RDDConversions {
         data.mapPartitions { iterator =>
           val numColumns = outputTypes.length
           val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
    +      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
    --- End diff --

    It has been modified, and the performance is the same as converting to arrays.
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user eatoncys commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240113822

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -33,7 +33,7 @@ object RDDConversions {
         data.mapPartitions { iterator =>
           val numColumns = outputTypes.length
           val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
    +      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
    --- End diff --

    It is a good suggestion, and it has been modified; would you like to review it again? Thanks.
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240026394

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -53,7 +53,7 @@ object RDDConversions {
         data.mapPartitions { iterator =>
           val numColumns = outputTypes.length
           val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
    +      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
    --- End diff --

    Shall we use `RowEncoder` here?
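For readers following the thread, a sketch of what the `RowEncoder`-based `rowToRowRdd` looks like, mirroring the diff the PR moved to (Spark 2.4-era API, where `RowEncoder` returns an `ExpressionEncoder[Row]` exposing `toRow`; treat this as an illustration rather than the merged code):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

object RowEncoderSketch {
  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
    // One serializable encoder replaces the per-column converter functions;
    // it is built once on the driver and shipped to each partition.
    val converters = RowEncoder(outputSchema)
    data.mapPartitions { iterator =>
      iterator.map(converters.toRow)
    }
  }
}
```

One design consequence: the encoder centralizes null handling and nested-type conversion that the hand-rolled converter loop had to get right per column.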
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23262#discussion_r240026388

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -33,7 +33,7 @@ object RDDConversions {
         data.mapPartitions { iterator =>
           val numColumns = outputTypes.length
           val mutableRow = new GenericInternalRow(numColumns)
    -      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
    +      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
    --- End diff --

    Shall we use `ExpressionEncoder` here?
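And a matching sketch for the `Product` side, again mirroring the eventual diff (the `TypeTag` context bound is needed so `ExpressionEncoder` can derive a serializer for `A`; Spark 2.4-era API, illustrative only):

```scala
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType

object ProductEncoderSketch {
  def productToRowRdd[A <: Product : TypeTag](
      data: RDD[A],
      outputSchema: StructType): RDD[InternalRow] = {
    // The encoder is derived from A itself and bound to the output
    // attributes, replacing the per-column converter array.
    val converters = ExpressionEncoder[A]().resolveAndBind(outputSchema.toAttributes)
    data.mapPartitions { iterator =>
      iterator.map(converters.toRow)
    }
  }
}
```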
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
GitHub user eatoncys opened a pull request:

    https://github.com/apache/spark/pull/23262

    [SPARK-26312][SQL] Converting converters in RDDConversions into arrays to improve their access performance

    ## What changes were proposed in this pull request?

    `RDDConversions` would get disproportionately slower as the number of columns in the query increased. This PR converts the `converters` in `RDDConversions` into arrays to improve their access performance; the previous type of `converters` was `scala.collection.immutable.::`, a subtype of `List`, whose indexed access is linear rather than constant time. The test of `PrunedScanSuite` for 2000 columns and 20k rows takes 409 seconds before this PR and 361 seconds after.

    ## How was this patch tested?

    Test case of `PrunedScanSuite`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/eatoncys/spark toarray

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23262.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23262

commit ddb252892a439281b16bc14fdfdb7faf756f1067
Author: 10129659
Date:   2018-12-08T07:15:10Z

    Converting converters in RDDConversions into arrays to improve their access performance
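For readers unfamiliar with the underlying issue: indexed access on an immutable `List` (the `scala.collection.immutable.::` named above) walks the list from the head, so looking up `converters(i)` for every column of every row is O(i) per access and O(n²) per row. A minimal standalone sketch of the difference the `.toArray` change targets (plain Scala; sizes are illustrative, not a benchmark of the PR itself):

```scala
object ListVsArrayIndexing extends App {
  val numColumns = 2000
  val asList: List[Int] = List.tabulate(numColumns)(identity)
  val asArray: Array[Int] = asList.toArray

  def time(label: String)(body: => Long): Unit = {
    val start = System.nanoTime()
    val result = body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.2f ms (checksum $result)")
  }

  // Simulates one row conversion: touch every column index once.
  time("List indexing (O(i) per access)") {
    var i = 0; var sum = 0L
    while (i < numColumns) { sum += asList(i); i += 1 }
    sum
  }

  time("Array indexing (O(1) per access)") {
    var i = 0; var sum = 0L
    while (i < numColumns) { sum += asArray(i); i += 1 }
    sum
  }
}
```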