Repository: spark
Updated Branches:
  refs/heads/master 0698e6c88 -> 65accb813
[SPARK-17029] make toJSON not go through rdd form but operate on dataset always

## What changes were proposed in this pull request?

Don't convert toRdd when doing toJSON

## How was this patch tested?

Existing unit tests

Author: Robert Kruszewski <robe...@palantir.com>

Closes #14615 from robert3005/robertk/correct-tojson.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65accb81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65accb81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65accb81

Branch: refs/heads/master
Commit: 65accb813add9f58c1e9f1555863fe0bb1932ad8
Parents: 0698e6c
Author: Robert Kruszewski <robe...@palantir.com>
Authored: Thu May 11 15:26:48 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu May 11 15:26:48 2017 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/Dataset.scala    |  8 +++-----
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 10 ++++++++++
 2 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/65accb81/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 61154e2..c75921e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2806,7 +2806,7 @@ class Dataset[T] private[sql](
   def toJSON: Dataset[String] = {
     val rowSchema = this.schema
     val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
-    val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
+    mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create
the Generator without separator inserted between 2 records
       val gen = new JacksonGenerator(rowSchema, writer,
@@ -2815,7 +2815,7 @@ class Dataset[T] private[sql](
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
         override def next(): String = {
-          gen.write(iter.next())
+          gen.write(exprEnc.toRow(iter.next()))
           gen.flush()

           val json = writer.toString
@@ -2828,9 +2828,7 @@ class Dataset[T] private[sql](
           json
         }
       }
-    }
-    import sparkSession.implicits.newStringEncoder
-    sparkSession.createDataset(rdd)
+    } (Encoders.STRING)
   }

   /**


http://git-wip-us.apache.org/repos/asf/spark/blob/65accb81/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2ab0381..5e7f794 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
 import org.apache.spark.sql.internal.SQLConf
@@ -1326,6 +1327,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }

+  test("Dataset toJSON doesn't construct rdd") {
+    val containsRDD = spark.emptyDataFrame.toJSON.queryExecution.logical.find {
+      case ExternalRDD(_, _) => true
+      case _ => false
+    }
+
+    assert(containsRDD.isEmpty, "Expected
logical plan of toJSON to not contain an RDD")
+  }
+
   test("JSONRelation equality test") {
     withTempPath(dir => {
       val path = dir.getCanonicalFile.toURI.toString


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org