Repository: spark Updated Branches: refs/heads/master 3b6107704 -> 9db73ec12
[SPARK-8381][SQL]reuse typeConvert when convert Seq[Row] to catalyst type reuse-typeConvert when convert Seq[Row] to CatalystType Author: Lianhui Wang <lianhuiwan...@gmail.com> Closes #6831 from lianhuiwang/reuse-typeConvert and squashes the following commits: 1fec395 [Lianhui Wang] remove CatalystTypeConverters.convertToCatalyst 714462d [Lianhui Wang] add package[sql] 9d1fbf3 [Lianhui Wang] address JoshRosen's comments 768956f [Lianhui Wang] update scala style 4498c62 [Lianhui Wang] reuse typeConvert Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9db73ec1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9db73ec1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9db73ec1 Branch: refs/heads/master Commit: 9db73ec12412f6809030546cf69dcb32d2c8e0fe Parents: 3b61077 Author: Lianhui Wang <lianhuiwan...@gmail.com> Authored: Wed Jun 17 22:52:47 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Wed Jun 17 22:52:47 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/CatalystTypeConverters.scala | 10 ---------- .../apache/spark/sql/catalyst/ScalaReflectionSuite.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 8 ++++---- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 8 ++++---- .../scala/org/apache/spark/sql/execution/commands.scala | 4 ++-- 5 files changed, 12 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 6175456..620e8de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -336,16 +336,6 @@ object CatalystTypeConverters { } /** - * Converts Scala objects to catalyst rows / types. This method is slow, and for batch - * conversion you should be using converter produced by createToCatalystConverter. - * Note: This is always called after schemaFor has been called. - * This ordering is important for UDT registration. - */ - def convertToCatalyst(scalaValue: Any, dataType: DataType): Any = { - getConverterForType(dataType).toCatalyst(scalaValue) - } - - /** * Creates a converter function that will convert Scala objects to the specified Catalyst type. * Typical use case would be converting a collection of rows that have the same schema. You will * call this function once to get a converter, and apply it to every row. http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index c2d739b..b4b00f5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -258,7 +258,7 @@ class ScalaReflectionSuite extends SparkFunSuite { val data = PrimitiveData(1, 1, 1, 1, 1, 1, true) val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true) val dataType = schemaFor[PrimitiveData].dataType - assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData) + assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData) } test("convert Option[Product] to catalyst") { @@ -268,7 +268,7 @@ class ScalaReflectionSuite extends SparkFunSuite { val dataType = schemaFor[OptionalData].dataType val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true, InternalRow(1, 1, 1, 1, 1, 1, true)) - assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData) + assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData) } test("infer schema from case class with multiple constructors") { http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 444916b..466258e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1029,10 +1029,10 @@ class DataFrame private[sql]( val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) } val names = schema.toAttributes.map(_.name) + val convert = CatalystTypeConverters.createToCatalystConverter(schema) val rowFunction = - f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema) - .asInstanceOf[InternalRow])) + f.andThen(_.map(convert(_).asInstanceOf[InternalRow])) val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr)) Generate(generator, join = true, outer = false, @@ -1059,8 +1059,8 @@ class DataFrame private[sql]( val names = attributes.map(_.name) def rowFunction(row: Row): TraversableOnce[InternalRow] = { - f(row(0).asInstanceOf[A]).map(o => - InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType))) + val convert = CatalystTypeConverters.createToCatalystConverter(dataType) + f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o))) } val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil) http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 9d1f89d..6b605f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -536,12 +536,12 @@ class SQLContext(@transient val sparkContext: SparkContext) Class.forName(className, true, Utils.getContextOrSparkClassLoader)) val extractors = localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod) - + val methodsToConverts = extractors.zip(attributeSeq).map { case (e, attr) => + (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType)) + } iter.map { row => new GenericRow( - extractors.zip(attributeSeq).map { case (e, attr) => - CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType) - }.toArray[Any] + methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any] ) : InternalRow } } http://git-wip-us.apache.org/repos/asf/spark/blob/9db73ec1/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 653792e..c9dfcea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -65,8 +65,8 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray protected override def doExecute(): RDD[InternalRow] = { - val converted = sideEffectResult.map(r => - CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow]) + val convert = CatalystTypeConverters.createToCatalystConverter(schema) + val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow]) sqlContext.sparkContext.parallelize(converted, 1) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org