This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a2b7050e0fc5 [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled a2b7050e0fc5 is described below commit a2b7050e0fc5db6ac98db57309e4737acd26bf3a Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Thu Apr 11 10:50:11 2024 +0900 [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled ### What changes were proposed in this pull request? This PR ([SPARK-47704](https://issues.apache.org/jira/browse/SPARK-47704)) fixes a bug that was introduced by the earlier partial-results parsing work. To be precise, that earlier change missed this corner case because I could not find a small stable repro for the problem at the time. When `spark.sql.json.enablePartialResults` is enabled (which is the default), if a user tries to read `{"a":[{"key":{"b":0}}]}` with the code: ```scala val df = spark.read .schema("a array<map<string, struct<b boolean>>>") .json(path) ``` an exception is thrown: ``` java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.ArrayBasedMapData cannot be cast to class org.apache.spark.sql.catalyst.util.ArrayData (org.apache.spark.sql.catalyst.util.ArrayBasedMapData and org.apache.spark.sql.catalyst.util.ArrayData are in unnamed module of loader 'app') at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:53) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:53) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:172) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:605) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:884) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ``` The same happens when map and array are reversed: `{"a":{"key":[{"b":0}]}}`: ```scala val df = spark.read .schema("a map<string, array<struct<b boolean>>>") .json(path) ``` In both cases, we should partially parse the record; only the struct with the boolean field cannot be parsed: - `Row(Array(Map("key" -> Row(null))))` in the first case. - `Row(Map("key" -> Array(Row(null))))` in the second case. When converting arrays and maps, we did not handle all of the partial-result exceptions: we only caught `PartialResultException`, which applies to structs alone. Instead, we should catch `PartialValueException`, which covers struct, map, and array. ### Why are the changes needed? Fixes a bug where a user would encounter an exception instead of reading a partially parsed JSON record. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added unit tests that verify the fix. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45833 from sadikovi/SPARK-47704. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/catalyst/json/JacksonParser.scala | 12 +++--- .../sql/execution/datasources/json/JsonSuite.scala | 44 ++++++++++++++++++++++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index a16a23cf0049..d3f33a70323f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -497,9 +497,9 @@ class JacksonParser( try { values += fieldConverter.apply(parser) } catch { - case PartialResultException(row, cause) if enablePartialResults => - badRecordException = badRecordException.orElse(Some(cause)) - values += row + case err: PartialValueException if enablePartialResults => + badRecordException = badRecordException.orElse(Some(err.cause)) + values += err.partialResult case NonFatal(e) if enablePartialResults => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() @@ -534,9 +534,9 @@ class JacksonParser( if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError() values += v } catch { - case PartialResultException(row, cause) if enablePartialResults => - badRecordException = badRecordException.orElse(Some(cause)) - values += row + case err: PartialValueException if enablePartialResults => + badRecordException = badRecordException.orElse(Some(err.cause)) + values += err.partialResult } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 5c96df98dd23..f3c332bab183 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala 
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3820,6 +3820,50 @@ abstract class JsonSuite } } } + + test("SPARK-47704: Handle partial parsing of array<map>") { + withTempPath { path => + Seq("""{"a":[{"key":{"b":0}}]}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + for (enablePartialResults <- Seq(true, false)) { + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") { + val df = spark.read + .schema("a array<map<string, struct<b boolean>>>") + .json(path.getAbsolutePath) + + if (enablePartialResults) { + checkAnswer(df, Seq(Row(Array(Map("key" -> Row(null)))))) + } else { + checkAnswer(df, Seq(Row(null))) + } + } + } + } + } + + test("SPARK-47704: Handle partial parsing of map<string, array>") { + withTempPath { path => + Seq("""{"a":{"key":[{"b":0}]}}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + for (enablePartialResults <- Seq(true, false)) { + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") { + val df = spark.read + .schema("a map<string, array<struct<b boolean>>>") + .json(path.getAbsolutePath) + + if (enablePartialResults) { + checkAnswer(df, Seq(Row(Map("key" -> Seq(Row(null)))))) + } else { + checkAnswer(df, Seq(Row(null))) + } + } + } + } + } } class JsonV1Suite extends JsonSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org