This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 033ca3e7dd5b [SPARK-47922][SQL] Implement the try_parse_json expression 033ca3e7dd5b is described below commit 033ca3e7dd5bd9ceadd9219c2bde105bea301e70 Author: Harsh Motwani <harsh.motw...@databricks.com> AuthorDate: Fri Apr 26 14:35:59 2024 +0800 [SPARK-47922][SQL] Implement the try_parse_json expression ### What changes were proposed in this pull request? This pull request implements the `try_parse_json` expression, which runs `parse_json` on string expressions to extract variants. However, if `parse_json` throws an exception on a row, the value `null` is returned. ### Why are the changes needed? Sometimes, columns containing JSON strings may contain some invalid inputs that should be ignored instead of having the whole execution fail because of them. ### Does this PR introduce _any_ user-facing change? Yes, it allows users to run the `try_parse_json` expression. ### How was this patch tested? Unit tests to check if `try_parse_json` works just like `parse_json` on valid inputs, returns `null` on invalid inputs, and fails on incorrect input data types. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46141 from harshmotw-db/try_parse_json. 
Authored-by: Harsh Motwani <harsh.motw...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/sql/functions.scala | 12 ++++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 4 ++ .../function_is_variant_null.explain | 2 +- .../explain-results/function_parse_json.explain | 2 +- .../function_schema_of_variant.explain | 2 +- .../function_schema_of_variant_agg.explain | 2 +- ...son.explain => function_try_parse_json.explain} | 2 +- .../function_try_variant_get.explain | 2 +- .../explain-results/function_variant_get.explain | 2 +- .../queries/function_try_parse_json.json | 25 ++++++++ .../queries/function_try_parse_json.proto.bin | Bin 0 -> 183 bytes .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/connect/functions/builtin.py | 7 +++ python/pyspark/sql/functions/builtin.py | 34 +++++++++- python/pyspark/sql/tests/test_functions.py | 8 +++ .../sql/catalyst/analysis/FunctionRegistry.scala | 3 +- .../variant/VariantExpressionEvalUtils.scala | 16 +++-- .../expressions/variant/variantExpressions.scala | 69 ++++++++++++++++----- .../variant/VariantExpressionEvalUtilsSuite.scala | 9 ++- .../scala/org/apache/spark/sql/functions.scala | 11 ++++ .../sql-functions/sql-expression-schema.md | 3 +- .../apache/spark/sql/VariantEndToEndSuite.scala | 35 +++++++++++ .../scala/org/apache/spark/sql/VariantSuite.scala | 9 +++ 23 files changed, 229 insertions(+), 31 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 5a7880c87431..6471d15b63ab 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -7000,6 +7000,18 @@ object functions { fnWithOptions("from_json", options, e, schema) } + /** + * Parses a JSON string and constructs a Variant value. 
Returns null if the input string is not + * a valid JSON value. + * + * @param json + * a string column that contains JSON data. + * + * @group variant_funcs + * @since 4.0.0 + */ + def try_parse_json(json: Column): Column = Column.fn("try_parse_json", json) + /** * Parses a JSON string and constructs a Variant value. * diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 1005561b24ac..ebf4ee0e9073 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -2489,6 +2489,10 @@ class PlanGenerationTestSuite Collections.singletonMap("allowNumericLeadingZeros", "true")) } + functionTest("try_parse_json") { + fn.try_parse_json(fn.col("g")) + } + functionTest("to_json") { fn.to_json(fn.col("d"), Map(("timestampFormat", "dd/MM/yyyy"))) } diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain index 1273807d15c3..53ba167fca65 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain @@ -1,2 +1,2 @@ -Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, BooleanType, isVariantNull, staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true), VariantType, false, false, true) AS is_variant_null(parse_json(g))#0] +Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, BooleanType, isVariantNull, staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), VariantType, false, false, true) AS is_variant_null(parse_json(g))#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain index 061aafbadb07..b844d19c85ac 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain @@ -1,2 +1,2 @@ -Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true) AS parse_json(g)#0] +Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true) AS parse_json(g)#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain index 5ba5d96425db..62f8e7f3e6fe 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain @@ -1,2 +1,2 @@ -Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariant$, StringType, schemaOfVariant, 
staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true), VariantType, true, false, true) AS schema_of_variant(parse_json(g))#0] +Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariant$, StringType, schemaOfVariant, staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), VariantType, true, false, true) AS schema_of_variant(parse_json(g))#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain index b3a39efe8c50..d4f9e2c66d99 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain @@ -1,2 +1,2 @@ -Aggregate [schema_of_variant_agg(staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true), 0, 0) AS schema_of_variant_agg(parse_json(g))#0] +Aggregate [schema_of_variant_agg(staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), 0, 0) AS schema_of_variant_agg(parse_json(g))#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain similarity index 
69% copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain index 061aafbadb07..1772b5d37623 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain @@ -1,2 +1,2 @@ -Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true) AS parse_json(g)#0] +Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, false, StringType, BooleanType, true, false, true) AS try_parse_json(g)#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain index e590a9b2f45c..748465142bde 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain @@ -1,2 +1,2 @@ -Project [try_variant_get(staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true), $, IntegerType, false, Some(America/Los_Angeles)) AS try_variant_get(parse_json(g), $)#0] +Project [try_variant_get(staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), $, IntegerType, false, Some(America/Los_Angeles)) AS 
try_variant_get(parse_json(g), $)#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain index 13fb6abc7bad..3503ee178ca7 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain @@ -1,2 +1,2 @@ -Project [variant_get(staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, StringType, true, false, true), $, IntegerType, true, Some(America/Los_Angeles)) AS variant_get(parse_json(g), $)#0] +Project [variant_get(staticinvoke(class org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), $, IntegerType, true, Some(America/Los_Angeles)) AS variant_get(parse_json(g), $)#0] +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.json new file mode 100644 index 000000000000..91177eb4a585 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": 
"try_parse_json", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.proto.bin new file mode 100644 index 000000000000..cc1f159cfd78 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.proto.bin differ diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index d293e34f0a79..40af3d52e653 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -547,6 +547,7 @@ VARIANT Functions schema_of_variant_agg try_variant_get variant_get + try_parse_json XML Functions diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py index cbbad941bf29..ab3bcfcba4d3 100644 --- a/python/pyspark/sql/connect/functions/builtin.py +++ b/python/pyspark/sql/connect/functions/builtin.py @@ -2040,6 +2040,13 @@ def str_to_map( str_to_map.__doc__ = pysparkfuncs.str_to_map.__doc__ +def try_parse_json(col: "ColumnOrName") -> Column: + return _invoke_function("try_parse_json", _to_col(col)) + + +try_parse_json.__doc__ = pysparkfuncs.try_parse_json.__doc__ + + def parse_json(col: "ColumnOrName") -> Column: return _invoke_function("parse_json", _to_col(col)) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index b54b377aaebc..64f39e352642 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -15594,12 +15594,44 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def try_parse_json( + col: 
"ColumnOrName", +) -> Column: + """ + Parses a column containing a JSON string into a :class:`VariantType`. Returns None if a string + contains an invalid JSON value. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + a column or column name JSON formatted strings + + Returns + ------- + :class:`~pyspark.sql.Column` + a new column of VariantType. + + Examples + -------- + >>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''}, {'json': '''{a : 1}'''} ]) + >>> df.select(to_json(try_parse_json(df.json))).collect() + [Row(to_json(try_parse_json(json))='{"a":1}'), Row(to_json(try_parse_json(json))=None)] + """ + from pyspark.sql.classic.column import _to_java_column + + return _invoke_function("try_parse_json", _to_java_column(col)) + + @_try_remote_functions def parse_json( col: "ColumnOrName", ) -> Column: """ - Parses a column containing a JSON string into a :class:`VariantType`. + Parses a column containing a JSON string into a :class:`VariantType`. Throws exception if a + string represents an invalid JSON value. .. 
versionadded:: 4.0.0 diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index b2fd1335904f..1fd23906edae 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -1354,6 +1354,14 @@ class FunctionsTestsMixin: message_parameters={"arg_name": "json", "arg_type": "int"}, ) + def test_try_parse_json(self): + df = self.spark.createDataFrame([{"json": """{ "a" : 1 }"""}, {"json": """{ a : 1 }"""}]) + actual = df.select( + F.to_json(F.try_parse_json(df.json)).alias("var"), + ).collect() + self.assertEqual("""{"a":1}""", actual[0]["var"]) + self.assertEqual(None, actual[1]["var"]) + def test_schema_of_csv(self): with self.assertRaises(PySparkTypeError) as pe: F.schema_of_csv(1) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index e4e663d15167..d9e2154487c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -821,7 +821,8 @@ object FunctionRegistry { expression[JsonObjectKeys]("json_object_keys"), // Variant - expression[ParseJson]("parse_json"), + expressionBuilder("parse_json", ParseJsonExpressionBuilder), + expressionBuilder("try_parse_json", TryParseJsonExpressionBuilder), expression[IsVariantNull]("is_variant_null"), expressionBuilder("variant_get", VariantGetExpressionBuilder), expressionBuilder("try_variant_get", TryVariantGetExpressionBuilder), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala index ea90bb88a906..a8d629acbe2b 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala @@ -31,16 +31,24 @@ import org.apache.spark.unsafe.types.{UTF8String, VariantVal} */ object VariantExpressionEvalUtils { - def parseJson(input: UTF8String): VariantVal = { + def parseJson(input: UTF8String, failOnError: Boolean = true): VariantVal = { + def parseJsonFailure(exception: Throwable): VariantVal = { + if (failOnError) { + throw exception + } else { + null + } + } try { val v = VariantBuilder.parseJson(input.toString) new VariantVal(v.getValue, v.getMetadata) } catch { case _: VariantSizeLimitException => - throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json") + parseJsonFailure(QueryExecutionErrors + .variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json")) case NonFatal(e) => - throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( - input.toString, BadRecordException(() => input, cause = e)) + parseJsonFailure(QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( + input.toString, BadRecordException(() => input, cause = e))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala index 6c4a8f90e3b5..43c561e10b6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala @@ -42,34 +42,30 @@ import org.apache.spark.types.variant._ import org.apache.spark.types.variant.VariantUtil.Type import org.apache.spark.unsafe.types._ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "_FUNC_(jsonStr) - Parse a JSON string as 
an Variant value. Throw an exception when the string is not valid JSON value.", - examples = """ - Examples: - > SELECT _FUNC_('{"a":1,"b":0.8}'); - {"a":1,"b":0.8} - """, - since = "4.0.0", - group = "variant_funcs" -) -// scalastyle:on line.size.limit -case class ParseJson(child: Expression) + +/** + * The implementation for `parse_json` and `try_parse_json` expressions. Parse a JSON string as a + * Variant value. + * @param child The string value to parse as a variant. + * @param failOnError Controls whether the expression should throw an exception or return null if + * the string does not represent a valid JSON value. + */ +case class ParseJson(child: Expression, failOnError: Boolean = true) extends UnaryExpression with ExpectsInputTypes with RuntimeReplaceable { override lazy val replacement: Expression = StaticInvoke( VariantExpressionEvalUtils.getClass, VariantType, "parseJson", - Seq(child), - inputTypes, + Seq(child, Literal(failOnError, BooleanType)), + inputTypes :+ BooleanType, returnNullable = false) override def inputTypes: Seq[AbstractDataType] = StringType :: Nil override def dataType: DataType = VariantType - override def prettyName: String = "parse_json" + override def prettyName: String = if (failOnError) "parse_json" else "try_parse_json" override protected def withNewChildInternal(newChild: Expression): ParseJson = copy(child = newChild) @@ -425,6 +421,47 @@ case object VariantGet { } } +abstract class ParseJsonExpressionBuilderBase(failOnError: Boolean) extends ExpressionBuilder { + override def build(funcName: String, expressions: Seq[Expression]): Expression = { + val numArgs = expressions.length + if (numArgs == 1) { + ParseJson(expressions.head, failOnError) + } else { + throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(1), numArgs) + } + } +} + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(jsonStr) - Parse a JSON string as a Variant value. 
Throw an exception when the string is not valid JSON value.", + examples = """ + Examples: + > SELECT _FUNC_('{"a":1,"b":0.8}'); + {"a":1,"b":0.8} + """, + since = "4.0.0", + group = "variant_funcs" +) +// scalastyle:on line.size.limit +object ParseJsonExpressionBuilder extends ParseJsonExpressionBuilderBase(true) + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(jsonStr) - Parse a JSON string as a Variant value. Return NULL when the string is not valid JSON value.", + examples = """ + Examples: + > SELECT _FUNC_('{"a":1,"b":0.8}'); + {"a":1,"b":0.8} + > SELECT _FUNC_('{"a":1,'); + NULL + """, + since = "4.0.0", + group = "variant_funcs" +) +// scalastyle:on line.size.limit +object TryParseJsonExpressionBuilder extends ParseJsonExpressionBuilderBase(false) + abstract class VariantGetExpressionBuilderBase(failOnError: Boolean) extends ExpressionBuilder { override def build(funcName: String, expressions: Seq[Expression]): Expression = { val numArgs = expressions.length diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala index 91e991dffd4a..8fc72caa4786 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala @@ -25,9 +25,13 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite { test("parseJson type coercion") { def check(json: String, expectedValue: Array[Byte], expectedMetadata: Array[Byte]): Unit = { + // parse_json val actual = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json)) + // try_parse_json + val tryActual = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json), + failOnError = false) val expected = new 
VariantVal(expectedValue, expectedMetadata) - assert(actual === expected) + assert(actual === expected && tryActual === expected) } // Dictionary size is `0` for value 0. An empty dictionary contains one offset `0` for the @@ -104,6 +108,8 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite { test("parseJson negative") { def checkException(json: String, errorClass: String, parameters: Map[String, String]): Unit = { + val try_parse_json_output = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json), + failOnError = false) checkError( exception = intercept[SparkThrowable] { VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json)) @@ -111,6 +117,7 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite { errorClass = errorClass, parameters = parameters ) + assert(try_parse_json_output === null) } for (json <- Seq("", "[", "+1", "1a", """{"a": 1, "b": 2, "a": "3"}""")) { checkException(json, "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index dc68382ecdc0..e6aed3f7a43d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -6632,6 +6632,17 @@ object functions { fnWithOptions("from_json", options, e, schema) } + /** + * Parses a JSON string and constructs a Variant value. Returns null if the input string is not + * a valid JSON value. + * + * @param json a string column that contains JSON data. + * + * @group variant_funcs + * @since 4.0.0 + */ + def try_parse_json(json: Column): Column = Column.fn("try_parse_json", json) + /** * Parses a JSON string and constructs a Variant value. 
* diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index ae9e68c4cbb1..dd223939a184 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -437,9 +437,10 @@ | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | var_samp | SELECT var_samp(col) FROM VALUES (1), (2), (3) AS tab(col) | struct<var_samp(col):double> | | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | variance | SELECT variance(col) FROM VALUES (1), (2), (3) AS tab(col) | struct<variance(col):double> | | org.apache.spark.sql.catalyst.expressions.variant.IsVariantNull | is_variant_null | SELECT is_variant_null(parse_json('null')) | struct<is_variant_null(parse_json(null)):boolean> | -| org.apache.spark.sql.catalyst.expressions.variant.ParseJson | parse_json | SELECT parse_json('{"a":1,"b":0.8}') | struct<parse_json({"a":1,"b":0.8}):variant> | +| org.apache.spark.sql.catalyst.expressions.variant.ParseJsonExpressionBuilder | parse_json | SELECT parse_json('{"a":1,"b":0.8}') | struct<parse_json({"a":1,"b":0.8}):variant> | | org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariant | schema_of_variant | SELECT schema_of_variant(parse_json('null')) | struct<schema_of_variant(parse_json(null)):string> | | org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariantAgg | schema_of_variant_agg | SELECT schema_of_variant_agg(parse_json(j)) FROM VALUES ('1'), ('2'), ('3') AS tab(j) | struct<schema_of_variant_agg(parse_json(j)):string> | +| org.apache.spark.sql.catalyst.expressions.variant.TryParseJsonExpressionBuilder | try_parse_json | SELECT try_parse_json('{"a":1,"b":0.8}') | struct<try_parse_json({"a":1,"b":0.8}):variant> | | org.apache.spark.sql.catalyst.expressions.variant.TryVariantGetExpressionBuilder | try_variant_get | SELECT try_variant_get(parse_json('{"a": 
1}'), '$.a', 'int') | struct<try_variant_get(parse_json({"a": 1}), $.a):int> | | org.apache.spark.sql.catalyst.expressions.variant.VariantGetExpressionBuilder | variant_get | SELECT variant_get(parse_json('{"a": 1}'), '$.a', 'int') | struct<variant_get(parse_json({"a": 1}), $.a):int> | | org.apache.spark.sql.catalyst.expressions.xml.XPathBoolean | xpath_boolean | SELECT xpath_boolean('<a><b>1</b></a>','a/b') | struct<xpath_boolean(<a><b>1</b></a>, a/b):boolean> | diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala index d53b49f7ab5a..c8d267ff5ecc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala @@ -88,6 +88,41 @@ class VariantEndToEndSuite extends QueryTest with SharedSparkSession { check("[0.0, 1.00, 1.10, 1.23]", "[0,1,1.1,1.23]") } + test("try_parse_json/to_json round-trip") { + def check(input: String, output: String = "INPUT IS OUTPUT"): Unit = { + val df = Seq(input).toDF("v") + val variantDF = df.selectExpr("to_json(try_parse_json(v)) as v").select(Column("v")) + val expected = if (output != "INPUT IS OUTPUT") output else input + checkAnswer(variantDF, Seq(Row(expected))) + } + + check("null") + check("true") + check("false") + check("-1") + check("1.0E10") + check("\"\"") + check("\"" + ("a" * 63) + "\"") + check("\"" + ("b" * 64) + "\"") + // scalastyle:off nonascii + check("\"" + ("你好,世界" * 20) + "\"") + // scalastyle:on nonascii + check("[]") + check("{}") + // scalastyle:off nonascii + check( + "[null, true, false,-1, 1e10, \"\\uD83D\\uDE05\", [ ], { } ]", + "[null,true,false,-1,1.0E10,\"😅\",[],{}]" + ) + // scalastyle:on nonascii + check("[0.0, 1.00, 1.10, 1.23]", "[0,1,1.1,1.23]") + // Places where parse_json should fail and therefore, try_parse_json should return null + check("{1:2}", null) + check("{\"a\":1", null) + 
check("{\"a\":[a,b,c]}", null) + check("\"" + "a" * (16 * 1024 * 1024) + "\"", null) + } + test("to_json with nested variant") { val df = Seq(1).toDF("v") val variantDF1 = df.select( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala index da99ef346630..19e5f9ba63e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala @@ -58,6 +58,15 @@ class VariantSuite extends QueryTest with SharedSparkSession { } } + test("basic try_parse_json alias") { + val df = spark.createDataFrame(Seq(Row("""{ "a" : 1 }"""), Row("""{ a : 1 }""")).asJava, + new StructType().add("json", StringType)) + val actual = df.select(to_json(try_parse_json(col("json")))).collect() + + assert(actual(0)(0) == """{"a":1}""") + assert(actual(1)(0) == null) + } + test("basic parse_json alias") { val df = spark.createDataFrame(Seq(Row("""{ "a" : 1 }""")).asJava, new StructType().add("json", StringType)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org