This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 8a77a012cd6d [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled
8a77a012cd6d is described below

commit 8a77a012cd6d1d3057bb7f1340850cf567b8a6ed
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Thu Apr 11 10:50:11 2024 +0900

    [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled
    
    This PR fixes a bug in the partial-results handling of the JSON parser. The earlier partial-results change missed this corner case because I could not find a small, stable repro for the problem at the time.
    
    When `spark.sql.json.enablePartialResults` is enabled (which is the 
default), if a user tries to read `{"a":[{"key":{"b":0}}]}` with the code:
    ```scala
    val df = spark.read
      .schema("a array<map<string, struct<b boolean>>>")
      .json(path)
    ```
    an exception is thrown:
    ```
    java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.ArrayBasedMapData cannot be cast to class org.apache.spark.sql.catalyst.util.ArrayData (org.apache.spark.sql.catalyst.util.ArrayBasedMapData and org.apache.spark.sql.catalyst.util.ArrayData are in unnamed module of loader 'app')
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:53)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:53)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:172)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:605)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:884)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    ```
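    
    For context, a minimal end-to-end repro might look like the following sketch (assuming a local `spark` session; the temp-path handling and column name are illustrative, not part of the patch):
    ```scala
    import java.nio.file.Files
    import spark.implicits._
    
    // Hypothetical scratch location; any writable, non-existent path works.
    val path = Files.createTempDirectory("spark-47704").resolve("data").toString
    
    // One JSON record whose innermost field cannot be parsed as a boolean.
    Seq("""{"a":[{"key":{"b":0}}]}""").toDF("value").write.text(path)
    
    val df = spark.read
      .schema("a array<map<string, struct<b boolean>>>")
      .json(path)
    
    // With spark.sql.json.enablePartialResults=true, this used to throw the
    // ClassCastException above; with the fix it returns the partially parsed
    // row Row(Array(Map("key" -> Row(null)))).
    df.collect().foreach(println)
    ```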
    
    The same happens when the map and array are reversed, e.g. `{"a":{"key":[{"b":0}]}}`:
    ```scala
    val df = spark.read
      .schema("a map<string, array<struct<b boolean>>>")
      .json(path)
    ```
    
    In both cases, we should partially parse the record; only the struct with the boolean field fails to parse (the JSON value `0` is not a boolean):
    - `Row(Array(Map("key" -> Row(null))))` in the first case.
    - `Row(Map("key" -> Array(Row(null))))` in the second case.
    
    The root cause: when converting arrays and maps, we only caught `PartialResultException`, which is thrown for structs. We should instead catch the more general `PartialValueException`, which covers structs, maps, and arrays.
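    
    The shape of the fix is easiest to see in a small sketch (a simplified model with stand-in names, not Spark's actual declarations; only `PartialValueException`, `cause`, and `partialResult` are taken from the diff below):
    ```scala
    // Common parent: carries the partially parsed value plus the underlying cause.
    abstract class PartialValueException(val cause: Throwable) extends Exception(cause) {
      def partialResult: AnyRef
    }
    // One subclass per nested type (struct shown; map and array are analogous).
    final class PartialStructResultException(val partialResult: AnyRef, c: Throwable)
      extends PartialValueException(c)
    
    // Catching the parent handles structs, maps, and arrays uniformly,
    // which is what the patch switches the array/map converters to do:
    def convertValue(parse: () => AnyRef): AnyRef =
      try parse()
      catch {
        case err: PartialValueException =>
          // record err.cause as the bad-record cause, keep the partial value
          err.partialResult
      }
    ```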
    
    Fixes a bug where a user would encounter an exception instead of reading a partially parsed JSON record.
    
    No.
    
    I added unit tests that verify the fix.
    
    No.
    
    Closes #45833 from sadikovi/SPARK-47704.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit a2b7050e0fc5db6ac98db57309e4737acd26bf3a)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/json/JacksonParser.scala    | 12 +++---
 .../sql/execution/datasources/json/JsonSuite.scala | 44 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f14f70532e65..3f6ea9a174c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -497,9 +497,9 @@ class JacksonParser(
       try {
         values += fieldConverter.apply(parser)
       } catch {
-        case PartialResultException(row, cause) if enablePartialResults =>
-          badRecordException = badRecordException.orElse(Some(cause))
-          values += row
+        case err: PartialValueException if enablePartialResults =>
+          badRecordException = badRecordException.orElse(Some(err.cause))
+          values += err.partialResult
         case NonFatal(e) if enablePartialResults =>
           badRecordException = badRecordException.orElse(Some(e))
           parser.skipChildren()
@@ -534,9 +534,9 @@ class JacksonParser(
        if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
         values += v
       } catch {
-        case PartialResultException(row, cause) if enablePartialResults =>
-          badRecordException = badRecordException.orElse(Some(cause))
-          values += row
+        case err: PartialValueException if enablePartialResults =>
+          badRecordException = badRecordException.orElse(Some(err.cause))
+          values += err.partialResult
       }
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 11779286ec25..e8005b204191 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3654,6 +3654,50 @@ abstract class JsonSuite
     assert(JSONOptions.getAlternativeOption("charset").contains("encoding"))
     assert(JSONOptions.getAlternativeOption("dateFormat").isEmpty)
   }
+
+  test("SPARK-47704: Handle partial parsing of array<map>") {
+    withTempPath { path =>
+      Seq("""{"a":[{"key":{"b":0}}]}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      for (enablePartialResults <- Seq(true, false)) {
+        withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") {
+          val df = spark.read
+            .schema("a array<map<string, struct<b boolean>>>")
+            .json(path.getAbsolutePath)
+
+          if (enablePartialResults) {
+            checkAnswer(df, Seq(Row(Array(Map("key" -> Row(null))))))
+          } else {
+            checkAnswer(df, Seq(Row(null)))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-47704: Handle partial parsing of map<string, array>") {
+    withTempPath { path =>
+      Seq("""{"a":{"key":[{"b":0}]}}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      for (enablePartialResults <- Seq(true, false)) {
+        withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") {
+          val df = spark.read
+            .schema("a map<string, array<struct<b boolean>>>")
+            .json(path.getAbsolutePath)
+
+          if (enablePartialResults) {
+            checkAnswer(df, Seq(Row(Map("key" -> Seq(Row(null))))))
+          } else {
+            checkAnswer(df, Seq(Row(null)))
+          }
+        }
+      }
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {

