Repository: spark Updated Branches: refs/heads/master 3a710b94b -> 3eda05703
[SPARK-18295][SQL] Make to_json function null safe (matching it to from_json) ## What changes were proposed in this pull request? This PR proposes to match up the behaviour of `to_json` to `from_json` function for null-safety. Currently, it throws `NullPointerException` but this PR fixes this to produce `null` instead. With the data below: ```scala import spark.implicits._ val df = Seq(Some(Tuple1(Tuple1(1))), None).toDF("a") df.show() ``` ``` +----+ | a| +----+ | [1]| |null| +----+ ``` the code below ```scala import org.apache.spark.sql.functions._ df.select(to_json($"a")).show() ``` produces: **Before** throws `NullPointerException` as below: ``` java.lang.NullPointerException at org.apache.spark.sql.catalyst.json.JacksonGenerator.org$apache$spark$sql$catalyst$json$JacksonGenerator$$writeFields(JacksonGenerator.scala:138) at org.apache.spark.sql.catalyst.json.JacksonGenerator$$anonfun$write$1.apply$mcV$sp(JacksonGenerator.scala:194) at org.apache.spark.sql.catalyst.json.JacksonGenerator.org$apache$spark$sql$catalyst$json$JacksonGenerator$$writeObject(JacksonGenerator.scala:131) at org.apache.spark.sql.catalyst.json.JacksonGenerator.write(JacksonGenerator.scala:193) at org.apache.spark.sql.catalyst.expressions.StructToJson.eval(jsonExpressions.scala:544) at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142) at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48) at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ``` **After** ``` +---------------+ |structtojson(a)| +---------------+ | {"_1":1}| | null| +---------------+ ``` ## How was this patch tested? Unit test in `JsonExpressionsSuite.scala` and `JsonFunctionsSuite.scala`. Author: hyukjinkwon <gurwls...@gmail.com> Closes #15792 from HyukjinKwon/SPARK-18295.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eda0570 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eda0570 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eda0570 Branch: refs/heads/master Commit: 3eda05703f02413540f180ade01f0f114e70b9cc Parents: 3a710b9 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Mon Nov 7 16:54:40 2016 -0800 Committer: Michael Armbrust <mich...@databricks.com> Committed: Mon Nov 7 16:54:40 2016 -0800 ---------------------------------------------------------------------- .../sql/catalyst/expressions/jsonExpressions.scala | 14 +++++--------- .../catalyst/expressions/JsonExpressionsSuite.scala | 13 +++++++++++-- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 14 ++++++++++++++ 3 files changed, 30 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3eda0570/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 89fe7c4..b61583d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -484,7 +484,7 @@ case class JsonTuple(children: Seq[Expression]) * Converts an json input string to a [[StructType]] with the specified schema. 
*/ case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression) - extends Expression with CodegenFallback with ExpectsInputTypes { + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true @transient @@ -495,11 +495,8 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE))) override def dataType: DataType = schema - override def children: Seq[Expression] = child :: Nil - override def eval(input: InternalRow): Any = { - val json = child.eval(input) - if (json == null) return null + override def nullSafeEval(json: Any): Any = { try parser.parse(json.toString).head catch { case _: SparkSQLJsonProcessingException => null } @@ -512,7 +509,7 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: * Converts a [[StructType]] to a json output string. */ case class StructToJson(options: Map[String, String], child: Expression) - extends Expression with CodegenFallback with ExpectsInputTypes { + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true @transient @@ -523,7 +520,6 @@ case class StructToJson(options: Map[String, String], child: Expression) new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer) override def dataType: DataType = StringType - override def children: Seq[Expression] = child :: Nil override def checkInputDataTypes(): TypeCheckResult = { if (StructType.acceptsType(child.dataType)) { @@ -540,8 +536,8 @@ case class StructToJson(options: Map[String, String], child: Expression) } } - override def eval(input: InternalRow): Any = { - gen.write(child.eval(input).asInstanceOf[InternalRow]) + override def nullSafeEval(row: Any): Any = { + gen.write(row.asInstanceOf[InternalRow]) gen.flush() val json = writer.toString writer.reset() 
http://git-wip-us.apache.org/repos/asf/spark/blob/3eda0570/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 3bfa0bf..3b0e908 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.ParseModes -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -347,7 +347,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("from_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(null)), + JsonToStruct(schema, Map.empty, Literal.create(null, StringType)), null ) } @@ -360,4 +360,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { """{"a":1}""" ) } + + test("to_json null input column") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + val struct = Literal.create(null, schema) + checkEvaluation( + StructToJson(Map.empty, struct), + null + ) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3eda0570/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 59ae889..7d63d31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -141,4 +141,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { assert(e.getMessage.contains( "Unable to convert column a of type calendarinterval to JSON.")) } + + test("roundtrip in to_json and from_json") { + val dfOne = Seq(Some(Tuple1(Tuple1(1))), None).toDF("struct") + val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType] + val readBackOne = dfOne.select(to_json($"struct").as("json")) + .select(from_json($"json", schemaOne).as("struct")) + checkAnswer(dfOne, readBackOne) + + val dfTwo = Seq(Some("""{"a":1}"""), None).toDF("json") + val schemaTwo = new StructType().add("a", IntegerType) + val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("struct")) + .select(to_json($"struct").as("json")) + checkAnswer(dfTwo, readBackTwo) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org