[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240180713
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -416,7 +416,12 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
       output: Seq[Attribute],
       rdd: RDD[Row]): RDD[InternalRow] = {
     if (relation.relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+      val converters = RowEncoder(StructType.fromAttributes(output))
+      rdd.mapPartitions { iterator =>
+        iterator.map { r =>
--- End diff --

nit: `iterator.map(converters.toRow)`
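
Spelled out, a minimal self-contained sketch of the suggested form (Spark 2.4-era internal APIs; `toInternalRows` is a made-up name for illustration):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// RowEncoder(schema) yields an ExpressionEncoder[Row]; its toRow method
// serializes a Row into an InternalRow, so it can be passed to map directly.
def toInternalRows(rdd: RDD[Row], schema: StructType): RDD[InternalRow] = {
  val converters = RowEncoder(schema)
  rdd.mapPartitions { iterator =>
    iterator.map(converters.toRow)  // same as iterator.map { r => converters.toRow(r) }
  }
}
```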


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240151214
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -17,51 +17,39 @@
 
 package org.apache.spark.sql.execution
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.StructType
 
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
+                                              outputSchema: StructType): RDD[InternalRow] = {
+    val converters = ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
     data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r.productElement(i))
-          i += 1
-        }
-
-        mutableRow
+        converters.toRow(r)
       }
     }
   }
 
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
+    val converters = RowEncoder(outputSchema)
--- End diff --

I checked each case; every one looks fine except this one:


https://github.com/apache/spark/blob/fa0d4bf69929c5acd676d602e758a969713d19d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L289

Looks like we're going to drop support for `Char` as `StringType`. I think it's trivial, and rather a mistake that we supported it in the first place. I don't feel strongly about documenting it in the migration guide, but if anyone does, we'd better do that.
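
To make the dropped case concrete, a hedged sketch (my own example, not from the PR):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("c", StringType)))
val rowWithChar = Row('x')  // a Char where the schema declares StringType

// Old path: the StringConverter linked above pattern-matches on Char and
// converts 'x' to UTF8String "x".
// New path: RowEncoder's serializer expects a java.lang.String for
// StringType, so this call is expected to fail at runtime after this change.
RowEncoder(schema).toRow(rowWithChar)
```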


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240141142
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -17,51 +17,39 @@
 
 package org.apache.spark.sql.execution
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.StructType
 
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
+                                              outputSchema: StructType): RDD[InternalRow] = {
+    val converters = ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
     data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r.productElement(i))
-          i += 1
-        }
-
-        mutableRow
+        converters.toRow(r)
      }
    }
  }
 
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
--- End diff --

Let's remove the whole object. `rowToRowRdd` looks to be used in only one place, and the code here is quite small.
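
For reference, a hedged sketch of what inlining at that single call site (`toCatalystRDD` in `DataSourceStrategy`, quoted elsewhere in this thread) could look like; an illustration, not the PR's final code:

```scala
// Inside DataSourceStrategy; assumes RowEncoder, StructType, InternalRow
// and friends are imported as in the hunks above.
private[this] def toCatalystRDD(
    relation: LogicalRelation,
    output: Seq[Attribute],
    rdd: RDD[Row]): RDD[InternalRow] = {
  if (relation.relation.needConversion) {
    val converters = RowEncoder(StructType.fromAttributes(output))
    rdd.mapPartitions { iterator =>
      iterator.map(converters.toRow)
    }
  } else {
    // The rows are already InternalRow when no conversion is needed.
    rdd.asInstanceOf[RDD[InternalRow]]
  }
}
```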


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240134191
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -17,51 +17,39 @@
 
 package org.apache.spark.sql.execution
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.StructType
 
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
+                                              outputSchema: StructType): RDD[InternalRow] = {
--- End diff --

nit: indent


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240135694
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -17,51 +17,39 @@
 
 package org.apache.spark.sql.execution
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.StructType
 
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
+  def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
+                                              outputSchema: StructType): RDD[InternalRow] = {
--- End diff --

Well, it seems this is actually never used... shall we remove it instead, if that's the case?


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread eatoncys
Github user eatoncys commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240114106
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -53,7 +53,7 @@ object RDDConversions {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
       val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
--- End diff --

It has been modified, and the performance is the same as with the array conversion.


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread eatoncys
Github user eatoncys commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240113822
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -33,7 +33,7 @@ object RDDConversions {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
       val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
--- End diff --

It is a good suggestion and has been applied; would you like to review it again? Thanks.


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240026394
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -53,7 +53,7 @@ object RDDConversions {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
       val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
--- End diff --

shall we use `RowEncoder` here?


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240026388
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -33,7 +33,7 @@ object RDDConversions {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
       val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
--- End diff --

shall we use `ExpressionEncoder` here?
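
For concreteness, a hedged sketch of the `ExpressionEncoder` variant (close to what a later revision of this PR does; a sketch, not a definitive implementation). `ExpressionEncoder[A]` derives its serializer from `A` via runtime reflection, which is why the signature would need a `TypeTag` context bound:

```scala
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType

def productToRowRdd[A <: Product : TypeTag](
    data: RDD[A],
    outputSchema: StructType): RDD[InternalRow] = {
  // One encoder per RDD replaces the per-column converters and mutable row.
  val converters = ExpressionEncoder[A]().resolveAndBind(outputSchema.toAttributes)
  data.mapPartitions { iterator =>
    iterator.map(converters.toRow)
  }
}
```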


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-07 Thread eatoncys
GitHub user eatoncys opened a pull request:

https://github.com/apache/spark/pull/23262

[SPARK-26312][SQL]Converting converters in RDDConversions into arrays to 
improve their access performance


## What changes were proposed in this pull request?

`RDDConversions` gets disproportionately slower as the number of columns in the query increases.
This PR converts the `converters` in `RDDConversions` into arrays to improve their access performance: the type of `converters` was previously `scala.collection.immutable.::` (a cons cell, i.e. a subtype of `List`), for which indexed access is linear in the index.
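
A minimal sketch of the access pattern in question (my own illustration with made-up sizes, not the PR's benchmark): `List(i)` walks `i` cons cells, so the per-row column loop does work quadratic in the column count, while `Array(i)` is a constant-time load.

```scala
val numColumns = 2000
val convertersList: List[Any => Any] = List.fill(numColumns)(identity)
val convertersArray: Array[Any => Any] = convertersList.toArray

// Shape of the per-row hot loop in RDDConversions:
val row = new Array[Any](numColumns)
var i = 0
while (i < numColumns) {
  row(i) = convertersArray(i)(row(i))  // convertersList(i) here would cost O(i)
  i += 1
}
```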

The `PrunedScanSuite` test with 2000 columns and 20k rows takes 409 seconds before this PR and 361 seconds after.

## How was this patch tested?

Existing test cases in `PrunedScanSuite`.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eatoncys/spark toarray

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23262.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23262


commit ddb252892a439281b16bc14fdfdb7faf756f1067
Author: 10129659 
Date:   2018-12-08T07:15:10Z

Converting converters in RDDConversions into arrays to improve their access 
performance




---
