This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 0dea7db3660 [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled 0dea7db3660 is described below commit 0dea7db3660b9db7bbe075c31712e7119bfd1af7 Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Mon Sep 4 14:37:32 2023 -0700 [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled ### What changes were proposed in this pull request? Backport of https://github.com/apache/spark/pull/42667 to branch-3.5. The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is enabled: - Fixes the issue when using nested arrays `ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow` - Improves parsing of the nested struct fields, e.g. `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL |NULL|NULL|` and now is parsed as `|AAA|[{NULL, }]|id1|XXX|`. - Improves performance of nested JSON parsing. The initial implementation would throw too many exceptions when multiple nested fields failed to parse. When the config is disabled, it is not a problem because the entire record is marked as NULL. The internal benchmarks show the performance improvement from slowdown of over 160% to an improvement of 7-8% compared to the master branch when the flag is enabled. I will create a follow-up ticket to add a benchmark for this regression. ### Why are the changes needed? Fixes some corner cases in JSON parsing and improves performance when `spark.sql.json.enablePartialResults` is enabled. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added tests to verify nested structs, maps, and arrays can be parsed without affecting the subsequent fields in the JSON. 
I also updated the existing tests when `spark.sql.json.enablePartialResults` is enabled because we parse more data now. I added a benchmark to check performance. Before the change (master, https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b): ``` [info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws [info] Intel(R) Xeon(R) Platinum 8375C CPU 2.90GHz [info] Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] parse invalid JSON 9537 9820 452 0.0 953651.6 1.0X ``` After the change (this PR): ``` OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws Intel(R) Xeon(R) Platinum 8375C CPU 2.90GHz Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ parse invalid JSON 3100 3106 6 0.0 309967.6 1.0X ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42790 from sadikovi/SPARK-44940-3.5. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/catalyst/json/JacksonParser.scala | 41 ++++- .../sql/catalyst/util/BadRecordException.scala | 64 +++++++- .../spark/sql/errors/QueryExecutionErrors.scala | 12 +- sql/core/benchmarks/JsonBenchmark-results.txt | 152 +++++++++--------- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 20 ++- .../execution/datasources/json/JsonBenchmark.scala | 28 ++++ .../sql/execution/datasources/json/JsonSuite.scala | 170 ++++++++++++++++++++- 7 files changed, 393 insertions(+), 94 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 388edb9024c..f14f70532e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -420,17 +420,17 @@ class JacksonParser( case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString => dataType match { case FloatType | DoubleType | TimestampType | DateType => - throw QueryExecutionErrors.emptyJsonFieldValueError(dataType) + throw EmptyJsonFieldValueException(dataType) case _ => null } case VALUE_STRING if parser.getTextLength < 1 => - throw QueryExecutionErrors.emptyJsonFieldValueError(dataType) + throw EmptyJsonFieldValueException(dataType) case token => // We cannot parse this token based on the given data type. So, we throw a // RuntimeException and this exception will be caught by `parse` method. 
- throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType) + throw CannotParseJSONFieldException(parser.getCurrentName, parser.getText, token, dataType) } /** @@ -459,6 +459,11 @@ class JacksonParser( bitmask(index) = false } catch { case e: SparkUpgradeException => throw e + case err: PartialValueException if enablePartialResults => + badRecordException = badRecordException.orElse(Some(err.cause)) + row.update(index, err.partialResult) + skipRow = structFilters.skipRow(row, index) + bitmask(index) = false case NonFatal(e) if isRoot || enablePartialResults => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() @@ -508,7 +513,7 @@ class JacksonParser( if (badRecordException.isEmpty) { mapData } else { - throw PartialResultException(InternalRow(mapData), badRecordException.get) + throw PartialMapDataResultException(mapData, badRecordException.get) } } @@ -543,10 +548,21 @@ class JacksonParser( throw PartialResultArrayException(arrayData.toArray[InternalRow](schema), badRecordException.get) } else { - throw PartialResultException(InternalRow(arrayData), badRecordException.get) + throw PartialArrayDataResultException(arrayData, badRecordException.get) } } + /** + * Converts the non-stacktrace exceptions to user-friendly QueryExecutionErrors. + */ + private def convertCauseForPartialResult(err: Throwable): Throwable = err match { + case CannotParseJSONFieldException(fieldName, fieldValue, jsonType, dataType) => + QueryExecutionErrors.cannotParseJSONFieldError(fieldName, fieldValue, jsonType, dataType) + case EmptyJsonFieldValueException(dataType) => + QueryExecutionErrors.emptyJsonFieldValueError(dataType) + case _ => err + } + /** * Parse the JSON input to the set of [[InternalRow]]s. 
* @@ -589,12 +605,25 @@ class JacksonParser( throw BadRecordException( record = () => recordLiteral(record), partialResults = () => Array(row), - cause) + convertCauseForPartialResult(cause)) case PartialResultArrayException(rows, cause) => throw BadRecordException( record = () => recordLiteral(record), partialResults = () => rows, cause) + // These exceptions should never be thrown outside of JacksonParser. + // They are used for the control flow in the parser. We add them here for completeness + // since they also indicate a bad record. + case PartialArrayDataResultException(arrayData, cause) => + throw BadRecordException( + record = () => recordLiteral(record), + partialResults = () => Array(InternalRow(arrayData)), + convertCauseForPartialResult(cause)) + case PartialMapDataResultException(mapData, cause) => + throw BadRecordException( + record = () => recordLiteral(record), + partialResults = () => Array(InternalRow(mapData)), + convertCauseForPartialResult(cause)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala index e1223a71f74..65a56c1064e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala @@ -17,18 +17,44 @@ package org.apache.spark.sql.catalyst.util +import com.fasterxml.jackson.core.JsonToken + import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.DataType import org.apache.spark.unsafe.types.UTF8String +abstract class PartialValueException(val cause: Throwable) extends Exception(cause) { + def partialResult: Serializable + override def getStackTrace(): Array[StackTraceElement] = cause.getStackTrace() + override def fillInStackTrace(): Throwable = this +} + /** - * Exception thrown when the underlying parser returns a partial result of 
parsing. + * Exception thrown when the underlying parser returns a partial result of parsing an object/row. + * @param partialResult the partial result of parsing a bad record. + * @param cause the actual exception about why the parser cannot return full result. + */ case class PartialResultException( - partialResult: InternalRow, - cause: Throwable) - extends Exception(cause) + override val partialResult: InternalRow, + override val cause: Throwable) extends PartialValueException(cause) + +/** + * Exception thrown when the underlying parser returns a partial array result. + * @param partialResult the partial array result. + * @param cause the actual exception about why the parser cannot return full result. + */ +case class PartialArrayDataResultException( + override val partialResult: ArrayData, + override val cause: Throwable) extends PartialValueException(cause) + +/** + * Exception thrown when the underlying parser returns a partial map result. + * @param partialResult the partial map result. + * @param cause the actual exception about why the parser cannot return full result. + */ +case class PartialMapDataResultException( + override val partialResult: MapData, + override val cause: Throwable) extends PartialValueException(cause) /** * Exception thrown when the underlying parser returns partial result list of parsing. @@ -56,3 +82,33 @@ case class BadRecordException( * Exception thrown when the underlying parser parses a JSON array as a struct. */ case class JsonArraysAsStructsException() extends RuntimeException() + +/** + * Exception thrown when the underlying parser cannot parse a String as a datatype. + */ +case class StringAsDataTypeException( + fieldName: String, + fieldValue: String, + dataType: DataType) extends RuntimeException() + +/** + * No-stacktrace equivalent of `QueryExecutionErrors.cannotParseJSONFieldError`. + * Used for code control flow in the parser without overhead of creating a full exception. 
+ */ +case class CannotParseJSONFieldException( + fieldName: String, + fieldValue: String, + jsonType: JsonToken, + dataType: DataType) extends RuntimeException() { + override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0) + override def fillInStackTrace(): Throwable = this +} + +/** + * No-stacktrace equivalent of `QueryExecutionErrors.emptyJsonFieldValueError`. + * Used for code control flow in the parser without overhead of creating a full exception. + */ +case class EmptyJsonFieldValueException(dataType: DataType) extends RuntimeException() { + override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0) + override def fillInStackTrace(): Throwable = this +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 45986e42348..f4968cd0057 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1329,11 +1329,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType) : SparkRuntimeException = { + cannotParseJSONFieldError(parser.getCurrentName, parser.getText, jsonType, dataType) + } + + def cannotParseJSONFieldError( + fieldName: String, + fieldValue: String, + jsonType: JsonToken, + dataType: DataType): SparkRuntimeException = { new SparkRuntimeException( errorClass = "CANNOT_PARSE_JSON_FIELD", messageParameters = Map( - "fieldName" -> toSQLValue(parser.getCurrentName, StringType), - "fieldValue" -> parser.getText, + "fieldName" -> toSQLValue(fieldName, StringType), + "fieldValue" -> fieldValue, "jsonType" -> jsonType.toString(), "dataType" -> toSQLType(dataType))) } diff --git a/sql/core/benchmarks/JsonBenchmark-results.txt 
b/sql/core/benchmarks/JsonBenchmark-results.txt index 55f66f7bb24..e53c7801141 100644 --- a/sql/core/benchmarks/JsonBenchmark-results.txt +++ b/sql/core/benchmarks/JsonBenchmark-results.txt @@ -3,121 +3,125 @@ Benchmark for performance of JSON parsing ================================================================================================ Preparing data for benchmarking ... -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz JSON schema inferring: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 3720 3843 121 1.3 743.9 1.0X -UTF-8 is set 5412 5455 45 0.9 1082.4 0.7X +No encoding 2084 2134 46 2.4 416.8 1.0X +UTF-8 is set 3077 3093 14 1.6 615.3 0.7X Preparing data for benchmarking ... -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz count a short column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 3234 3254 33 1.5 646.7 1.0X -UTF-8 is set 4847 4868 21 1.0 969.5 0.7X +No encoding 2854 2863 8 1.8 570.8 1.0X +UTF-8 is set 4066 4066 1 1.2 813.1 0.7X Preparing data for benchmarking ... 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz count a wide column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 5702 5794 101 0.2 5702.1 1.0X -UTF-8 is set 9526 9607 73 0.1 9526.1 0.6X +No encoding 3348 3368 26 0.3 3347.8 1.0X +UTF-8 is set 5215 5239 22 0.2 5214.7 0.6X Preparing data for benchmarking ... -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz select wide row: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 18318 18448 199 0.0 366367.7 1.0X -UTF-8 is set 19791 19887 99 0.0 395817.1 0.9X +No encoding 11046 11102 54 0.0 220928.4 1.0X +UTF-8 is set 12135 12181 54 0.0 242697.4 0.9X Preparing data for benchmarking ... 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz Select a subset of 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns 2531 2570 51 0.4 2531.3 1.0X -Select 1 column 1867 1882 16 0.5 1867.0 1.4X +Select 10 columns 2486 2488 2 0.4 2486.5 1.0X +Select 1 column 1505 1506 2 0.7 1504.6 1.7X Preparing data for benchmarking ... -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz creation of JSON parser per line: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Short column without encoding 868 875 7 1.2 868.4 1.0X -Short column with UTF-8 1151 1163 11 0.9 1150.9 0.8X -Wide column without encoding 12063 12299 205 0.1 12063.0 0.1X -Wide column with UTF-8 16095 16136 51 0.1 16095.3 0.1X +Short column without encoding 888 889 3 1.1 887.6 1.0X +Short column with UTF-8 1134 1136 2 0.9 1134.3 0.8X +Wide column without encoding 8012 8056 51 0.1 8012.4 0.1X +Wide column with UTF-8 9830 9844 22 0.1 9829.7 0.1X Preparing data for benchmarking ... 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz JSON functions: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 165 170 4 6.1 164.7 1.0X -from_json 2339 2386 77 0.4 2338.9 0.1X -json_tuple 2667 2730 55 0.4 2667.3 0.1X -get_json_object 2627 2659 32 0.4 2627.1 0.1X +Text read 85 87 2 11.7 85.4 1.0X +from_json 1706 1711 4 0.6 1706.4 0.1X +json_tuple 1528 1534 7 0.7 1528.2 0.1X +get_json_object 1275 1286 17 0.8 1275.0 0.1X Preparing data for benchmarking ... -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz Dataset of json strings: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 700 715 20 7.1 140.1 1.0X -schema inferring 3144 3166 20 1.6 628.7 0.2X -parsing 3261 3271 9 1.5 652.1 0.2X +Text read 369 370 1 13.6 73.8 1.0X +schema inferring 1880 1883 4 2.7 376.0 0.2X +parsing 3731 3737 8 1.3 746.1 0.1X Preparing data for benchmarking ... 
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz Json files in the per-line mode: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 1096 1105 12 4.6 219.1 1.0X -Schema inferring 3818 3830 16 1.3 763.6 0.3X -Parsing without charset 4107 4137 32 1.2 821.4 0.3X -Parsing with UTF-8 5717 5763 41 0.9 1143.3 0.2X +Text read 553 579 32 9.0 110.6 1.0X +Schema inferring 2195 2196 2 2.3 439.0 0.3X +Parsing without charset 4272 4274 3 1.2 854.3 0.1X +Parsing with UTF-8 5459 5464 6 0.9 1091.7 0.1X -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 199 202 3 5.0 198.9 1.0X -to_json(timestamp) 1458 1487 26 0.7 1458.0 0.1X -write timestamps to files 1232 1262 26 0.8 1232.5 0.2X -Create a dataset of dates 231 237 5 4.3 230.8 0.9X -to_json(date) 956 966 10 1.0 956.5 0.2X -write dates to files 785 793 10 1.3 785.4 0.3X +Create a dataset of timestamps 102 112 13 9.8 101.9 1.0X +to_json(timestamp) 840 841 1 1.2 839.6 0.1X +write timestamps to files 692 696 4 1.4 692.0 0.1X +Create a dataset of dates 120 121 1 8.4 119.7 0.9X +to_json(date) 589 591 2 1.7 589.3 0.2X +write dates to files 442 445 2 2.3 442.3 0.2X -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 
2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------------------------------- -read timestamp text from files 294 300 6 3.4 293.8 1.0X -read timestamps from files 3254 3283 49 0.3 3254.0 0.1X -infer timestamps from files 8390 8528 165 0.1 8389.8 0.0X -read date text from files 269 276 7 3.7 269.3 1.1X -read date from files 1178 1192 13 0.8 1177.8 0.2X -timestamp strings 406 418 15 2.5 406.2 0.7X -parse timestamps from Dataset[String] 3700 3713 16 0.3 3699.5 0.1X -infer timestamps from Dataset[String] 8604 8647 65 0.1 8604.0 0.0X -date strings 464 479 14 2.2 463.7 0.6X -parse dates from Dataset[String] 1528 1538 10 0.7 1527.7 0.2X -from_json(timestamp) 5402 5429 26 0.2 5401.8 0.1X -from_json(date) 2948 2966 17 0.3 2947.5 0.1X -infer error timestamps from Dataset[String] with default format 2358 2434 70 0.4 2357.6 0.1X -infer error timestamps from Dataset[String] with user-provided format 2363 2390 36 0.4 2362.9 0.1X -infer error timestamps from Dataset[String] with legacy format 2248 2287 35 0.4 2248.3 0.1X +read timestamp text from files 143 145 4 7.0 142.6 1.0X +read timestamps from files 2449 2469 17 0.4 2448.6 0.1X +infer timestamps from files 5579 5596 15 0.2 5578.8 0.0X +read date text from files 132 139 7 7.6 131.7 1.1X +read date from files 1017 1020 2 1.0 1017.5 0.1X +timestamp strings 227 230 3 4.4 227.2 0.6X +parse timestamps from Dataset[String] 2827 2830 3 0.4 2826.5 0.1X +infer timestamps from Dataset[String] 6001 6008 6 0.2 6001.2 0.0X +date strings 259 261 2 3.9 259.0 0.6X +parse dates from Dataset[String] 1382 1387 4 0.7 1382.3 0.1X +from_json(timestamp) 3557 3561 7 0.3 3556.8 0.0X +from_json(date) 2146 2148 2 0.5 2146.4 0.1X +infer error 
timestamps from Dataset[String] with default format 1989 1993 4 0.5 1989.3 0.1X +infer error timestamps from Dataset[String] with user-provided format 1922 1925 3 0.5 1922.1 0.1X +infer error timestamps from Dataset[String] with legacy format 1919 1923 4 0.5 1919.1 0.1X -OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -w/o filters 22544 22661 109 0.0 225436.4 1.0X -pushdown disabled 21045 21213 188 0.0 210452.6 1.1X -w/ filters 893 904 10 0.1 8931.8 25.2X - +w/o filters 14387 14399 12 0.0 143871.9 1.0X +pushdown disabled 13891 13899 7 0.0 138912.3 1.0X +w/ filters 782 784 2 0.1 7820.5 18.4X +OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws +Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz +Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +parse invalid JSON 3100 3106 6 0.0 309967.6 1.0X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 187fab75f63..a76e102fe91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -1021,16 +1021,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4", StringType))) val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0") 
checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null))) + val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0") - checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null))) + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + checkAnswer( + df2.select(from_json($"c0", new StructType().add("data", st))), + Row(Row(Row(123456, null))) + ) + } + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null))) + } + val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0") withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { - val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0") checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null)))) } - withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { - val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0") checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null)) } @@ -1119,14 +1126,13 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { ) ) - // Value "a" cannot be parsed as an integer, - // the error cascades to "c2", thus making its value null. + // Value "a" cannot be parsed as an integer, c2 value is null. 
val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0") withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { checkAnswer( df.select(from_json($"c0", ArrayType(st))), - Row(Array(Row(null))) + Row(Array(Row(Seq(Row(null))))) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala index c522378a65d..5b86543648f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala @@ -542,6 +542,33 @@ object JsonBenchmark extends SqlBasedBenchmark { } } + private def partialResultBenchmark(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark("Partial JSON results", rowsNum, output = output) + val colsNum = 1000 + + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)) + val schema = StructType(fields) + + def data: Dataset[String] = { + spark.range(0, rowsNum, 1, 1).mapPartitions { iter => + iter.map { i => + (0 until colsNum).map { j => + // Only the last column has an integer value. + if (j < colsNum - 1) s""""col${j}":"foo_${j}"""" else s""""col${j}":${j}""" + }.mkString("{", ", ", "}") + } + }.select($"value").as[String] + } + + benchmark.addCase("parse invalid JSON", numIters) { _ => + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + spark.read.schema(schema).json(data).noop() + } + } + + benchmark.run() + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val numIters = 3 runBenchmark("Benchmark for performance of JSON parsing") { @@ -558,6 +585,7 @@ object JsonBenchmark extends SqlBasedBenchmark { // Benchmark pushdown filters that refer to top-level columns. // TODO (SPARK-32325): Add benchmarks for filters with nested column attributes. 
filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters) + partialResultBenchmark(rowsNum = 10000, numIters) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 95468a1f1d7..11779286ec2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3437,7 +3437,7 @@ abstract class JsonSuite if (enablePartialResults) { checkAnswer( df, - Seq(Row(null, Row(1)), Row(Row(2, null), Row(2))) + Seq(Row(Row(1, null), Row(1)), Row(Row(2, null), Row(2))) ) } else { checkAnswer( @@ -3450,6 +3450,174 @@ abstract class JsonSuite } } + test("SPARK-44940: fully parse the record except f1 if partial results are enabled") { + withTempPath { path => + Seq( + """{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}""", + """{"a1": "BBB", "a2": [{"f1": 12, "f2": ""}], "a3": "id2", "a4": "YYY"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + val df = spark.read.json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", Seq(Row(null, "")), "id1", "XXX"), + Row("BBB", Seq(Row(12, "")), "id2", "YYY") + ) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + val df = spark.read.json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", null, null, null), + Row("BBB", Seq(Row(12, "")), "id2", "YYY") + ) + ) + } + } + } + + test("SPARK-44940: fully parse primitive map if partial results are enabled") { + withTempPath { path => + Seq( + """{"a1": "AAA", "a2": {"f1": "", "f2": ""}, "a3": "id1"}""", + """{"a1": "BBB", "a2": {"f1": 12, "f2": ""}, "a3": "id2"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + val schema = "a1 string, a2 map<string, 
int>, a3 string" + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + val df = spark.read.schema(schema).json(path.getAbsolutePath) + // Although the keys match the string type and some values match the integer type, because + // some of the values do not match the type, we mark the entire map as null. + checkAnswer( + df, + Seq( + Row("AAA", null, "id1"), + Row("BBB", null, "id2") + ) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + val df = spark.read.schema(schema).json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", null, null), + Row("BBB", null, null) + ) + ) + } + } + } + + test("SPARK-44940: fully parse map of structs if partial results are enabled") { + withTempPath { path => + Seq( + """{"a1": "AAA", "a2": {"key": {"f1": "", "f2": ""}}, "a3": "id1"}""", + """{"a1": "BBB", "a2": {"key": {"f1": 12, "f2": ""}}, "a3": "id2"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + val schema = "a1 string, a2 map<string, struct<f1: int, f2: string>>, a3 string" + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + val df = spark.read.schema(schema).json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", Map("key" -> Row(null, "")), "id1"), + Row("BBB", Map("key" -> Row(12, "")), "id2") + ) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + val df = spark.read.schema(schema).json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", null, null), + Row("BBB", Map("key" -> Row(12, "")), "id2") + ) + ) + } + } + } + + test("SPARK-44940: fully parse primitive arrays if partial results are enabled") { + withTempPath { path => + Seq( + """{"a1": "AAA", "a2": {"f1": [""]}, "a3": "id1", "a4": "XXX"}""", + """{"a1": "BBB", "a2": {"f1": [12]}, "a3": "id2", "a4": "YYY"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + val df = 
spark.read.json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", Row(null), "id1", "XXX"), + Row("BBB", Row(Seq(12)), "id2", "YYY") + ) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + val df = spark.read.json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", null, null, null), + Row("BBB", Row(Seq(12)), "id2", "YYY") + ) + ) + } + } + } + + test("SPARK-44940: fully parse array of arrays if partial results are enabled") { + withTempPath { path => + Seq( + """{"a1": "AAA", "a2": [[12, ""], [""]], "a3": "id1", "a4": "XXX"}""", + """{"a1": "BBB", "a2": [[12, 34], [""]], "a3": "id2", "a4": "YYY"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + // We cannot parse `array<array<int>>` type because one of the inner arrays contains a + // mismatched type. + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + val df = spark.read.json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", null, "id1", "XXX"), + Row("BBB", null, "id2", "YYY") + ) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + val df = spark.read.json(path.getAbsolutePath) + checkAnswer( + df, + Seq( + Row("AAA", null, "id1", "XXX"), + Row("BBB", null, "id2", "YYY") + ) + ) + } + } + } + test("SPARK-40667: validate JSON Options") { assert(JSONOptions.getAllOptions.size == 28) // Please add validation on any new Json options here --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org