This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new daf481d9505 [SPARK-44940][SQL][3.4] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
daf481d9505 is described below

commit daf481d950564efc01fb99628dded08ad1f51ff2
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Mon Sep 4 14:39:06 2023 -0700

[SPARK-44940][SQL][3.4] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled

### What changes were proposed in this pull request?

Backport of https://github.com/apache/spark/pull/42667 to branch-3.4.

The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is enabled:
- Fixes a `ClassCastException` when using nested arrays: `org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow`.
- Improves parsing of nested struct fields. For example, `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL|NULL|NULL|` and is now parsed as `|AAA|[{NULL, }]|id1|XXX|`.
- Improves performance of nested JSON parsing. The initial implementation would throw too many exceptions when multiple nested fields failed to parse. When the config is disabled, this is not a problem because the entire record is marked as NULL. Internal benchmarks show that, with the flag enabled, parsing goes from a slowdown of over 160% to an improvement of 7-8% relative to the master branch. I will create a follow-up ticket to add a benchmark for this regression.

### Why are the changes needed?

Fixes some corner cases in JSON parsing and improves performance when `spark.sql.json.enablePartialResults` is enabled.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added tests to verify that nested structs, maps, and arrays can be parsed without affecting the subsequent fields in the JSON. I also updated the existing tests when `spark.sql.json.enablePartialResults` is enabled because we parse more data now.

I added a benchmark to check performance.

Before the change (master, https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b):
```
[info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
[info] Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
[info] Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] parse invalid JSON                                 9537           9820         452          0.0      953651.6       1.0X
```

After the change (this PR):
```
OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
parse invalid JSON                                 3100           3106           6          0.0      309967.6       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42792 from sadikovi/SPARK-44940-3.4.
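To make the behavior concrete, here is a minimal, self-contained sketch (not part of the patch) that exercises `spark.sql.json.enablePartialResults` through `from_json`, using the sample record from the description above. The object, app, and output-column names are illustrative, and the exact null shape with the flag disabled may vary by reader:

```scala
// Sketch: observing spark.sql.json.enablePartialResults with from_json.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

object PartialResultsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("partial-results").getOrCreate()
    import spark.implicits._

    // "f1" is declared as INT, but the record carries an empty string,
    // so only that one nested field should fail to parse.
    val schema = "a1 STRING, a2 ARRAY<STRUCT<f1: INT, f2: STRING>>, a3 STRING, a4 STRING"
    val df = Seq("""{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}""")
      .toDF("json")

    // With partial results enabled, a1/a3/a4 survive and only f1 becomes NULL:
    // |AAA|[{NULL, }]|id1|XXX|
    spark.conf.set("spark.sql.json.enablePartialResults", "true")
    df.select(from_json(col("json"), schema, Map.empty[String, String]).as("r")).select("r.*").show()

    // With it disabled, the parse failure nulls out (at least) everything
    // after the bad field: |AAA|NULL|NULL|NULL|
    spark.conf.set("spark.sql.json.enablePartialResults", "false")
    df.select(from_json(col("json"), schema, Map.empty[String, String]).as("r")).select("r.*").show()

    spark.stop()
  }
}
```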
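The performance gain comes largely from replacing `QueryExecutionErrors` throws on the hot path with lightweight control-flow exceptions that override `fillInStackTrace` (see `CannotParseJSONFieldException` and `EmptyJsonFieldValueException` in the diff below). Here is a standalone sketch of that pattern with a rough micro-timing harness of my own; the class and label names are illustrative, not the patch's:

```scala
// Sketch: the "no-stacktrace" exception pattern used for parser control flow.
// Returning `this` from fillInStackTrace() skips the stack walk that normally
// dominates the cost of constructing an exception.
final case class FieldParseFailure(fieldName: String) extends RuntimeException {
  override def fillInStackTrace(): Throwable = this
  override def getStackTrace(): Array[StackTraceElement] = Array.empty[StackTraceElement]
}

object NoStackTraceDemo {
  private def time(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  }

  def main(args: Array[String]): Unit = {
    val n = 1000000
    // Regular exceptions capture the full stack on construction.
    time("regular exceptions") {
      var i = 0
      while (i < n) {
        try throw new RuntimeException("bad field") catch { case _: RuntimeException => }
        i += 1
      }
    }
    // Stackless exceptions make throw/catch cheap enough to use per failed field.
    time("stackless exceptions") {
      var i = 0
      while (i < n) {
        try throw FieldParseFailure("bad field") catch { case _: FieldParseFailure => }
        i += 1
      }
    }
  }
}
```

In the patch itself, these control-flow exceptions are converted back into user-facing errors via `convertCauseForPartialResult` before being attached to `BadRecordException`.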
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/json/JacksonParser.scala    |  41 ++++-
 .../sql/catalyst/util/BadRecordException.scala     |  69 ++++++++-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  12 +-
 sql/core/benchmarks/JsonBenchmark-results.txt      | 153 ++++++++++---------
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  20 ++-
 .../execution/datasources/json/JsonBenchmark.scala |  28 ++++
 .../sql/execution/datasources/json/JsonSuite.scala | 170 ++++++++++++++++++++-
 7 files changed, 400 insertions(+), 93 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d9bff3dc7ec..20b281332d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -420,17 +420,17 @@ class JacksonParser(
     case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
       dataType match {
         case FloatType | DoubleType | TimestampType | DateType =>
-          throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+          throw EmptyJsonFieldValueException(dataType)
         case _ => null
       }

     case VALUE_STRING if parser.getTextLength < 1 =>
-      throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+      throw EmptyJsonFieldValueException(dataType)

     case token =>
       // We cannot parse this token based on the given data type. So, we throw a
       // RuntimeException and this exception will be caught by `parse` method.
-      throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
+      throw CannotParseJSONFieldException(parser.getCurrentName, parser.getText, token, dataType)
   }

 /**
@@ -458,6 +458,11 @@ class JacksonParser(
             schema.existenceDefaultsBitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
+            case err: PartialValueException if enablePartialResults =>
+              badRecordException = badRecordException.orElse(Some(err.cause))
+              row.update(index, err.partialResult)
+              skipRow = structFilters.skipRow(row, index)
+              schema.existenceDefaultsBitmask(index) = false
             case NonFatal(e) if isRoot || enablePartialResults =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
@@ -507,7 +512,7 @@ class JacksonParser(
     if (badRecordException.isEmpty) {
       mapData
     } else {
-      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+      throw PartialMapDataResultException(mapData, badRecordException.get)
     }
   }
@@ -542,10 +547,21 @@ class JacksonParser(
         throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
           badRecordException.get)
       } else {
-        throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+        throw PartialArrayDataResultException(arrayData, badRecordException.get)
       }
     }

+  /**
+   * Converts the non-stacktrace exceptions to user-friendly QueryExecutionErrors.
+   */
+  private def convertCauseForPartialResult(err: Throwable): Throwable = err match {
+    case CannotParseJSONFieldException(fieldName, fieldValue, jsonType, dataType) =>
+      QueryExecutionErrors.cannotParseJSONFieldError(fieldName, fieldValue, jsonType, dataType)
+    case EmptyJsonFieldValueException(dataType) =>
+      QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+    case _ => err
+  }
+
   /**
    * Parse the JSON input to the set of [[InternalRow]]s.
    *
@@ -588,12 +604,25 @@ class JacksonParser(
           throw BadRecordException(
             record = () => recordLiteral(record),
             partialResults = () => Array(row),
-            cause)
+            convertCauseForPartialResult(cause))
         case PartialResultArrayException(rows, cause) =>
           throw BadRecordException(
             record = () => recordLiteral(record),
             partialResults = () => rows,
             cause)
+        // These exceptions should never be thrown outside of JacksonParser.
+        // They are used for the control flow in the parser. We add them here for completeness
+        // since they also indicate a bad record.
+        case PartialArrayDataResultException(arrayData, cause) =>
+          throw BadRecordException(
+            record = () => recordLiteral(record),
+            partialResults = () => Array(InternalRow(arrayData)),
+            convertCauseForPartialResult(cause))
+        case PartialMapDataResultException(mapData, cause) =>
+          throw BadRecordException(
+            record = () => recordLiteral(record),
+            partialResults = () => Array(InternalRow(mapData)),
+            convertCauseForPartialResult(cause))
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 005f32dd869..65a56c1064e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -17,18 +17,44 @@
 package org.apache.spark.sql.catalyst.util

+import com.fasterxml.jackson.core.JsonToken
+
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String

+abstract class PartialValueException(val cause: Throwable) extends Exception(cause) {
+  def partialResult: Serializable
+  override def getStackTrace(): Array[StackTraceElement] = cause.getStackTrace()
+  override def fillInStackTrace(): Throwable = this
+}
+
 /**
- * Exception thrown when the underlying parser returns a partial result of parsing.
+ * Exception thrown when the underlying parser returns a partial result of parsing an object/row.
  * @param partialResult the partial result of parsing a bad record.
  * @param cause the actual exception about why the parser cannot return full result.
  */
 case class PartialResultException(
-    partialResult: InternalRow,
-    cause: Throwable)
-  extends Exception(cause)
+    override val partialResult: InternalRow,
+    override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial array result.
+ * @param partialResult the partial array result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialArrayDataResultException(
+    override val partialResult: ArrayData,
+    override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial map result.
+ * @param partialResult the partial map result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialMapDataResultException(
+    override val partialResult: MapData,
+    override val cause: Throwable) extends PartialValueException(cause)

 /**
  * Exception thrown when the underlying parser returns partial result list of parsing.
@@ -51,3 +77,38 @@ case class BadRecordException(
     @transient record: () => UTF8String,
     @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
     cause: Throwable) extends Exception(cause)
+
+/**
+ * Exception thrown when the underlying parser parses a JSON array as a struct.
+ */
+case class JsonArraysAsStructsException() extends RuntimeException()
+
+/**
+ * Exception thrown when the underlying parser cannot parse a String as the given data type.
+ */
+case class StringAsDataTypeException(
+    fieldName: String,
+    fieldValue: String,
+    dataType: DataType) extends RuntimeException()
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.cannotParseJSONFieldError`.
+ * Used for code control flow in the parser without the overhead of creating a full exception.
+ */
+case class CannotParseJSONFieldException(
+    fieldName: String,
+    fieldValue: String,
+    jsonType: JsonToken,
+    dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.emptyJsonFieldValueError`.
+ * Used for code control flow in the parser without the overhead of creating a full exception.
+ */
+case class EmptyJsonFieldValueException(dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 2865ce5492f..c057c3a7ec0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1437,11 +1437,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {

   def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType)
     : SparkRuntimeException = {
+    cannotParseJSONFieldError(parser.getCurrentName, parser.getText, jsonType, dataType)
+  }
+
+  def cannotParseJSONFieldError(
+      fieldName: String,
+      fieldValue: String,
+      jsonType: JsonToken,
+      dataType: DataType): SparkRuntimeException = {
     new SparkRuntimeException(
       errorClass = "CANNOT_PARSE_JSON_FIELD",
       messageParameters = Map(
-        "fieldName" -> toSQLValue(parser.getCurrentName, StringType),
-        "fieldValue" -> parser.getText,
+        "fieldName" -> toSQLValue(fieldName, StringType),
+        "fieldValue" -> fieldValue,
         "jsonType" -> jsonType.toString(),
         "dataType" -> toSQLType(dataType)))
   }
diff --git a/sql/core/benchmarks/JsonBenchmark-results.txt b/sql/core/benchmarks/JsonBenchmark-results.txt
index 1d5946e4661..e53c7801141 100644
--- a/sql/core/benchmarks/JsonBenchmark-results.txt
+++ b/sql/core/benchmarks/JsonBenchmark-results.txt
@@ -3,118 +3,125 @@ Benchmark for performance of JSON parsing
 ================================================================================================

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 JSON schema inferring:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                        3020           3050          28          1.7         604.0       1.0X
-UTF-8 is set                                       4282           4296          23          1.2         856.4       0.7X
+No encoding                                        2084           2134          46          2.4         416.8       1.0X
+UTF-8 is set                                       3077           3093          14          1.6         615.3       0.7X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 count a short column:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                        2251           2290          54          2.2         450.2       1.0X
-UTF-8 is set                                       3672           3684          11          1.4         734.4       0.6X
+No encoding                                        2854           2863           8          1.8         570.8       1.0X
+UTF-8 is set                                       4066           4066           1          1.2         813.1       0.7X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 count a wide column:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                        5696           6246         604          0.2        5696.4       1.0X
-UTF-8 is set                                       8498           8523          24          0.1        8498.2       0.7X
+No encoding                                        3348           3368          26          0.3        3347.8       1.0X
+UTF-8 is set                                       5215           5239          22          0.2        5214.7       0.6X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 select wide row:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-No encoding                                       12496          12562          73          0.0      249920.4       1.0X
-UTF-8 is set                                      12923          12949          25          0.0      258461.7       1.0X
+No encoding                                       11046          11102          54          0.0      220928.4       1.0X
+UTF-8 is set                                      12135          12181          54          0.0      242697.4       0.9X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Select a subset of 10 columns:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Select 10 columns                                  2249           2261          13          0.4        2249.4       1.0X
-Select 1 column                                    2472           2474           2          0.4        2472.0       0.9X
+Select 10 columns                                  2486           2488           2          0.4        2486.5       1.0X
+Select 1 column                                    1505           1506           2          0.7        1504.6       1.7X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 creation of JSON parser per line:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Short column without encoding                       698            706           9          1.4         698.5       1.0X
-Short column with UTF-8                             965            970           9          1.0         964.9       0.7X
-Wide column without encoding                      10933          11224         409          0.1       10932.7       0.1X
-Wide column with UTF-8                            13842          13891          81          0.1       13841.6       0.1X
+Short column without encoding                       888            889           3          1.1         887.6       1.0X
+Short column with UTF-8                            1134           1136           2          0.9        1134.3       0.8X
+Wide column without encoding                       8012           8056          51          0.1        8012.4       0.1X
+Wide column with UTF-8                             9830           9844          22          0.1        9829.7       0.1X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 JSON functions:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Text read                                           130            134           3          7.7         130.5       1.0X
-from_json                                          2017           2052          33          0.5        2017.2       0.1X
-json_tuple                                         2327           2353          24          0.4        2327.1       0.1X
-get_json_object                                    1996           2004           7          0.5        1995.7       0.1X
+Text read                                            85             87           2         11.7          85.4       1.0X
+from_json                                          1706           1711           4          0.6        1706.4       0.1X
+json_tuple                                         1528           1534           7          0.7        1528.2       0.1X
+get_json_object                                    1275           1286          17          0.8        1275.0       0.1X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Dataset of json strings:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Text read                                           597            600           4          8.4         119.4       1.0X
-schema inferring                                   2883           2896          20          1.7         576.6       0.2X
-parsing                                            2686           2687           1          1.9         537.2       0.2X
+Text read                                           369            370           1         13.6          73.8       1.0X
+schema inferring                                   1880           1883           4          2.7         376.0       0.2X
+parsing                                            3731           3737           8          1.3         746.1       0.1X

 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Json files in the per-line mode:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Text read                                           853            860           8          5.9         170.6       1.0X
-Schema inferring                                   3353           3362           8          1.5         670.6       0.3X
-Parsing without charset                            3258           3277          18          1.5         651.6       0.3X
-Parsing with UTF-8                                 4810           4818          10          1.0         962.0       0.2X
+Text read                                           553            579          32          9.0         110.6       1.0X
+Schema inferring                                   2195           2196           2          2.3         439.0       0.3X
+Parsing without charset                            4272           4274           3          1.2         854.3       0.1X
+Parsing with UTF-8                                 5459           5464           6          0.9        1091.7       0.1X

-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Write dates and timestamps:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Create a dataset of timestamps                      150            154           3          6.7         150.0       1.0X
-to_json(timestamp)                                 1233           1244          11          0.8        1232.6       0.1X
-write timestamps to files                          1012           1027          16          1.0        1012.3       0.1X
-Create a dataset of dates                           181            183           2          5.5         180.5       0.8X
-to_json(date)                                       846            859          11          1.2         845.7       0.2X
-write dates to files                                603            619          22          1.7         603.2       0.2X
+Create a dataset of timestamps                      102            112          13          9.8         101.9       1.0X
+to_json(timestamp)                                  840            841           1          1.2         839.6       0.1X
+write timestamps to files                           692            696           4          1.4         692.0       0.1X
+Create a dataset of dates                           120            121           1          8.4         119.7       0.9X
+to_json(date)                                       589            591           2          1.7         589.3       0.2X
+write dates to files                                442            445           2          2.3         442.3       0.2X

-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
-Read dates and timestamps:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-read timestamp text from files                      229            232           3          4.4         228.6       1.0X
-read timestamps from files                         2410           2420           9          0.4        2410.3       0.1X
-infer timestamps from files                        6273           6282           9          0.2        6273.4       0.0X
-read date text from files                           204            205           1          4.9         204.1       1.1X
-read date from files                                826            830           4          1.2         826.3       0.3X
-timestamp strings                                   319            327          10          3.1         319.5       0.7X
-parse timestamps from Dataset[String]              2966           2970           4          0.3        2965.8       0.1X
-infer timestamps from Dataset[String]              6888           6893           4          0.1        6888.5       0.0X
-date strings                                        367            376           9          2.7         367.2       0.6X
-parse dates from Dataset[String]                   1242           1260          18          0.8        1241.8       0.2X
-from_json(timestamp)                               4160           4162           3          0.2        4160.3       0.1X
-from_json(date)                                    2631           2641          10          0.4        2630.9       0.1X
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
+Read dates and timestamps:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------------------------------
+read timestamp text from files                                                   143            145           4          7.0         142.6       1.0X
+read timestamps from files                                                      2449           2469          17          0.4        2448.6       0.1X
+infer timestamps from files                                                     5579           5596          15          0.2        5578.8       0.0X
+read date text from files                                                        132            139           7          7.6         131.7       1.1X
+read date from files                                                            1017           1020           2          1.0        1017.5       0.1X
+timestamp strings                                                                227            230           3          4.4         227.2       0.6X
+parse timestamps from Dataset[String]                                           2827           2830           3          0.4        2826.5       0.1X
+infer timestamps from Dataset[String]                                           6001           6008           6          0.2        6001.2       0.0X
+date strings                                                                     259            261           2          3.9         259.0       0.6X
+parse dates from Dataset[String]                                                1382           1387           4          0.7        1382.3       0.1X
+from_json(timestamp)                                                            3557           3561           7          0.3        3556.8       0.0X
+from_json(date)                                                                 2146           2148           2          0.5        2146.4       0.1X
+infer error timestamps from Dataset[String] with default format                 1989           1993           4          0.5        1989.3       0.1X
+infer error timestamps from Dataset[String] with user-provided format           1922           1925           3          0.5        1922.1       0.1X
+infer error timestamps from Dataset[String] with legacy format                  1919           1923           4          0.5        1919.1       0.1X

-OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
 Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-w/o filters                                       18511          18572          54          0.0      185107.0       1.0X
-pushdown disabled                                 17747          17768          18          0.0      177474.0       1.0X
-w/ filters                                          717            721           4          0.1        7169.1      25.8X
-
+w/o filters                                       14387          14399          12          0.0      143871.9       1.0X
+pushdown disabled                                 13891          13899           7          0.0      138912.3       1.0X
+w/ filters                                          782            784           2          0.1        7820.5      18.4X

+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
+Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+parse invalid JSON                                 3100           3106           6          0.0      309967.6       1.0X
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index f2e0fd57738..c73acc1d653 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -936,16 +936,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4", StringType)))
     val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
     checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
+
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
-    checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+      checkAnswer(
+        df2.select(from_json($"c0", new StructType().add("data", st))),
+        Row(Row(Row(123456, null)))
+      )
+    }
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+      checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+    }
+
+    val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
     withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
-      val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
       checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
     }
-
     withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
-      val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
       checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
     }
@@ -1034,14 +1041,13 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       )
     )

-    // Value "a" cannot be parsed as an integer,
-    // the error cascades to "c2", thus making its value null.
+    // Value "a" cannot be parsed as an integer, so the value of c2 is null.
     val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
     withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
       checkAnswer(
         df.select(from_json($"c0", ArrayType(st))),
-        Row(Array(Row(null)))
+        Row(Array(Row(Seq(Row(null)))))
       )
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index a7794848434..d64df02362e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -516,6 +516,33 @@ object JsonBenchmark extends SqlBasedBenchmark {
     }
   }

+  private def partialResultBenchmark(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("Partial JSON results", rowsNum, output = output)
+    val colsNum = 1000
+
+    val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+    val schema = StructType(fields)
+
+    def data: Dataset[String] = {
+      spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+        iter.map { i =>
+          (0 until colsNum).map { j =>
+            // Only the last column has an integer value.
+            if (j < colsNum - 1) s""""col${i}":"foo_${j}"""" else s""""col${i}":${j}"""
+          }.mkString("{", ", ", "}")
+        }
+      }.select($"value").as[String]
+    }
+
+    benchmark.addCase("parse invalid JSON", numIters) { _ =>
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        spark.read.schema(schema).json(data).noop()
+      }
+    }
+
+    benchmark.run()
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     val numIters = 3
     runBenchmark("Benchmark for performance of JSON parsing") {
@@ -532,6 +559,7 @@ object JsonBenchmark extends SqlBasedBenchmark {
       // Benchmark pushdown filters that refer to top-level columns.
       // TODO (SPARK-32325): Add benchmarks for filters with nested column attributes.
       filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
+      partialResultBenchmark(rowsNum = 10000, numIters)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index f34059b22a4..bbcda1df033 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3437,7 +3437,7 @@ abstract class JsonSuite
       if (enablePartialResults) {
         checkAnswer(
           df,
-          Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+          Seq(Row(Row(1, null), Row(1)), Row(Row(2, null), Row(2)))
         )
       } else {
         checkAnswer(
@@ -3450,6 +3450,174 @@ abstract class JsonSuite
     }
   }

+  test("SPARK-44940: fully parse the record except f1 if partial results are enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}""",
+        """{"a1": "BBB", "a2": [{"f1": 12, "f2": ""}], "a3": "id2", "a4": "YYY"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", Seq(Row(null, "")), "id1", "XXX"),
+            Row("BBB", Seq(Row(12, "")), "id2", "YYY")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null, null),
+            Row("BBB", Seq(Row(12, "")), "id2", "YYY")
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse primitive map if partial results are enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": {"f1": "", "f2": ""}, "a3": "id1"}""",
+        """{"a1": "BBB", "a2": {"f1": 12, "f2": ""}, "a3": "id2"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val schema = "a1 string, a2 map<string, int>, a3 string"
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        // Although the keys match the string type and some of the values match the integer
+        // type, the entire map is marked as null because some of the values do not match.
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, "id1"),
+            Row("BBB", null, "id2")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null),
+            Row("BBB", null, null)
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse map of structs if partial results are enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": {"key": {"f1": "", "f2": ""}}, "a3": "id1"}""",
+        """{"a1": "BBB", "a2": {"key": {"f1": 12, "f2": ""}}, "a3": "id2"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val schema = "a1 string, a2 map<string, struct<f1: int, f2: string>>, a3 string"
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", Map("key" -> Row(null, "")), "id1"),
+            Row("BBB", Map("key" -> Row(12, "")), "id2")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.schema(schema).json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null),
+            Row("BBB", Map("key" -> Row(12, "")), "id2")
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse primitive arrays if partial results are enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": {"f1": [""]}, "a3": "id1", "a4": "XXX"}""",
+        """{"a1": "BBB", "a2": {"f1": [12]}, "a3": "id2", "a4": "YYY"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", Row(null), "id1", "XXX"),
+            Row("BBB", Row(Seq(12)), "id2", "YYY")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, null, null),
+            Row("BBB", Row(Seq(12)), "id2", "YYY")
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-44940: fully parse array of arrays if partial results are enabled") {
+    withTempPath { path =>
+      Seq(
+        """{"a1": "AAA", "a2": [[12, ""], [""]], "a3": "id1", "a4": "XXX"}""",
+        """{"a1": "BBB", "a2": [[12, 34], [""]], "a3": "id2", "a4": "YYY"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      // We cannot parse the `array<array<int>>` type because one of the inner arrays
+      // contains a mismatched type.
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, "id1", "XXX"),
+            Row("BBB", null, "id2", "YYY")
+          )
+        )
+      }
+
+      withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+        val df = spark.read.json(path.getAbsolutePath)
+        checkAnswer(
+          df,
+          Seq(
+            Row("AAA", null, "id1", "XXX"),
+            Row("BBB", null, "id2", "YYY")
+          )
+        )
+      }
+    }
+  }
+
   test("SPARK-40667: validate JSON Options") {
     assert(JSONOptions.getAllOptions.size == 28)
     // Please add validation on any new Json options here