Repository: spark
Updated Branches:
  refs/heads/branch-2.1 9873d57f2 -> 4af82d56f


[SPARK-18295][SQL] Make to_json function null safe (matching it to from_json)

## What changes were proposed in this pull request?

This PR proposes to match the null-safety behaviour of `to_json` to that of `from_json`.

Currently, `to_json` throws a `NullPointerException` on a null input row; this PR fixes it to produce `null` instead.

With the data below:

```scala
import spark.implicits._

val df = Seq(Some(Tuple1(Tuple1(1))), None).toDF("a")
df.show()
```

```
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

the code below

```scala
import org.apache.spark.sql.functions._

df.select(to_json($"a")).show()
```

produces:

**Before**

it throws a `NullPointerException`:

```
java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.json.JacksonGenerator.org$apache$spark$sql$catalyst$json$JacksonGenerator$$writeFields(JacksonGenerator.scala:138)
  at org.apache.spark.sql.catalyst.json.JacksonGenerator$$anonfun$write$1.apply$mcV$sp(JacksonGenerator.scala:194)
  at org.apache.spark.sql.catalyst.json.JacksonGenerator.org$apache$spark$sql$catalyst$json$JacksonGenerator$$writeObject(JacksonGenerator.scala:131)
  at org.apache.spark.sql.catalyst.json.JacksonGenerator.write(JacksonGenerator.scala:193)
  at org.apache.spark.sql.catalyst.expressions.StructToJson.eval(jsonExpressions.scala:544)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```

**After**

```
+---------------+
|structtojson(a)|
+---------------+
|       {"_1":1}|
|           null|
+---------------+
```
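
The mechanism behind the change: both `JsonToStruct` and `StructToJson` now extend `UnaryExpression` and implement `nullSafeEval` (see the diff below), so the null check lives once in the parent class instead of in each `eval`. Below is a minimal, self-contained sketch of that pattern; the `Sketch*` names are simplified stand-ins for illustration only, not Spark's actual classes.

```scala
// Sketch of the null-propagation pattern used by UnaryExpression:
// eval() checks the child's value first and only delegates to
// nullSafeEval() when the input is non-null.
trait SketchUnaryExpression {
  def child: () => Any                       // stand-in for child.eval(input)

  // Subclasses only need to handle the non-null case.
  protected def nullSafeEval(input: Any): Any

  final def eval(): Any = {
    val value = child()
    if (value == null) null else nullSafeEval(value)
  }
}

// A toy stand-in for StructToJson: before the patch, a null row would
// have reached the JSON writer and triggered the NullPointerException above.
class SketchStructToJson(val child: () => Any) extends SketchUnaryExpression {
  override protected def nullSafeEval(input: Any): Any =
    s"""{"_1":$input}"""                     // pretend JSON generation
}

object SketchDemo extends App {
  println(new SketchStructToJson(() => 1).eval())    // {"_1":1}
  println(new SketchStructToJson(() => null).eval()) // null, no NPE
}
```

The diff below applies exactly this: the hand-written null check in `JsonToStruct.eval` is dropped and both expressions implement `nullSafeEval` instead.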

## How was this patch tested?

Unit test in `JsonExpressionsSuite.scala` and `JsonFunctionsSuite.scala`.
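
For a quick manual check (for example in `spark-shell` on a build that includes this patch), the example from the description above can be re-run end to end:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(Some(Tuple1(Tuple1(1))), None).toDF("a")
df.select(to_json($"a")).show()
// Expected with this patch:
// +---------------+
// |structtojson(a)|
// +---------------+
// |       {"_1":1}|
// |           null|
// +---------------+
```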

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #15792 from HyukjinKwon/SPARK-18295.

(cherry picked from commit 3eda05703f02413540f180ade01f0f114e70b9cc)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4af82d56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4af82d56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4af82d56

Branch: refs/heads/branch-2.1
Commit: 4af82d56f79ac3cceb08b702413ae2b35dfea48b
Parents: 9873d57
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Mon Nov 7 16:54:40 2016 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Nov 7 16:54:57 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/jsonExpressions.scala    | 14 +++++---------
 .../catalyst/expressions/JsonExpressionsSuite.scala   | 13 +++++++++++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala     | 14 ++++++++++++++
 3 files changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4af82d56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 89fe7c4..b61583d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -484,7 +484,7 @@ case class JsonTuple(children: Seq[Expression])
  * Converts an json input string to a [[StructType]] with the specified schema.
  */
 case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
-  extends Expression with CodegenFallback with ExpectsInputTypes {
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true
 
   @transient
@@ -495,11 +495,8 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
       new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
 
   override def dataType: DataType = schema
-  override def children: Seq[Expression] = child :: Nil
 
-  override def eval(input: InternalRow): Any = {
-    val json = child.eval(input)
-    if (json == null) return null
+  override def nullSafeEval(json: Any): Any = {
     try parser.parse(json.toString).head catch {
       case _: SparkSQLJsonProcessingException => null
     }
@@ -512,7 +509,7 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
  * Converts a [[StructType]] to a json output string.
  */
 case class StructToJson(options: Map[String, String], child: Expression)
-  extends Expression with CodegenFallback with ExpectsInputTypes {
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true
 
   @transient
@@ -523,7 +520,6 @@ case class StructToJson(options: Map[String, String], child: Expression)
     new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer)
 
   override def dataType: DataType = StringType
-  override def children: Seq[Expression] = child :: Nil
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (StructType.acceptsType(child.dataType)) {
@@ -540,8 +536,8 @@ case class StructToJson(options: Map[String, String], child: Expression)
     }
   }
 
-  override def eval(input: InternalRow): Any = {
-    gen.write(child.eval(input).asInstanceOf[InternalRow])
+  override def nullSafeEval(row: Any): Any = {
+    gen.write(row.asInstanceOf[InternalRow])
     gen.flush()
     val json = writer.toString
     writer.reset()

http://git-wip-us.apache.org/repos/asf/spark/blob/4af82d56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 3bfa0bf..3b0e908 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -347,7 +347,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(null)),
+      JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
       null
     )
   }
@@ -360,4 +360,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       """{"a":1}"""
     )
   }
+
+  test("to_json null input column") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val struct = Literal.create(null, schema)
+    checkEvaluation(
+      StructToJson(Map.empty, struct),
+      null
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4af82d56/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 59ae889..7d63d31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -141,4 +141,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(e.getMessage.contains(
       "Unable to convert column a of type calendarinterval to JSON."))
   }
+
+  test("roundtrip in to_json and from_json") {
+    val dfOne = Seq(Some(Tuple1(Tuple1(1))), None).toDF("struct")
+    val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
+    val readBackOne = dfOne.select(to_json($"struct").as("json"))
+      .select(from_json($"json", schemaOne).as("struct"))
+    checkAnswer(dfOne, readBackOne)
+
+    val dfTwo = Seq(Some("""{"a":1}"""), None).toDF("json")
+    val schemaTwo = new StructType().add("a", IntegerType)
+    val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("struct"))
+      .select(to_json($"struct").as("json"))
+    checkAnswer(dfTwo, readBackTwo)
+  }
 }

