Repository: spark
Updated Branches:
  refs/heads/master 3b6107704 -> 9db73ec12


[SPARK-8381][SQL] Reuse type converter when converting Seq[Row] to Catalyst type

Reuse the converter produced by CatalystTypeConverters.createToCatalystConverter when converting a Seq[Row] to Catalyst types, instead of re-resolving a converter for every element; a short sketch of the pattern follows the commit list below.

Author: Lianhui Wang <lianhuiwan...@gmail.com>

Closes #6831 from lianhuiwang/reuse-typeConvert and squashes the following commits:

1fec395 [Lianhui Wang] remove CatalystTypeConverters.convertToCatalyst
714462d [Lianhui Wang] add package[sql]
9d1fbf3 [Lianhui Wang] address JoshRosen's comments
768956f [Lianhui Wang] update scala style
4498c62 [Lianhui Wang] reuse typeConvert
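
The gist of the change, as a minimal illustrative sketch (not part of this commit; it assumes
roughly Spark 1.5-era internals, and since CatalystTypeConverters is an internal API it may only
be visible to code living alongside the sql/catalyst packages). The schema and rows below are
made up for illustration:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Hypothetical schema and input rows, only for illustration.
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))
    val rows: Seq[Row] = Seq(Row(1, "a"), Row(2, "b"))

    // Build the converter once for the whole batch ...
    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
    // ... and reuse it for every Row, instead of re-resolving a converter per element
    // as the removed convertToCatalyst(value, dataType) did.
    val internalRows: Seq[InternalRow] = rows.map(convert(_).asInstanceOf[InternalRow])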


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9db73ec1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9db73ec1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9db73ec1

Branch: refs/heads/master
Commit: 9db73ec12412f6809030546cf69dcb32d2c8e0fe
Parents: 3b61077
Author: Lianhui Wang <lianhuiwan...@gmail.com>
Authored: Wed Jun 17 22:52:47 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Wed Jun 17 22:52:47 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/CatalystTypeConverters.scala       | 10 ----------
 .../apache/spark/sql/catalyst/ScalaReflectionSuite.scala  |  4 ++--
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala   |  8 ++++----
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala  |  8 ++++----
 .../scala/org/apache/spark/sql/execution/commands.scala   |  4 ++--
 5 files changed, 12 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 6175456..620e8de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -336,16 +336,6 @@ object CatalystTypeConverters {
   }
 
   /**
-   * Converts Scala objects to catalyst rows / types. This method is slow, and for batch
-   * conversion you should be using converter produced by createToCatalystConverter.
-   * Note: This is always called after schemaFor has been called.
-   *       This ordering is important for UDT registration.
-   */
-  def convertToCatalyst(scalaValue: Any, dataType: DataType): Any = {
-    getConverterForType(dataType).toCatalyst(scalaValue)
-  }
-
-  /**
    * Creates a converter function that will convert Scala objects to the specified Catalyst type.
    * Typical use case would be converting a collection of rows that have the same schema. You will
    * call this function once to get a converter, and apply it to every row.

http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index c2d739b..b4b00f5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -258,7 +258,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
     val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
     val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
     val dataType = schemaFor[PrimitiveData].dataType
-    assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
+    assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
   }
 
   test("convert Option[Product] to catalyst") {
@@ -268,7 +268,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
     val dataType = schemaFor[OptionalData].dataType
     val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
       InternalRow(1, 1, 1, 1, 1, 1, true))
-    assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
+    assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
   }
 
   test("infer schema from case class with multiple constructors") {

http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 444916b..466258e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1029,10 +1029,10 @@ class DataFrame private[sql](
 
     val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) }
     val names = schema.toAttributes.map(_.name)
+    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
 
     val rowFunction =
-      f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema)
-        .asInstanceOf[InternalRow]))
+      f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
     val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
 
     Generate(generator, join = true, outer = false,
@@ -1059,8 +1059,8 @@ class DataFrame private[sql](
     val names = attributes.map(_.name)
 
     def rowFunction(row: Row): TraversableOnce[InternalRow] = {
-      f(row(0).asInstanceOf[A]).map(o =>
-        InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType)))
+      val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+      f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
     }
     val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9d1f89d..6b605f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -536,12 +536,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
         Class.forName(className, true, Utils.getContextOrSparkClassLoader))
       val extractors =
         localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
-
+      val methodsToConverts = extractors.zip(attributeSeq).map { case (e, attr) =>
+        (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+      }
       iter.map { row =>
         new GenericRow(
-          extractors.zip(attributeSeq).map { case (e, attr) =>
-            CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
-          }.toArray[Any]
+          methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any]
         ) : InternalRow
       }
     }
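
As an aside, the SQLContext hunk above applies the same idea to the JavaBean path: the
getter/converter pairs are built once per partition and then reused for every row. A rough
stand-alone sketch of that pairing, with a made-up PersonBean and field list; only
CatalystTypeConverters comes from Spark itself, and being internal it may not be accessible
outside the sql packages:

    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

    class PersonBean(val name: String, val age: Int)   // hypothetical bean stand-in

    val attributes = Seq(StructField("name", StringType), StructField("age", IntegerType))
    val extractors: Seq[PersonBean => Any] = Seq(_.name, _.age)

    // Pair each extractor with its converter once, outside the per-row loop ...
    val extractorsWithConverters = extractors.zip(attributes).map { case (extract, attr) =>
      (extract, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
    }
    // ... then apply both per bean, mirroring the iter.map { row => ... } body above.
    def toCatalystValues(bean: PersonBean): Array[Any] =
      extractorsWithConverters.map { case (extract, convert) => convert(extract(bean)) }.toArray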

http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 653792e..c9dfcea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -65,8 +65,8 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
   override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val converted = sideEffectResult.map(r =>
-      CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow])
+    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
+    val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
     sqlContext.sparkContext.parallelize(converted, 1)
   }
 }

