This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 033ca3e7dd5b [SPARK-47922][SQL] Implement the try_parse_json expression
033ca3e7dd5b is described below

commit 033ca3e7dd5bd9ceadd9219c2bde105bea301e70
Author: Harsh Motwani <harsh.motw...@databricks.com>
AuthorDate: Fri Apr 26 14:35:59 2024 +0800

    [SPARK-47922][SQL] Implement the try_parse_json expression
    
    ### What changes were proposed in this pull request?
    
    This pull request implements the `try_parse_json` expression, which runs 
`parse_json` on string expressions to extract variants. However, if 
`parse_json` throws an exception on a row, the value `null` is returned.
    
    ### Why are the changes needed?
    
    Sometimes, columns containing JSON strings may contain some invalid inputs 
that should be ignored instead of causing the whole execution to fail because 
of them.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it allows users to run the `try_parse_json` expression.
    
    ### How was this patch tested?
    
    Unit tests to check if `try_parse_json` works just like `parse_json` on 
valid inputs, returns `null` on invalid inputs, and fails on incorrect input 
data types.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46141 from harshmotw-db/try_parse_json.
    
    Authored-by: Harsh Motwani <harsh.motw...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/functions.scala     |  12 ++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   4 ++
 .../function_is_variant_null.explain               |   2 +-
 .../explain-results/function_parse_json.explain    |   2 +-
 .../function_schema_of_variant.explain             |   2 +-
 .../function_schema_of_variant_agg.explain         |   2 +-
 ...son.explain => function_try_parse_json.explain} |   2 +-
 .../function_try_variant_get.explain               |   2 +-
 .../explain-results/function_variant_get.explain   |   2 +-
 .../queries/function_try_parse_json.json           |  25 ++++++++
 .../queries/function_try_parse_json.proto.bin      | Bin 0 -> 183 bytes
 .../source/reference/pyspark.sql/functions.rst     |   1 +
 python/pyspark/sql/connect/functions/builtin.py    |   7 +++
 python/pyspark/sql/functions/builtin.py            |  34 +++++++++-
 python/pyspark/sql/tests/test_functions.py         |   8 +++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   3 +-
 .../variant/VariantExpressionEvalUtils.scala       |  16 +++--
 .../expressions/variant/variantExpressions.scala   |  69 ++++++++++++++++-----
 .../variant/VariantExpressionEvalUtilsSuite.scala  |   9 ++-
 .../scala/org/apache/spark/sql/functions.scala     |  11 ++++
 .../sql-functions/sql-expression-schema.md         |   3 +-
 .../apache/spark/sql/VariantEndToEndSuite.scala    |  35 +++++++++++
 .../scala/org/apache/spark/sql/VariantSuite.scala  |   9 +++
 23 files changed, 229 insertions(+), 31 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 5a7880c87431..6471d15b63ab 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7000,6 +7000,18 @@ object functions {
     fnWithOptions("from_json", options, e, schema)
   }
 
+  /**
+   * Parses a JSON string and constructs a Variant value. Returns null if the 
input string is not
+   * a valid JSON value.
+   *
+   * @param json
+   *   a string column that contains JSON data.
+   *
+   * @group variant_funcs
+   * @since 4.0.0
+   */
+  def try_parse_json(json: Column): Column = Column.fn("try_parse_json", json)
+
   /**
    * Parses a JSON string and constructs a Variant value.
    *
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 1005561b24ac..ebf4ee0e9073 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2489,6 +2489,10 @@ class PlanGenerationTestSuite
       Collections.singletonMap("allowNumericLeadingZeros", "true"))
   }
 
+  functionTest("try_parse_json") {
+    fn.try_parse_json(fn.col("g"))
+  }
+
   functionTest("to_json") {
     fn.to_json(fn.col("d"), Map(("timestampFormat", "dd/MM/yyyy")))
   }
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
index 1273807d15c3..53ba167fca65 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
@@ -1,2 +1,2 @@
-Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
BooleanType, isVariantNull, staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true), VariantType, 
false, false, true) AS is_variant_null(parse_json(g))#0]
+Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
BooleanType, isVariantNull, staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), 
VariantType, false, false, true) AS is_variant_null(parse_json(g))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
index 061aafbadb07..b844d19c85ac 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
@@ -1,2 +1,2 @@
-Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true) AS parse_json(g)#0]
+Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true) 
AS parse_json(g)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
index 5ba5d96425db..62f8e7f3e6fe 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
@@ -1,2 +1,2 @@
-Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariant$, StringType, 
schemaOfVariant, staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true), VariantType, true, 
false, true) AS schema_of_variant(parse_json(g))#0]
+Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariant$, StringType, 
schemaOfVariant, staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), 
VariantType, true, false, true) AS schema_of_variant(parse_json(g))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
index b3a39efe8c50..d4f9e2c66d99 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
@@ -1,2 +1,2 @@
-Aggregate [schema_of_variant_agg(staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true), 0, 0) AS 
schema_of_variant_agg(parse_json(g))#0]
+Aggregate [schema_of_variant_agg(staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), 
0, 0) AS schema_of_variant_agg(parse_json(g))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
similarity index 69%
copy from 
connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
copy to 
connector/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
index 061aafbadb07..1772b5d37623 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
@@ -1,2 +1,2 @@
-Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true) AS parse_json(g)#0]
+Project [staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, false, StringType, BooleanType, true, false, true) 
AS try_parse_json(g)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
index e590a9b2f45c..748465142bde 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
@@ -1,2 +1,2 @@
-Project [try_variant_get(staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true), $, IntegerType, 
false, Some(America/Los_Angeles)) AS try_variant_get(parse_json(g), $)#0]
+Project [try_variant_get(staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), 
$, IntegerType, false, Some(America/Los_Angeles)) AS 
try_variant_get(parse_json(g), $)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
index 13fb6abc7bad..3503ee178ca7 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
@@ -1,2 +1,2 @@
-Project [variant_get(staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, StringType, true, false, true), $, IntegerType, 
true, Some(America/Los_Angeles)) AS variant_get(parse_json(g), $)#0]
+Project [variant_get(staticinvoke(class 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils$, 
VariantType, parseJson, g#0, true, StringType, BooleanType, true, false, true), 
$, IntegerType, true, Some(America/Los_Angeles)) AS variant_get(parse_json(g), 
$)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.json
new file mode 100644
index 000000000000..91177eb4a585
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": 
"struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "try_parse_json",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.proto.bin
new file mode 100644
index 000000000000..cc1f159cfd78
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_parse_json.proto.bin
 differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst 
b/python/docs/source/reference/pyspark.sql/functions.rst
index d293e34f0a79..40af3d52e653 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -547,6 +547,7 @@ VARIANT Functions
     schema_of_variant_agg
     try_variant_get
     variant_get
+    try_parse_json
 
 
 XML Functions
diff --git a/python/pyspark/sql/connect/functions/builtin.py 
b/python/pyspark/sql/connect/functions/builtin.py
index cbbad941bf29..ab3bcfcba4d3 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -2040,6 +2040,13 @@ def str_to_map(
 str_to_map.__doc__ = pysparkfuncs.str_to_map.__doc__
 
 
+def try_parse_json(col: "ColumnOrName") -> Column:
+    return _invoke_function("try_parse_json", _to_col(col))
+
+
+try_parse_json.__doc__ = pysparkfuncs.try_parse_json.__doc__
+
+
 def parse_json(col: "ColumnOrName") -> Column:
     return _invoke_function("parse_json", _to_col(col))
 
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index b54b377aaebc..64f39e352642 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15594,12 +15594,44 @@ def from_json(
     return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def try_parse_json(
+    col: "ColumnOrName",
+) -> Column:
+    """
+    Parses a column containing a JSON string into a :class:`VariantType`. 
Returns None if a string
+    contains an invalid JSON value.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column or column name JSON formatted strings
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a new column of VariantType.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''}, {'json': 
'''{a : 1}'''} ])
+    >>> df.select(to_json(try_parse_json(df.json))).collect()
+    [Row(to_json(try_parse_json(json))='{"a":1}'), 
Row(to_json(try_parse_json(json))=None)]
+    """
+    from pyspark.sql.classic.column import _to_java_column
+
+    return _invoke_function("try_parse_json", _to_java_column(col))
+
+
 @_try_remote_functions
 def parse_json(
     col: "ColumnOrName",
 ) -> Column:
     """
-    Parses a column containing a JSON string into a :class:`VariantType`.
+    Parses a column containing a JSON string into a :class:`VariantType`. 
Throws exception if a
+    string represents an invalid JSON value.
 
     .. versionadded:: 4.0.0
 
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index b2fd1335904f..1fd23906edae 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -1354,6 +1354,14 @@ class FunctionsTestsMixin:
             message_parameters={"arg_name": "json", "arg_type": "int"},
         )
 
+    def test_try_parse_json(self):
+        df = self.spark.createDataFrame([{"json": """{ "a" : 1 }"""}, {"json": 
"""{ a : 1 }"""}])
+        actual = df.select(
+            F.to_json(F.try_parse_json(df.json)).alias("var"),
+        ).collect()
+        self.assertEqual("""{"a":1}""", actual[0]["var"])
+        self.assertEqual(None, actual[1]["var"])
+
     def test_schema_of_csv(self):
         with self.assertRaises(PySparkTypeError) as pe:
             F.schema_of_csv(1)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e4e663d15167..d9e2154487c7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -821,7 +821,8 @@ object FunctionRegistry {
     expression[JsonObjectKeys]("json_object_keys"),
 
     // Variant
-    expression[ParseJson]("parse_json"),
+    expressionBuilder("parse_json", ParseJsonExpressionBuilder),
+    expressionBuilder("try_parse_json", TryParseJsonExpressionBuilder),
     expression[IsVariantNull]("is_variant_null"),
     expressionBuilder("variant_get", VariantGetExpressionBuilder),
     expressionBuilder("try_variant_get", TryVariantGetExpressionBuilder),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
index ea90bb88a906..a8d629acbe2b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
@@ -31,16 +31,24 @@ import org.apache.spark.unsafe.types.{UTF8String, 
VariantVal}
  */
 object VariantExpressionEvalUtils {
 
-  def parseJson(input: UTF8String): VariantVal = {
+  def parseJson(input: UTF8String, failOnError: Boolean = true): VariantVal = {
+    def parseJsonFailure(exception: Throwable): VariantVal = {
+      if (failOnError) {
+        throw exception
+      } else {
+        null
+      }
+    }
     try {
       val v = VariantBuilder.parseJson(input.toString)
       new VariantVal(v.getValue, v.getMetadata)
     } catch {
       case _: VariantSizeLimitException =>
-        throw 
QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json")
+        parseJsonFailure(QueryExecutionErrors
+          .variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json"))
       case NonFatal(e) =>
-        throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
-          input.toString, BadRecordException(() => input, cause = e))
+        
parseJsonFailure(QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+          input.toString, BadRecordException(() => input, cause = e)))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
index 6c4a8f90e3b5..43c561e10b6d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
@@ -42,34 +42,30 @@ import org.apache.spark.types.variant._
 import org.apache.spark.types.variant.VariantUtil.Type
 import org.apache.spark.unsafe.types._
 
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "_FUNC_(jsonStr) - Parse a JSON string as an Variant value. Throw an 
exception when the string is not valid JSON value.",
-  examples = """
-    Examples:
-      > SELECT _FUNC_('{"a":1,"b":0.8}');
-       {"a":1,"b":0.8}
-  """,
-  since = "4.0.0",
-  group = "variant_funcs"
-)
-// scalastyle:on line.size.limit
-case class ParseJson(child: Expression)
+
+/**
+ * The implementation for `parse_json` and `try_parse_json` expressions. Parse 
a JSON string as a
+ * Variant value.
+ * @param child The string value to parse as a variant.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *                    the string does not represent a valid JSON value.
+ */
+case class ParseJson(child: Expression, failOnError: Boolean = true)
   extends UnaryExpression with ExpectsInputTypes with RuntimeReplaceable {
 
   override lazy val replacement: Expression = StaticInvoke(
     VariantExpressionEvalUtils.getClass,
     VariantType,
     "parseJson",
-    Seq(child),
-    inputTypes,
+    Seq(child, Literal(failOnError, BooleanType)),
+    inputTypes :+ BooleanType,
     returnNullable = false)
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
 
   override def dataType: DataType = VariantType
 
-  override def prettyName: String = "parse_json"
+  override def prettyName: String = if (failOnError) "parse_json" else 
"try_parse_json"
 
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
     copy(child = newChild)
@@ -425,6 +421,47 @@ case object VariantGet {
   }
 }
 
+abstract class ParseJsonExpressionBuilderBase(failOnError: Boolean) extends 
ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): 
Expression = {
+    val numArgs = expressions.length
+    if (numArgs == 1) {
+      ParseJson(expressions.head, failOnError)
+    } else {
+      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(1), numArgs)
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(jsonStr) - Parse a JSON string as a Variant value. Throw an 
exception when the string is not valid JSON value.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('{"a":1,"b":0.8}');
+       {"a":1,"b":0.8}
+  """,
+  since = "4.0.0",
+  group = "variant_funcs"
+)
+// scalastyle:on line.size.limit
+object ParseJsonExpressionBuilder extends ParseJsonExpressionBuilderBase(true)
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(jsonStr) - Parse a JSON string as a Variant value. Return 
NULL when the string is not valid JSON value.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('{"a":1,"b":0.8}');
+       {"a":1,"b":0.8}
+      > SELECT _FUNC_('{"a":1,');
+       NULL
+  """,
+  since = "4.0.0",
+  group = "variant_funcs"
+)
+// scalastyle:on line.size.limit
+object TryParseJsonExpressionBuilder extends 
ParseJsonExpressionBuilderBase(false)
+
 abstract class VariantGetExpressionBuilderBase(failOnError: Boolean) extends 
ExpressionBuilder {
   override def build(funcName: String, expressions: Seq[Expression]): 
Expression = {
     val numArgs = expressions.length
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
index 91e991dffd4a..8fc72caa4786 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
@@ -25,9 +25,13 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite {
 
   test("parseJson type coercion") {
     def check(json: String, expectedValue: Array[Byte], expectedMetadata: 
Array[Byte]): Unit = {
+      // parse_json
       val actual = 
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json))
+      // try_parse_json
+      val tryActual = 
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
+        failOnError = false)
       val expected = new VariantVal(expectedValue, expectedMetadata)
-      assert(actual === expected)
+      assert(actual === expected && tryActual === expected)
     }
 
     // Dictionary size is `0` for value 0. An empty dictionary contains one 
offset `0` for the
@@ -104,6 +108,8 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite 
{
 
   test("parseJson negative") {
     def checkException(json: String, errorClass: String, parameters: 
Map[String, String]): Unit = {
+      val try_parse_json_output = 
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
+        failOnError = false)
       checkError(
         exception = intercept[SparkThrowable] {
           VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json))
@@ -111,6 +117,7 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite 
{
         errorClass = errorClass,
         parameters = parameters
       )
+      assert(try_parse_json_output === null)
     }
     for (json <- Seq("", "[", "+1", "1a", """{"a": 1, "b": 2, "a": "3"}""")) {
       checkException(json, "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index dc68382ecdc0..e6aed3f7a43d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -6632,6 +6632,17 @@ object functions {
     fnWithOptions("from_json", options, e, schema)
   }
 
+  /**
+   * Parses a JSON string and constructs a Variant value. Returns null if the 
input string is not
+   * a valid JSON value.
+   *
+   * @param json a string column that contains JSON data.
+   *
+   * @group variant_funcs
+   * @since 4.0.0
+   */
+  def try_parse_json(json: Column): Column = Column.fn("try_parse_json", json)
+
   /**
    * Parses a JSON string and constructs a Variant value.
    *
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index ae9e68c4cbb1..dd223939a184 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -437,9 +437,10 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | var_samp 
| SELECT var_samp(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct<var_samp(col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | variance 
| SELECT variance(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct<variance(col):double> |
 | org.apache.spark.sql.catalyst.expressions.variant.IsVariantNull | 
is_variant_null | SELECT is_variant_null(parse_json('null')) | 
struct<is_variant_null(parse_json(null)):boolean> |
-| org.apache.spark.sql.catalyst.expressions.variant.ParseJson | parse_json | 
SELECT parse_json('{"a":1,"b":0.8}') | 
struct<parse_json({"a":1,"b":0.8}):variant> |
+| org.apache.spark.sql.catalyst.expressions.variant.ParseJsonExpressionBuilder 
| parse_json | SELECT parse_json('{"a":1,"b":0.8}') | 
struct<parse_json({"a":1,"b":0.8}):variant> |
 | org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariant | 
schema_of_variant | SELECT schema_of_variant(parse_json('null')) | 
struct<schema_of_variant(parse_json(null)):string> |
 | org.apache.spark.sql.catalyst.expressions.variant.SchemaOfVariantAgg | 
schema_of_variant_agg | SELECT schema_of_variant_agg(parse_json(j)) FROM VALUES 
('1'), ('2'), ('3') AS tab(j) | 
struct<schema_of_variant_agg(parse_json(j)):string> |
+| 
org.apache.spark.sql.catalyst.expressions.variant.TryParseJsonExpressionBuilder 
| try_parse_json | SELECT try_parse_json('{"a":1,"b":0.8}') | 
struct<try_parse_json({"a":1,"b":0.8}):variant> |
 | 
org.apache.spark.sql.catalyst.expressions.variant.TryVariantGetExpressionBuilder
 | try_variant_get | SELECT try_variant_get(parse_json('{"a": 1}'), '$.a', 
'int') | struct<try_variant_get(parse_json({"a": 1}), $.a):int> |
 | 
org.apache.spark.sql.catalyst.expressions.variant.VariantGetExpressionBuilder | 
variant_get | SELECT variant_get(parse_json('{"a": 1}'), '$.a', 'int') | 
struct<variant_get(parse_json({"a": 1}), $.a):int> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathBoolean | xpath_boolean | 
SELECT xpath_boolean('<a><b>1</b></a>','a/b') | 
struct<xpath_boolean(<a><b>1</b></a>, a/b):boolean> |
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
index d53b49f7ab5a..c8d267ff5ecc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
@@ -88,6 +88,41 @@ class VariantEndToEndSuite extends QueryTest with 
SharedSparkSession {
     check("[0.0, 1.00, 1.10, 1.23]", "[0,1,1.1,1.23]")
   }
 
+  test("try_parse_json/to_json round-trip") {
+    def check(input: String, output: String = "INPUT IS OUTPUT"): Unit = {
+      val df = Seq(input).toDF("v")
+      val variantDF = df.selectExpr("to_json(try_parse_json(v)) as 
v").select(Column("v"))
+      val expected = if (output != "INPUT IS OUTPUT") output else input
+      checkAnswer(variantDF, Seq(Row(expected)))
+    }
+
+    check("null")
+    check("true")
+    check("false")
+    check("-1")
+    check("1.0E10")
+    check("\"\"")
+    check("\"" + ("a" * 63) + "\"")
+    check("\"" + ("b" * 64) + "\"")
+    // scalastyle:off nonascii
+    check("\"" + ("你好,世界" * 20) + "\"")
+    // scalastyle:on nonascii
+    check("[]")
+    check("{}")
+    // scalastyle:off nonascii
+    check(
+      "[null, true,   false,-1, 1e10, \"\\uD83D\\uDE05\", [ ], { } ]",
+      "[null,true,false,-1,1.0E10,\"😅\",[],{}]"
+    )
+    // scalastyle:on nonascii
+    check("[0.0, 1.00, 1.10, 1.23]", "[0,1,1.1,1.23]")
+    // Places where parse_json should fail and therefore, try_parse_json 
should return null
+    check("{1:2}", null)
+    check("{\"a\":1", null)
+    check("{\"a\":[a,b,c]}", null)
+    check("\"" + "a" * (16 * 1024 * 1024) + "\"", null)
+  }
+
   test("to_json with nested variant") {
     val df = Seq(1).toDF("v")
     val variantDF1 = df.select(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index da99ef346630..19e5f9ba63e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -58,6 +58,15 @@ class VariantSuite extends QueryTest with SharedSparkSession 
{
     }
   }
 
+  test("basic try_parse_json alias") {
+    val df = spark.createDataFrame(Seq(Row("""{ "a" : 1 }"""), Row("""{ a : 1 
}""")).asJava,
+      new StructType().add("json", StringType))
+    val actual = df.select(to_json(try_parse_json(col("json")))).collect()
+
+    assert(actual(0)(0) == """{"a":1}""")
+    assert(actual(1)(0) == null)
+  }
+
   test("basic parse_json alias") {
     val df = spark.createDataFrame(Seq(Row("""{ "a" : 1 }""")).asJava,
       new StructType().add("json", StringType))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to