This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 925457cadd22 [SPARK-48169][SQL] Use lazy BadRecordException cause in 
all parsers and remove the old constructor, which was meant for the migration
925457cadd22 is described below

commit 925457cadd229673323e91a82d0b504145f509e0
Author: Vladimir Golubev <vladimir.golu...@databricks.com>
AuthorDate: Tue May 7 09:09:00 2024 -0700

    [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and 
remove the old constructor, which was meant for the migration
    
    ### What changes were proposed in this pull request?
    Use a factory function for the exception cause in `BadRecordException` to 
avoid constructing heavy exceptions in the underlying parser. Now they are 
constructed on-demand in `FailureSafeParser`. A follow-up for 
https://github.com/apache/spark/pull/46400
    
    ### Why are the changes needed?
    - Speed up `JacksonParser` and `StaxXmlParser`, since they throw 
user-facing exceptions to `FailureSafeParser`
    - Refactoring - leave only one constructor in `BadRecordException`
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - `testOnly org.apache.spark.sql.catalyst.json.JacksonParserSuite`
    - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46438 from 
vladimirg-db/vladimirg-db/use-lazy-exception-cause-in-all-bad-record-exception-invocations.
    
    Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    | 12 ++++++------
 .../sql/catalyst/util/BadRecordException.scala     | 10 +---------
 .../spark/sql/catalyst/xml/StaxXmlParser.scala     | 22 ++++++++++++----------
 4 files changed, 20 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 37d9143e5b5a..8d06789a7512 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -359,7 +359,7 @@ class UnivocityParser(
     } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
-          () => currentInput, () => Array[InternalRow](requiredRow.get), 
badRecordException.get)
+          () => currentInput, () => Array(requiredRow.get), 
badRecordException.get)
       } else {
         requiredRow
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d1093a3b1be1..3c42f72fa6b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
         // JSON parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), cause = e)
+        throw BadRecordException(() => recordLiteral(record), cause = () => e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -621,17 +621,17 @@ class JacksonParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), cause = 
wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), cause = () => 
wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(row),
-          convertCauseForPartialResult(cause))
+          cause = () => convertCauseForPartialResult(cause))
       case PartialResultArrayException(rows, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => rows,
-          cause)
+          cause = () => cause)
       // These exceptions should never be thrown outside of JacksonParser.
       // They are used for the control flow in the parser. We add them here 
for completeness
       // since they also indicate a bad record.
@@ -639,12 +639,12 @@ class JacksonParser(
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(InternalRow(arrayData)),
-          convertCauseForPartialResult(cause))
+          cause = () => convertCauseForPartialResult(cause))
       case PartialMapDataResultException(mapData, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(InternalRow(mapData)),
-          convertCauseForPartialResult(cause))
+          cause = () => convertCauseForPartialResult(cause))
     }
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 84f183af7719..c4fcdf40360a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -77,7 +77,7 @@ case class PartialResultArrayException(
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
-    @transient partialResults: () => Array[InternalRow],
+    @transient partialResults: () => Array[InternalRow] = () => 
Array.empty[InternalRow],
     @transient cause: () => Throwable)
     extends Exception() {
 
@@ -85,14 +85,6 @@ case class BadRecordException(
   override def fillInStackTrace(): Throwable = this
 }
 
-object BadRecordException {
-  def apply(
-      record: () => UTF8String,
-      partialResults: () => Array[InternalRow] = () => 
Array.empty[InternalRow],
-      cause: Throwable): BadRecordException =
-    new BadRecordException(record, partialResults, () => cause)
-}
-
 /**
  * Exception thrown when the underlying parser parses a JSON array as a struct.
  */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 2b30fe2bfab1..2b237ab5db64 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -148,25 +148,27 @@ class StaxXmlParser(
         // XML parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => xmlRecord, cause = e)
+        throw BadRecordException(() => xmlRecord, cause = () => e)
       case e: CharConversionException if options.charset.isEmpty =>
-        val msg =
-          """XML parser cannot handle a character in its input.
-            |Specifying encoding as an input option explicitly might help to 
resolve the issue.
-            |""".stripMargin + e.getMessage
-        val wrappedCharException = new CharConversionException(msg)
-        wrappedCharException.initCause(e)
-        throw BadRecordException(() => xmlRecord, cause = wrappedCharException)
+        throw BadRecordException(() => xmlRecord, cause = () => {
+          val msg =
+            """XML parser cannot handle a character in its input.
+              |Specifying encoding as an input option explicitly might help to 
resolve the issue.
+              |""".stripMargin + e.getMessage
+          val wrappedCharException = new CharConversionException(msg)
+          wrappedCharException.initCause(e)
+          wrappedCharException
+        })
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => xmlRecord,
           partialResults = () => Array(row),
-          cause)
+          () => cause)
       case PartialResultArrayException(rows, cause) =>
         throw BadRecordException(
           record = () => xmlRecord,
           partialResults = () => rows,
-          cause)
+          () => cause)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to