This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cc3dc2ef4d2 [SPARK-48169][SPARK-48143][SQL] Revert BadRecordException 
optimizations
6cc3dc2ef4d2 is described below

commit 6cc3dc2ef4d2ffbff7ffc400e723b97b462e1bab
Author: Vladimir Golubev <vladimir.golu...@databricks.com>
AuthorDate: Thu May 9 15:35:28 2024 +0800

    [SPARK-48169][SPARK-48143][SQL] Revert BadRecordException optimizations
    
    ### What changes were proposed in this pull request?
    Revert BadRecordException optimizations for UnivocityParser, StaxXmlParser 
and JacksonParser
    
    ### Why are the changes needed?
    To reduce the blast radius: the optimizations will be reimplemented in a 
different way. There were two recent PRs of mine:
    - https://github.com/apache/spark/pull/46438
    - https://github.com/apache/spark/pull/46400
    
    which introduced optimizations to speed up the control flow between 
UnivocityParser, StaxXmlParser and JacksonParser. However, these changes are 
quite unstable and may break calling code that relies on, for example, the 
exception cause type. Also, some Spark plugins/extensions may be using that 
exception for user-facing errors.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46478 from vladimirg-db/vladimirg-db/revert-SPARK-48169-SPARK-48143.
    
    Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  8 ++++----
 .../spark/sql/catalyst/json/JacksonParser.scala    | 13 ++++++------
 .../sql/catalyst/util/BadRecordException.scala     | 13 +++---------
 .../sql/catalyst/util/FailureSafeParser.scala      |  2 +-
 .../spark/sql/catalyst/xml/StaxXmlParser.scala     | 23 +++++++++++-----------
 5 files changed, 26 insertions(+), 33 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 8d06789a7512..a5158d8a22c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,17 +316,17 @@ class UnivocityParser(
       throw BadRecordException(
         () => getCurrentInput,
         () => Array.empty,
-        () => QueryExecutionErrors.malformedCSVRecordError(""))
+        QueryExecutionErrors.malformedCSVRecordError(""))
     }
 
     val currentInput = getCurrentInput
 
-    var badRecordException: Option[() => Throwable] = if (tokens.length != 
parsedSchema.length) {
+    var badRecordException: Option[Throwable] = if (tokens.length != 
parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it 
as a malformed record.
       // However, we still have chance to parse some of the tokens. It 
continues to parses the
       // tokens normally and sets null when `ArrayIndexOutOfBoundsException` 
occurs for missing
       // tokens.
-      Some(() => 
QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+      Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
     } else None
     // When the length of the returned tokens is identical to the length of 
the parsed schema,
     // we just need to:
@@ -348,7 +348,7 @@ class UnivocityParser(
       } catch {
         case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
-          badRecordException = badRecordException.orElse(Some(() => e))
+          badRecordException = badRecordException.orElse(Some(e))
           // Use the corresponding DEFAULT value associated with the column, 
if any.
           row.update(i, 
ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 848c20ee36be..5e75ff6f6e1a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
         // JSON parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), cause = () => e)
+        throw BadRecordException(() => recordLiteral(record), () => 
Array.empty, e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -621,17 +621,18 @@ class JacksonParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), cause = () => 
wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), () => 
Array.empty,
+          wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(row),
-          cause = () => convertCauseForPartialResult(cause))
+          convertCauseForPartialResult(cause))
       case PartialResultArrayException(rows, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => rows,
-          cause = () => cause)
+          cause)
       // These exceptions should never be thrown outside of JacksonParser.
       // They are used for the control flow in the parser. We add them here 
for completeness
       // since they also indicate a bad record.
@@ -639,12 +640,12 @@ class JacksonParser(
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(InternalRow(arrayData)),
-          cause = () => convertCauseForPartialResult(cause))
+          convertCauseForPartialResult(cause))
       case PartialMapDataResultException(mapData, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(InternalRow(mapData)),
-          cause = () => convertCauseForPartialResult(cause))
+          convertCauseForPartialResult(cause))
     }
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index c4fcdf40360a..65a56c1064e4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -67,23 +67,16 @@ case class PartialResultArrayException(
   extends Exception(cause)
 
 /**
- * Exception thrown when the underlying parser meets a bad record and can't 
parse it. Used for
- * control flow between wrapper and underlying parser without overhead of 
creating a full exception.
+ * Exception thrown when the underlying parser meet a bad record and can't 
parse it.
  * @param record a function to return the record that cause the parser to fail
  * @param partialResults a function that returns an row array, which is the 
partial results of
  *                      parsing this bad record.
- * @param cause a function to return the actual exception about why the record 
is bad and can't be
- *                      parsed.
+ * @param cause the actual exception about why the record is bad and can't be 
parsed.
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
     @transient partialResults: () => Array[InternalRow] = () => 
Array.empty[InternalRow],
-    @transient cause: () => Throwable)
-    extends Exception() {
-
-  override def getStackTrace(): Array[StackTraceElement] = new 
Array[StackTraceElement](0)
-  override def fillInStackTrace(): Throwable = this
-}
+    cause: Throwable) extends Exception(cause)
 
 /**
  * Exception thrown when the underlying parser parses a JSON array as a struct.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index b005563aa824..10cd159c769b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -70,7 +70,7 @@ class FailureSafeParser[IN](
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
-          e.cause() match {
+          e.getCause match {
             case _: JsonArraysAsStructsException =>
               // SPARK-42298 we recreate the exception here to make sure the 
error message
               // have the record content.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 2b237ab5db64..ab671e56a21e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -148,27 +148,26 @@ class StaxXmlParser(
         // XML parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => xmlRecord, cause = () => e)
+        throw BadRecordException(() => xmlRecord, () => Array.empty, e)
       case e: CharConversionException if options.charset.isEmpty =>
-        throw BadRecordException(() => xmlRecord, cause = () => {
-          val msg =
-            """XML parser cannot handle a character in its input.
-              |Specifying encoding as an input option explicitly might help to 
resolve the issue.
-              |""".stripMargin + e.getMessage
-          val wrappedCharException = new CharConversionException(msg)
-          wrappedCharException.initCause(e)
-          wrappedCharException
-        })
+        val msg =
+          """XML parser cannot handle a character in its input.
+            |Specifying encoding as an input option explicitly might help to 
resolve the issue.
+            |""".stripMargin + e.getMessage
+        val wrappedCharException = new CharConversionException(msg)
+        wrappedCharException.initCause(e)
+        throw BadRecordException(() => xmlRecord, () => Array.empty,
+          wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => xmlRecord,
           partialResults = () => Array(row),
-          () => cause)
+          cause)
       case PartialResultArrayException(rows, cause) =>
         throw BadRecordException(
           record = () => xmlRecord,
           partialResults = () => rows,
-          () => cause)
+          cause)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to