This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bbcbc6  [SPARK-36379][SQL] Null at root level of a JSON array should 
not fail w/ permissive mode
0bbcbc6 is described below

commit 0bbcbc65080cd67a9997f49906d9d48fdf21db10
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Mon Aug 2 10:01:12 2021 -0700

    [SPARK-36379][SQL] Null at root level of a JSON array should not fail w/ 
permissive mode
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fail properly so JSON parser can proceed and parse the 
input with the permissive mode.
    Previously, we passed `null`s as are, the root `InternalRow`s became 
`null`s, and it causes the query fails even with permissive mode on.
    Now, we fail explicitly if `null` is passed when the input array contains 
`null`.
    
    Note that this is consistent with non-array JSON input:
    
    **Permissive mode:**
    
    ```scala
    spark.read.json(Seq("""{"a": "str"}""", """null""").toDS).collect()
    ```
    ```
    res0: Array[org.apache.spark.sql.Row] = Array([str], [null])
    ```
    
    **Failfast mode**:
    
    ```scala
    spark.read.option("mode", "failfast").json(Seq("""{"a": "str"}""", 
"""null""").toDS).collect()
    ```
    ```
    org.apache.spark.SparkException: Malformed records are detected in record 
parsing. Parse Mode: FAILFAST. To process malformed records as null result, try 
setting the option 'mode' as 'PERMISSIVE'.
        at 
org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
        at 
org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    ```
    
    ### Why are the changes needed?
    
    To make the permissive mode to proceed and parse without throwing an 
exception.
    
    ### Does this PR introduce _any_ user-facing change?
    
    **Permissive mode:**
    
    ```scala
    spark.read.json(Seq("""[{"a": "str"}, null]""").toDS).collect()
    ```
    
    Before:
    
    ```
    java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    ```
    
    After:
    
    ```
    res0: Array[org.apache.spark.sql.Row] = Array([null])
    ```
    
    NOTE that this behaviour is consistent when JSON object is malformed:
    
    ```scala
    spark.read.schema("a int").json(Seq("""[{"a": 123}, {123123}, {"a": 
123}]""").toDS).collect()
    ```
    
    ```
    res0: Array[org.apache.spark.sql.Row] = Array([null])
    ```
    
    Since we're parsing _one_ JSON array, related records all fail together.
    
    **Failfast mode:**
    
    ```scala
    spark.read.option("mode", "failfast").json(Seq("""[{"a": "str"}, 
null]""").toDS).collect()
    ```
    
    Before:
    
    ```
    java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    ```
    
    After:
    
    ```
    org.apache.spark.SparkException: Malformed records are detected in record 
parsing. Parse Mode: FAILFAST. To process malformed records as null result, try 
setting the option 'mode' as 'PERMISSIVE'.
        at 
org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
        at 
org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    ```
    
    ### How was this patch tested?
    
    Manually tested, and unit test was added.
    
    Closes #33608 from HyukjinKwon/SPARK-36379.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/catalyst/json/JacksonParser.scala |  9 ++++++---
 .../spark/sql/execution/datasources/json/JsonSuite.scala   | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 54cf251..dfa746f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -108,7 +108,7 @@ class JacksonParser(
         // List([str_a_2,null], [null,str_b_3])
         //
       case START_ARRAY if allowArrayAsStructs =>
-        val array = convertArray(parser, elementConverter)
+        val array = convertArray(parser, elementConverter, isRoot = true)
         // Here, as we support reading top level JSON arrays and take every 
element
         // in such an array as a row, this case is possible.
         if (array.numElements() == 0) {
@@ -450,10 +450,13 @@ class JacksonParser(
    */
   private def convertArray(
       parser: JsonParser,
-      fieldConverter: ValueConverter): ArrayData = {
+      fieldConverter: ValueConverter,
+      isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += fieldConverter.apply(parser)
+      val v = fieldConverter.apply(parser)
+      if (isRoot && v == null) throw 
QueryExecutionErrors.rootConverterReturnNullError()
+      values += v
     }
 
     new GenericArrayData(values.toArray)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 31df5df..0910ff8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2944,6 +2944,20 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
+    assert(intercept[SparkException] {
+      spark.read.option("mode", "failfast")
+        .schema("a string").json(Seq("""[{"a": "str"}, 
null]""").toDS).collect()
+    }.getMessage.contains("Malformed records are detected"))
+
+    // Permissive modes should proceed parsing malformed records (null).
+    // Here, since an array fails to parse in the middle, we will return one 
row.
+    checkAnswer(
+      spark.read.option("mode", "permissive")
+        .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS),
+      Row(null) :: Nil)
+  }
 }
 
 class JsonV1Suite extends JsonSuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to