This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 0dea7db3660 [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
0dea7db3660 is described below

commit 0dea7db3660b9db7bbe075c31712e7119bfd1af7
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Mon Sep 4 14:37:32 2023 -0700

    [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
    
    ### What changes were proposed in this pull request?
    
    Backport of https://github.com/apache/spark/pull/42667 to branch-3.5.
    
    The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is enabled:
    - Fixes a `ClassCastException` thrown for nested arrays: `org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow`.
    - Improves parsing of nested struct fields, e.g. `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL    |NULL|NULL|` and is now parsed as `|AAA|[{NULL, }]|id1|XXX|` (see the sketch after this list).
    - Improves performance of nested JSON parsing. The initial implementation threw too many exceptions when multiple nested fields failed to parse. This is not a problem when the config is disabled, because the entire record is marked as NULL.
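    
    For illustration, here is a minimal spark-shell sketch of the behavior change described above (the schema, column names, and input here are hypothetical, chosen to mirror the example in the second bullet):
    ```
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.StructType
    import spark.implicits._
    
    spark.conf.set("spark.sql.json.enablePartialResults", "true")
    
    // "f1" is an empty string and cannot be parsed as an int. With partial
    // results enabled, only that field becomes NULL; a3 and a4 survive.
    val schema = StructType.fromDDL(
      "a1 string, a2 array<struct<f1: int, f2: string>>, a3 string, a4 string")
    val df = Seq("""{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}""").toDF("c0")
    df.select(from_json($"c0", schema)).show(truncate = false)
    // expected: {AAA, [{NULL, }], id1, XXX} -- only f1 is NULL
    ```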
    
    With the flag enabled, the internal benchmarks go from a slowdown of over 160% relative to the master branch to an improvement of 7-8%. I will create a follow-up ticket to add a benchmark for this regression.
    
    ### Why are the changes needed?
    
    Fixes some corner cases in JSON parsing and improves performance when `spark.sql.json.enablePartialResults` is enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added tests to verify that nested structs, maps, and arrays can be parsed without affecting the subsequent fields in the JSON record. I also updated the existing tests that run with `spark.sql.json.enablePartialResults` enabled, because we now parse more data.
    
    I added a benchmark to check performance.
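    
    The new benchmark case can be run with the standard Spark benchmark workflow, e.g. (command shown for illustration, assuming the usual sbt setup):
    ```
    SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain org.apache.spark.sql.execution.datasources.json.JsonBenchmark"
    ```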
    
    Before the change (master, https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b):
    ```
    [info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
    [info] Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
    [info] Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] parse invalid JSON                                 9537           9820         452          0.0      953651.6       1.0X
    ```
    
    After the change (this PR):
    ```
    OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
    Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
    Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------
    parse invalid JSON                                 3100           3106           6          0.0      309967.6       1.0X
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42790 from sadikovi/SPARK-44940-3.5.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/json/JacksonParser.scala    |  41 ++++-
 .../sql/catalyst/util/BadRecordException.scala     |  64 +++++++-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  12 +-
 sql/core/benchmarks/JsonBenchmark-results.txt      | 152 +++++++++---------
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  20 ++-
 .../execution/datasources/json/JsonBenchmark.scala |  28 ++++
 .../sql/execution/datasources/json/JsonSuite.scala | 170 ++++++++++++++++++++-
 7 files changed, 393 insertions(+), 94 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 388edb9024c..f14f70532e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -420,17 +420,17 @@ class JacksonParser(
     case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
       dataType match {
         case FloatType | DoubleType | TimestampType | DateType =>
-          throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+          throw EmptyJsonFieldValueException(dataType)
         case _ => null
       }
 
     case VALUE_STRING if parser.getTextLength < 1 =>
-      throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+      throw EmptyJsonFieldValueException(dataType)
 
     case token =>
      // We cannot parse this token based on the given data type. So, we throw a
      // RuntimeException and this exception will be caught by `parse` method.
-      throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
+      throw CannotParseJSONFieldException(parser.getCurrentName, parser.getText, token, dataType)
   }
 
   /**
@@ -459,6 +459,11 @@ class JacksonParser(
             bitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
+            case err: PartialValueException if enablePartialResults =>
+              badRecordException = badRecordException.orElse(Some(err.cause))
+              row.update(index, err.partialResult)
+              skipRow = structFilters.skipRow(row, index)
+              bitmask(index) = false
             case NonFatal(e) if isRoot || enablePartialResults =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
@@ -508,7 +513,7 @@ class JacksonParser(
     if (badRecordException.isEmpty) {
       mapData
     } else {
-      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+      throw PartialMapDataResultException(mapData, badRecordException.get)
     }
   }
 
@@ -543,10 +548,21 @@ class JacksonParser(
       throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
         badRecordException.get)
     } else {
-      throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+      throw PartialArrayDataResultException(arrayData, badRecordException.get)
     }
   }
 
+  /**
+   * Converts the non-stacktrace exceptions to user-friendly QueryExecutionErrors.
+   */
+  private def convertCauseForPartialResult(err: Throwable): Throwable = err match {
+    case CannotParseJSONFieldException(fieldName, fieldValue, jsonType, dataType) =>
+      QueryExecutionErrors.cannotParseJSONFieldError(fieldName, fieldValue, jsonType, dataType)
+    case EmptyJsonFieldValueException(dataType) =>
+      QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+    case _ => err
+  }
+
   /**
    * Parse the JSON input to the set of [[InternalRow]]s.
    *
@@ -589,12 +605,25 @@ class JacksonParser(
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(row),
-          cause)
+          convertCauseForPartialResult(cause))
       case PartialResultArrayException(rows, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => rows,
           cause)
+      // These exceptions should never be thrown outside of JacksonParser.
+      // They are used for the control flow in the parser. We add them here for completeness
+      // since they also indicate a bad record.
+      case PartialArrayDataResultException(arrayData, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResults = () => Array(InternalRow(arrayData)),
+          convertCauseForPartialResult(cause))
+      case PartialMapDataResultException(mapData, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResults = () => Array(InternalRow(mapData)),
+          convertCauseForPartialResult(cause))
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index e1223a71f74..65a56c1064e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -17,18 +17,44 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import com.fasterxml.jackson.core.JsonToken
+
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String
 
+abstract class PartialValueException(val cause: Throwable) extends Exception(cause) {
+  def partialResult: Serializable
+  override def getStackTrace(): Array[StackTraceElement] = cause.getStackTrace()
+  override def fillInStackTrace(): Throwable = this
+}
+
 /**
- * Exception thrown when the underlying parser returns a partial result of parsing.
+ * Exception thrown when the underlying parser returns a partial result of parsing an object/row.
  * @param partialResult the partial result of parsing a bad record.
  * @param cause the actual exception about why the parser cannot return full result.
  */
 case class PartialResultException(
-     partialResult: InternalRow,
-     cause: Throwable)
-  extends Exception(cause)
+    override val partialResult: InternalRow,
+    override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial array result.
+ * @param partialResult the partial array result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialArrayDataResultException(
+    override val partialResult: ArrayData,
+    override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial map result.
+ * @param partialResult the partial map result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialMapDataResultException(
+    override val partialResult: MapData,
+    override val cause: Throwable) extends PartialValueException(cause)
 
 /**
 * Exception thrown when the underlying parser returns partial result list of parsing.
@@ -56,3 +82,33 @@ case class BadRecordException(
  * Exception thrown when the underlying parser parses a JSON array as a struct.
  */
 case class JsonArraysAsStructsException() extends RuntimeException()
+
+/**
+ * Exception thrown when the underlying parser cannot parse a String as a datatype.
+ */
+case class StringAsDataTypeException(
+    fieldName: String,
+    fieldValue: String,
+    dataType: DataType) extends RuntimeException()
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.cannotParseJSONFieldError`.
+ * Used for code control flow in the parser without overhead of creating a full exception.
+ */
+case class CannotParseJSONFieldException(
+    fieldName: String,
+    fieldValue: String,
+    jsonType: JsonToken,
+    dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.emptyJsonFieldValueError`.
+ * Used for code control flow in the parser without overhead of creating a full exception.
+ */
+case class EmptyJsonFieldValueException(dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 45986e42348..f4968cd0057 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1329,11 +1329,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
 
   def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType)
   : SparkRuntimeException = {
+    cannotParseJSONFieldError(parser.getCurrentName, parser.getText, jsonType, dataType)
+  }
+
+  def cannotParseJSONFieldError(
+      fieldName: String,
+      fieldValue: String,
+      jsonType: JsonToken,
+      dataType: DataType): SparkRuntimeException = {
     new SparkRuntimeException(
       errorClass = "CANNOT_PARSE_JSON_FIELD",
       messageParameters = Map(
-        "fieldName" -> toSQLValue(parser.getCurrentName, StringType),
-        "fieldValue" -> parser.getText,
+        "fieldName" -> toSQLValue(fieldName, StringType),
+        "fieldValue" -> fieldValue,
         "jsonType" -> jsonType.toString(),
         "dataType" -> toSQLType(dataType)))
   }
diff --git a/sql/core/benchmarks/JsonBenchmark-results.txt b/sql/core/benchmarks/JsonBenchmark-results.txt
index 55f66f7bb24..e53c7801141 100644
--- a/sql/core/benchmarks/JsonBenchmark-results.txt
+++ b/sql/core/benchmarks/JsonBenchmark-results.txt
@@ -3,121 +3,125 @@ Benchmark for performance of JSON parsing
 ================================================================================================
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 JSON schema inferring:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                        3720           3843         121          1.3         743.9       1.0X
-UTF-8 is set                                       5412           5455          45          0.9        1082.4       0.7X
+No encoding                                        2084           2134          46          2.4         416.8       1.0X
+UTF-8 is set                                       3077           3093          14          1.6         615.3       0.7X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 count a short column:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                        3234           3254          33          1.5         646.7       1.0X
-UTF-8 is set                                       4847           4868          21          1.0         969.5       0.7X
+No encoding                                        2854           2863           8          1.8         570.8       1.0X
+UTF-8 is set                                       4066           4066           1          1.2         813.1       0.7X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 count a wide column:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                        5702           5794         101          0.2        5702.1       1.0X
-UTF-8 is set                                       9526           9607          73          0.1        9526.1       0.6X
+No encoding                                        3348           3368          26          0.3        3347.8       1.0X
+UTF-8 is set                                       5215           5239          22          0.2        5214.7       0.6X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 select wide row:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                       18318          18448         199          0.0      366367.7       1.0X
-UTF-8 is set                                      19791          19887          99          0.0      395817.1       0.9X
+No encoding                                       11046          11102          54          0.0      220928.4       1.0X
+UTF-8 is set                                      12135          12181          54          0.0      242697.4       0.9X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Select a subset of 10 columns:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Select 10 columns                                  2531           2570          51          0.4        2531.3       1.0X
-Select 1 column                                    1867           1882          16          0.5        1867.0       1.4X
+Select 10 columns                                  2486           2488           2          0.4        2486.5       1.0X
+Select 1 column                                    1505           1506           2          0.7        1504.6       1.7X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 creation of JSON parser per line:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Short column without encoding                       868            875           7          1.2         868.4       1.0X
-Short column with UTF-8                            1151           1163          11          0.9        1150.9       0.8X
-Wide column without encoding                      12063          12299         205          0.1       12063.0       0.1X
-Wide column with UTF-8                            16095          16136          51          0.1       16095.3       0.1X
+Short column without encoding                       888            889           3          1.1         887.6       1.0X
+Short column with UTF-8                            1134           1136           2          0.9        1134.3       0.8X
+Wide column without encoding                       8012           8056          51          0.1        8012.4       0.1X
+Wide column with UTF-8                             9830           9844          22          0.1        9829.7       0.1X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 JSON functions:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Text read                                           165            170           4          6.1         164.7       1.0X
-from_json                                          2339           2386          77          0.4        2338.9       0.1X
-json_tuple                                         2667           2730          55          0.4        2667.3       0.1X
-get_json_object                                    2627           2659          32          0.4        2627.1       0.1X
+Text read                                            85             87           2         11.7          85.4       1.0X
+from_json                                          1706           1711           4          0.6        1706.4       0.1X
+json_tuple                                         1528           1534           7          0.7        1528.2       0.1X
+get_json_object                                    1275           1286          17          0.8        1275.0       0.1X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Dataset of json strings:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Text read                                           700            715          20          7.1         140.1       1.0X
-schema inferring                                   3144           3166          20          1.6         628.7       0.2X
-parsing                                            3261           3271           9          1.5         652.1       0.2X
+Text read                                           369            370           1         13.6          73.8       1.0X
+schema inferring                                   1880           1883           4          2.7         376.0       0.2X
+parsing                                            3731           3737           8          1.3         746.1       0.1X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Json files in the per-line mode:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Text read                                          1096           1105          12          4.6         219.1       1.0X
-Schema inferring                                   3818           3830          16          1.3         763.6       0.3X
-Parsing without charset                            4107           4137          32          1.2         821.4       0.3X
-Parsing with UTF-8                                 5717           5763          41          0.9        1143.3       0.2X
+Text read                                           553            579          32          9.0         110.6       1.0X
+Schema inferring                                   2195           2196           2          2.3         439.0       0.3X
+Parsing without charset                            4272           4274           3          1.2         854.3       0.1X
+Parsing with UTF-8                                 5459           5464           6          0.9        1091.7       0.1X
 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Write dates and timestamps:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Create a dataset of timestamps                      199            202           3          5.0         198.9       1.0X
-to_json(timestamp)                                 1458           1487          26          0.7        1458.0       0.1X
-write timestamps to files                          1232           1262          26          0.8        1232.5       0.2X
-Create a dataset of dates                           231            237           5          4.3         230.8       0.9X
-to_json(date)                                       956            966          10          1.0         956.5       0.2X
-write dates to files                                785            793          10          1.3         785.4       0.3X
+Create a dataset of timestamps                      102            112          13          9.8         101.9       1.0X
+to_json(timestamp)                                  840            841           1          1.2         839.6       0.1X
+write timestamps to files                           692            696           4          1.4         692.0       0.1X
+Create a dataset of dates                           120            121           1          8.4         119.7       0.9X
+to_json(date)                                       589            591           2          1.7         589.3       0.2X
+write dates to files                                442            445           2          2.3         442.3       0.2X
 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Read dates and timestamps:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------------------------------
-read timestamp text from files                                                   294            300           6          3.4         293.8       1.0X
-read timestamps from files                                                      3254           3283          49          0.3        3254.0       0.1X
-infer timestamps from files                                                     8390           8528         165          0.1        8389.8       0.0X
-read date text from files                                                        269            276           7          3.7         269.3       1.1X
-read date from files                                                            1178           1192          13          0.8        1177.8       0.2X
-timestamp strings                                                                406            418          15          2.5         406.2       0.7X
-parse timestamps from Dataset[String]                                           3700           3713          16          0.3        3699.5       0.1X
-infer timestamps from Dataset[String]                                           8604           8647          65          0.1        8604.0       0.0X
-date strings                                                                     464            479          14          2.2         463.7       0.6X
-parse dates from Dataset[String]                                                1528           1538          10          0.7        1527.7       0.2X
-from_json(timestamp)                                                            5402           5429          26          0.2        5401.8       0.1X
-from_json(date)                                                                 2948           2966          17          0.3        2947.5       0.1X
-infer error timestamps from Dataset[String] with default format                 2358           2434          70          0.4        2357.6       0.1X
-infer error timestamps from Dataset[String] with user-provided format           2363           2390          36          0.4        2362.9       0.1X
-infer error timestamps from Dataset[String] with legacy format                  2248           2287          35          0.4        2248.3       0.1X
+read timestamp text from files                                                   143            145           4          7.0         142.6       1.0X
+read timestamps from files                                                      2449           2469          17          0.4        2448.6       0.1X
+infer timestamps from files                                                     5579           5596          15          0.2        5578.8       0.0X
+read date text from files                                                        132            139           7          7.6         131.7       1.1X
+read date from files                                                            1017           1020           2          1.0        1017.5       0.1X
+timestamp strings                                                                227            230           3          4.4         227.2       0.6X
+parse timestamps from Dataset[String]                                           2827           2830           3          0.4        2826.5       0.1X
+infer timestamps from Dataset[String]                                           6001           6008           6          0.2        6001.2       0.0X
+date strings                                                                     259            261           2          3.9         259.0       0.6X
+parse dates from Dataset[String]                                                1382           1387           4          0.7        1382.3       0.1X
+from_json(timestamp)                                                            3557           3561           7          0.3        3556.8       0.0X
+from_json(date)                                                                 2146           2148           2          0.5        2146.4       0.1X
+infer error timestamps from Dataset[String] with default format                 1989           1993           4          0.5        1989.3       0.1X
+infer error timestamps from Dataset[String] with user-provided format           1922           1925           3          0.5        1922.1       0.1X
+infer error timestamps from Dataset[String] with legacy format                  1919           1923           4          0.5        1919.1       0.1X
 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-w/o filters                                       22544          22661         109          0.0      225436.4       1.0X
-pushdown disabled                                 21045          21213         188          0.0      210452.6       1.1X
-w/ filters                                          893            904          10          0.1        8931.8      25.2X
-
+w/o filters                                       14387          14399          12          0.0      143871.9       1.0X
+pushdown disabled                                 13891          13899           7          0.0      138912.3       1.0X
+w/ filters                                          782            784           2          0.1        7820.5      18.4X
 
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
+Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+parse invalid JSON                                 3100           3106           6          0.0      309967.6       1.0X
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 187fab75f63..a76e102fe91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -1021,16 +1021,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4", 
StringType)))
     val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
     checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
+
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
-    checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+      checkAnswer(
+        df2.select(from_json($"c0", new StructType().add("data", st))),
+        Row(Row(Row(123456, null)))
+      )
+    }
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+      checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+    }
 
+    val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
     withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
-      val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
       checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
     }
-
     withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
-      val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
       checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
     }
 
@@ -1119,14 +1126,13 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
         )
       )
 
-    // Value "a" cannot be parsed as an integer,
-    // the error cascades to "c2", thus making its value null.
+    // Value "a" cannot be parsed as an integer, c2 value is null.
     val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
 
     withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
       checkAnswer(
         df.select(from_json($"c0", ArrayType(st))),
-        Row(Array(Row(null)))
+        Row(Array(Row(Seq(Row(null)))))
       )
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index c522378a65d..5b86543648f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -542,6 +542,33 @@ object JsonBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  private def partialResultBenchmark(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("Partial JSON results", rowsNum, output = output)
+    val colsNum = 1000
+
+    val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+    val schema = StructType(fields)
+
+    def data: Dataset[String] = {
+      spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+        iter.map { i =>
+          (0 until colsNum).map { j =>
+            // Only the last column has an integer value.
+            if (j < colsNum - 1) s""""col${i}":"foo_${j}"""" else 
s""""col${i}":${j}"""
+          }.mkString("{", ", ", "}")
+        }
+      }.select($"value").as[String]
+    }
+
+    benchmark.addCase("parse invalid JSON", numIters) { _ =>
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        spark.read.schema(schema).json(data).noop()
+      }
+    }
+
+    benchmark.run()
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     val numIters = 3
     runBenchmark("Benchmark for performance of JSON parsing") {
@@ -558,6 +585,7 @@ object JsonBenchmark extends SqlBasedBenchmark {
       // Benchmark pushdown filters that refer to top-level columns.
       // TODO (SPARK-32325): Add benchmarks for filters with nested column attributes.
       filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
+      partialResultBenchmark(rowsNum = 10000, numIters)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 95468a1f1d7..11779286ec2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3437,7 +3437,7 @@ abstract class JsonSuite
           if (enablePartialResults) {
             checkAnswer(
               df,
-              Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+              Seq(Row(Row(1, null), Row(1)), Row(Row(2, null), Row(2)))
             )
           } else {
             checkAnswer(
@@ -3450,6 +3450,174 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-44940: fully parse the record except f1 if partial results are 
enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": 
"XXX"}""",
+        """{"a1": "BBB", "a2": [{"f1": 12, "f2": ""}], "a3": "id2", "a4": 
"YYY"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", Seq(Row(null, "")), "id1", "XXX"),
+            Row("BBB", Seq(Row(12, "")), "id2", "YYY")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null, null),
+            Row("BBB", Seq(Row(12, "")), "id2", "YYY")
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse primitive map if partial results are 
enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": {"f1": "", "f2": ""}, "a3": "id1"}""",
+        """{"a1": "BBB", "a2": {"f1": 12, "f2": ""}, "a3": "id2"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val schema = "a1 string, a2 map<string, int>, a3 string"
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        // Although the keys match the string type and some values match the integer type, because
+        // some of the values do not match the type, we mark the entire map as null.
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, "id1"),
+            Row("BBB", null, "id2")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null),
+            Row("BBB", null, null)
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse map of structs if partial results are 
enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": {"key": {"f1": "", "f2": ""}}, "a3": "id1"}""",
+        """{"a1": "BBB", "a2": {"key": {"f1": 12, "f2": ""}}, "a3": 
"id2"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val schema = "a1 string, a2 map<string, struct<f1: int, f2: string>>, a3 
string"
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", Map("key" -> Row(null, "")), "id1"),
+            Row("BBB", Map("key" -> Row(12, "")), "id2")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null),
+            Row("BBB", Map("key" -> Row(12, "")), "id2")
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse primitive arrays if partial results are 
enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": {"f1": [""]}, "a3": "id1", "a4": "XXX"}""",
+        """{"a1": "BBB", "a2": {"f1": [12]}, "a3": "id2", "a4": 
"YYY"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", Row(null), "id1", "XXX"),
+            Row("BBB", Row(Seq(12)), "id2", "YYY")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null, null),
+            Row("BBB", Row(Seq(12)), "id2", "YYY")
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse array of arrays if partial results are 
enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": [[12, ""], [""]], "a3": "id1", "a4": "XXX"}""",
+        """{"a1": "BBB", "a2": [[12, 34], [""]], "a3": "id2", "a4": 
"YYY"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      // We cannot parse `array<array<int>>` type because one of the inner arrays contains a
+      // mismatched type.
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, "id1", "XXX"),
+            Row("BBB", null, "id2", "YYY")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, "id1", "XXX"),
+            Row("BBB", null, "id2", "YYY")
+          )
+        )
+      }
+    }
+  }
+
   test("SPARK-40667: validate JSON Options") {
     assert(JSONOptions.getAllOptions.size == 28)
     // Please add validation on any new Json options here


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

