Repository: spark
Updated Branches:
  refs/heads/master 341b13f8f -> b3862d3c5


[SPARK-10705] [SQL] Avoid using external rows in DataFrame.toJSON

JIRA: https://issues.apache.org/jira/browse/SPARK-10705

As described in the JIRA ticket, `DataFrame.toJSON` uses
`DataFrame.mapPartitions`, which converts every internal row to an external
row before serializing it. We should use `queryExecution.toRdd.mapPartitions`
instead, which operates directly on internal rows and avoids that conversion,
for better performance.
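
For context, a minimal sketch of what the patched `toJSON` looks like (the
iterator body is assumed from the surrounding code in DataFrame.scala and is
not part of the diff below):

  import java.io.CharArrayWriter
  import com.fasterxml.jackson.core.JsonFactory
  import org.apache.spark.rdd.RDD

  def toJSON: RDD[String] = {
    val rowSchema = this.schema
    // queryExecution.toRdd yields InternalRow directly, skipping the
    // InternalRow -> Row conversion that DataFrame.mapPartitions performs.
    queryExecution.toRdd.mapPartitions { iter =>
      val writer = new CharArrayWriter()
      // create the generator without a separator inserted between records
      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

      new Iterator[String] {
        override def hasNext: Boolean = iter.hasNext
        override def next(): String = {
          // curried JacksonGenerator.apply introduced by this patch
          JacksonGenerator(rowSchema, gen)(iter.next())
          gen.flush()

          val json = writer.toString
          if (hasNext) writer.reset() else gen.close()
          json
        }
      }
    }
  }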

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #8865 from viirya/df-tojson-internalrow.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3862d3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3862d3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3862d3c

Branch: refs/heads/master
Commit: b3862d3c59746ffb5f089aea4ff9e6f033a2c658
Parents: 341b13f
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Thu Sep 24 12:52:11 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Sep 24 12:52:11 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  |  2 +-
 .../datasources/json/JSONRelation.scala         |  2 +-
 .../datasources/json/JacksonGenerator.scala     | 84 +-------------------
 3 files changed, 3 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3862d3c/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index a11140b..f9995da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1564,7 +1564,7 @@ class DataFrame private[sql](
    */
   def toJSON: RDD[String] = {
     val rowSchema = this.schema
-    this.mapPartitions { iter =>
+    queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
       val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

http://git-wip-us.apache.org/repos/asf/spark/blob/b3862d3c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 8ee0127..d05e6ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -182,7 +182,7 @@ private[json] class JsonOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    JacksonGenerator(dataSchema, gen, row)
+    JacksonGenerator(dataSchema, gen)(row)
     gen.flush()
 
     result.set(writer.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/b3862d3c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 23bada1..d7d6ede 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -28,88 +28,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
 private[sql] object JacksonGenerator {
-  /** Transforms a single Row to JSON using Jackson
-    *
-    * @param rowSchema the schema object used for conversion
-    * @param gen a JsonGenerator object
-    * @param row The row to convert
-    */
-  def apply(rowSchema: StructType, gen: JsonGenerator)(row: Row): Unit = {
-    def valWriter: (DataType, Any) => Unit = {
-      case (_, null) | (NullType, _) => gen.writeNull()
-      case (StringType, v: String) => gen.writeString(v)
-      case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
-      case (IntegerType, v: Int) => gen.writeNumber(v)
-      case (ShortType, v: Short) => gen.writeNumber(v)
-      case (FloatType, v: Float) => gen.writeNumber(v)
-      case (DoubleType, v: Double) => gen.writeNumber(v)
-      case (LongType, v: Long) => gen.writeNumber(v)
-      case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
-      case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
-      case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
-      case (BooleanType, v: Boolean) => gen.writeBoolean(v)
-      case (DateType, v) => gen.writeString(v.toString)
-      case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v))
-
-      case (ArrayType(ty, _), v: Seq[_]) =>
-        gen.writeStartArray()
-        v.foreach(valWriter(ty, _))
-        gen.writeEndArray()
-
-      case (MapType(kv, vv, _), v: Map[_, _]) =>
-        gen.writeStartObject()
-        v.foreach { p =>
-          gen.writeFieldName(p._1.toString)
-          valWriter(vv, p._2)
-        }
-        gen.writeEndObject()
-
-      case (StructType(ty), v: Row) =>
-        gen.writeStartObject()
-        ty.zip(v.toSeq).foreach {
-          case (_, null) =>
-          case (field, v) =>
-            gen.writeFieldName(field.name)
-            valWriter(field.dataType, v)
-        }
-        gen.writeEndObject()
-
-      // For UDT, udt.serialize will produce SQL types. So, we need the following three cases.
-      case (ArrayType(ty, _), v: ArrayData) =>
-        gen.writeStartArray()
-        v.foreach(ty, (_, value) => valWriter(ty, value))
-        gen.writeEndArray()
-
-      case (MapType(kt, vt, _), v: MapData) =>
-        gen.writeStartObject()
-        v.foreach(kt, vt, { (k, v) =>
-          gen.writeFieldName(k.toString)
-          valWriter(vt, v)
-        })
-        gen.writeEndObject()
-
-      case (StructType(ty), v: InternalRow) =>
-        gen.writeStartObject()
-        var i = 0
-        while (i < ty.length) {
-          val field = ty(i)
-          val value = v.get(i, field.dataType)
-          if (value != null) {
-            gen.writeFieldName(field.name)
-            valWriter(field.dataType, value)
-          }
-          i += 1
-        }
-        gen.writeEndObject()
-
-      case (dt, v) =>
-        sys.error(
-          s"Failed to convert value $v (class of ${v.getClass}}) with the type 
of $dt to JSON.")
-    }
-
-    valWriter(rowSchema, row)
-  }
-
   /** Transforms a single InternalRow to JSON using Jackson
    *
    * TODO: make the code shared with the other apply method.
@@ -118,7 +36,7 @@ private[sql] object JacksonGenerator {
    * @param gen a JsonGenerator object
    * @param row The row to convert
    */
-  def apply(rowSchema: StructType, gen: JsonGenerator, row: InternalRow): Unit = {
+  def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
     def valWriter: (DataType, Any) => Unit = {
       case (_, null) | (NullType, _) => gen.writeNull()
       case (StringType, v) => gen.writeString(v.toString)

