This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 925457cadd22 [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration 925457cadd22 is described below commit 925457cadd229673323e91a82d0b504145f509e0 Author: Vladimir Golubev <vladimir.golu...@databricks.com> AuthorDate: Tue May 7 09:09:00 2024 -0700 [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration ### What changes were proposed in this pull request? Use a factory function for the exception cause in `BadRecordException` to avoid constructing heavy exceptions in the underlying parsers. The exceptions are now constructed on demand in `FailureSafeParser`. A follow-up to https://github.com/apache/spark/pull/46400 ### Why are the changes needed? - Speed up `JacksonParser` and `StaxXmlParser`, since they throw user-facing exceptions to `FailureSafeParser` that were previously built eagerly, even when never used - Refactoring - leave only one constructor in `BadRecordException` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - `testOnly org.apache.spark.sql.catalyst.json.JacksonParserSuite` - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #46438 from vladimirg-db/vladimirg-db/use-lazy-exception-cause-in-all-bad-record-exception-invocations. 
Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../spark/sql/catalyst/json/JacksonParser.scala | 12 ++++++------ .../sql/catalyst/util/BadRecordException.scala | 10 +--------- .../spark/sql/catalyst/xml/StaxXmlParser.scala | 22 ++++++++++++---------- 4 files changed, 20 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 37d9143e5b5a..8d06789a7512 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -359,7 +359,7 @@ class UnivocityParser( } else { if (badRecordException.isDefined) { throw BadRecordException( - () => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get) + () => currentInput, () => Array(requiredRow.get), badRecordException.get) } else { requiredRow } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index d1093a3b1be1..3c42f72fa6b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -613,7 +613,7 @@ class JacksonParser( // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. 
- throw BadRecordException(() => recordLiteral(record), cause = e) + throw BadRecordException(() => recordLiteral(record), cause = () => e) case e: CharConversionException if options.encoding.isEmpty => val msg = """JSON parser cannot handle a character in its input. @@ -621,17 +621,17 @@ class JacksonParser( |""".stripMargin + e.getMessage val wrappedCharException = new CharConversionException(msg) wrappedCharException.initCause(e) - throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException) + throw BadRecordException(() => recordLiteral(record), cause = () => wrappedCharException) case PartialResultException(row, cause) => throw BadRecordException( record = () => recordLiteral(record), partialResults = () => Array(row), - convertCauseForPartialResult(cause)) + cause = () => convertCauseForPartialResult(cause)) case PartialResultArrayException(rows, cause) => throw BadRecordException( record = () => recordLiteral(record), partialResults = () => rows, - cause) + cause = () => cause) // These exceptions should never be thrown outside of JacksonParser. // They are used for the control flow in the parser. We add them here for completeness // since they also indicate a bad record. 
@@ -639,12 +639,12 @@ class JacksonParser( throw BadRecordException( record = () => recordLiteral(record), partialResults = () => Array(InternalRow(arrayData)), - convertCauseForPartialResult(cause)) + cause = () => convertCauseForPartialResult(cause)) case PartialMapDataResultException(mapData, cause) => throw BadRecordException( record = () => recordLiteral(record), partialResults = () => Array(InternalRow(mapData)), - convertCauseForPartialResult(cause)) + cause = () => convertCauseForPartialResult(cause)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala index 84f183af7719..c4fcdf40360a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala @@ -77,7 +77,7 @@ case class PartialResultArrayException( */ case class BadRecordException( @transient record: () => UTF8String, - @transient partialResults: () => Array[InternalRow], + @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow], @transient cause: () => Throwable) extends Exception() { @@ -85,14 +85,6 @@ case class BadRecordException( override def fillInStackTrace(): Throwable = this } -object BadRecordException { - def apply( - record: () => UTF8String, - partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow], - cause: Throwable): BadRecordException = - new BadRecordException(record, partialResults, () => cause) -} - /** * Exception thrown when the underlying parser parses a JSON array as a struct. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala index 2b30fe2bfab1..2b237ab5db64 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala @@ -148,25 +148,27 @@ class StaxXmlParser( // XML parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. - throw BadRecordException(() => xmlRecord, cause = e) + throw BadRecordException(() => xmlRecord, cause = () => e) case e: CharConversionException if options.charset.isEmpty => - val msg = - """XML parser cannot handle a character in its input. - |Specifying encoding as an input option explicitly might help to resolve the issue. - |""".stripMargin + e.getMessage - val wrappedCharException = new CharConversionException(msg) - wrappedCharException.initCause(e) - throw BadRecordException(() => xmlRecord, cause = wrappedCharException) + throw BadRecordException(() => xmlRecord, cause = () => { + val msg = + """XML parser cannot handle a character in its input. + |Specifying encoding as an input option explicitly might help to resolve the issue. 
+ |""".stripMargin + e.getMessage + val wrappedCharException = new CharConversionException(msg) + wrappedCharException.initCause(e) + wrappedCharException + }) case PartialResultException(row, cause) => throw BadRecordException( record = () => xmlRecord, partialResults = () => Array(row), - cause) + () => cause) case PartialResultArrayException(rows, cause) => throw BadRecordException( record = () => xmlRecord, partialResults = () => rows, - cause) + () => cause) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org