This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a6632ffa16f6 [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser
a6632ffa16f6 is described below

commit a6632ffa16f6907eba96e745920d571924bf4b63
Author: Vladimir Golubev <vladimir.golu...@databricks.com>
AuthorDate: Sat May 11 00:37:54 2024 +0800

    [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser
    
    ### What changes were proposed in this pull request?
    A new lightweight exception for control-flow between `UnivocityParser` and `FailureSafeParser`, to speed up malformed CSV parsing.
    
    This is a different way to implement the reverted changes from https://github.com/apache/spark/pull/46478.
    
    The previous implementation was more invasive: removing `cause` from `BadRecordException` could break code higher up the stack that unwraps errors and checks the types of the causes. This implementation only touches `FailureSafeParser` and `UnivocityParser`, since in the codebase they are always used together, unlike `JacksonParser` and `StaxXmlParser`. Removing the stacktrace from `BadRecordException` is safe, since the cause itself carries an adequate stacktrace (except in pure control-flow cases).
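
    For illustration, a minimal self-contained sketch of the pattern (hypothetical names, not the real Spark classes; the actual definitions are in the diff below): skip stack-trace capture at construction time and defer building the real, user-facing cause until it is actually needed.

    ```scala
    // Hypothetical sketch: a stackless exception whose user-facing cause is lazy.
    class ControlFlowException(buildCause: () => Throwable) extends Exception {
      // Filling in the stack trace is the expensive part of `new Exception`; skip it.
      override def fillInStackTrace(): Throwable = this
      override def getStackTrace(): Array[StackTraceElement] = Array.empty[StackTraceElement]
      // The heavy cause is only constructed if a consumer forces it.
      lazy val realCause: Throwable = buildCause()
    }

    // Hot path: throwing and catching is cheap; `realCause` is never built here.
    try throw new ControlFlowException(() => new IllegalStateException("malformed row"))
    catch { case _: ControlFlowException => () }
    ```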
    
    ### Why are the changes needed?
    Parsing in `PermissiveMode` is slow due to heavy exception construction (stacktrace filling + string template substitution in `SparkRuntimeException`).
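
    As a rough illustration of that cost, a small sketch that can be pasted into a Scala REPL (illustrative only; absolute numbers vary with JVM, stack depth, and warm-up):

    ```scala
    // Compares exception construction with and without stack-trace capture.
    class Stackless extends Exception {
      override def fillInStackTrace(): Throwable = this
    }

    def time(label: String)(body: => Unit): Unit = {
      val start = System.nanoTime()
      body
      println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
    }

    time("with stack trace")((1 to 1000000).foreach(_ => new Exception("boom")))
    time("stackless")((1 to 1000000).foreach(_ => new Stackless))
    ```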
    
    ### Does this PR introduce _any_ user-facing change?
    No, since `FailureSafeParser` unwraps `BadRecordException` and correctly rethrows user-facing exceptions in `FailFastMode`.
    
    ### How was this patch tested?
    - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
    - Manually run csv benchmark
    - Manually checked correct and malformed CSV in spark-shell (`org.apache.spark.SparkException` is thrown with the stacktrace); see the illustrative snippet below
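
    For reference, a spark-shell check along those lines might look like this (the file path, schema, and data are illustrative, not from the actual benchmark):

    ```scala
    // Illustrative spark-shell session; the file path and schema are made up.
    val df = spark.read
      .schema("a INT, b INT")
      .option("mode", "FAILFAST")
      .csv("/tmp/malformed.csv")
    // Expected on malformed input: org.apache.spark.SparkException with a full stacktrace.
    df.collect()
    ```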
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46500 from vladimirg-db/vladimirg-db/use-special-lighweight-exception-for-control-flow-between-univocity-parser-and-failure-safe-parser.
    
    Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  5 +++--
 .../sql/catalyst/util/BadRecordException.scala     | 22 +++++++++++++++++++---
 .../sql/catalyst/util/FailureSafeParser.scala      | 11 +++++++++--
 3 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index a5158d8a22c6..4d95097e1681 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,7 +316,7 @@ class UnivocityParser(
       throw BadRecordException(
         () => getCurrentInput,
         () => Array.empty,
-        QueryExecutionErrors.malformedCSVRecordError(""))
+        LazyBadRecordCauseWrapper(() => QueryExecutionErrors.malformedCSVRecordError("")))
     }
 
     val currentInput = getCurrentInput
@@ -326,7 +326,8 @@ class UnivocityParser(
      // However, we still have chance to parse some of the tokens. It continues to parses the
      // tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
      // tokens.
-      Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+      Some(LazyBadRecordCauseWrapper(
+        () => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString)))
     } else None
     // When the length of the returned tokens is identical to the length of the parsed schema,
     // we just need to:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 65a56c1064e4..654b0b8c73e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -67,16 +67,32 @@ case class PartialResultArrayException(
   extends Exception(cause)
 
 /**
- * Exception thrown when the underlying parser meet a bad record and can't parse it.
+ * Exception thrown when the underlying parser met a bad record and can't parse it.
+ * The stacktrace is not collected for better performance, and thus, this exception should
+ * not be used in a user-facing context.
  * @param record a function to return the record that cause the parser to fail
  * @param partialResults a function that returns an row array, which is the partial results of
  *                      parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be parsed.
+ * @param cause the actual exception about why the record is bad and can't be parsed. It's better
+ *                      to use `LazyBadRecordCauseWrapper` here to delay heavy cause construction
+ *                      until it's needed.
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
     @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
-    cause: Throwable) extends Exception(cause)
+    cause: Throwable) extends Exception(cause) {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
+
+/**
+ * Exception to use as `BadRecordException` cause to delay heavy user-facing exception construction.
+ * Does not contain stacktrace and used only for control flow
+ */
+case class LazyBadRecordCauseWrapper(cause: () => Throwable) extends Exception() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
 
 /**
  * Exception thrown when the underlying parser parses a JSON array as a struct.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 10cd159c769b..d9946d1b12ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -78,10 +78,17 @@ class FailureSafeParser[IN](
             case StringAsDataTypeException(fieldName, fieldValue, dataType) =>
              throw QueryExecutionErrors.cannotParseStringAsDataTypeError(e.record().toString,
                 fieldName, fieldValue, dataType)
-            case other => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
-              toResultRow(e.partialResults().headOption, e.record).toString, other)
+            case causeWrapper: LazyBadRecordCauseWrapper =>
+              throwMalformedRecordsDetectedInRecordParsingError(e, causeWrapper.cause())
+            case cause => throwMalformedRecordsDetectedInRecordParsingError(e, cause)
           }
       }
     }
   }
+
+  private def throwMalformedRecordsDetectedInRecordParsingError(
+      e: BadRecordException, cause: Throwable): Nothing = {
+    throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+      toResultRow(e.partialResults().headOption, e.record).toString, cause)
+  }
 }
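
For illustration, the overall handshake this patch sets up between the parser and `FailureSafeParser` can be sketched as a self-contained program. All names below are hypothetical stand-ins for `BadRecordException`, `LazyBadRecordCauseWrapper`, and the real error builders; this is a sketch of the technique, not the actual Spark code.

```scala
// Self-contained sketch of the stackless, lazy-cause control-flow handshake.
object LazyCauseDemo {
  // Stackless wrapper that defers building the heavy, user-facing cause.
  final case class LazyCause(cause: () => Throwable) extends Exception {
    override def fillInStackTrace(): Throwable = this
  }

  // Stackless control-flow exception thrown on the parser's hot path.
  final case class BadRow(cause: Throwable) extends Exception(cause) {
    override def fillInStackTrace(): Throwable = this
  }

  // Parser side: throwing is cheap; the real error message is not rendered yet.
  def parse(line: String): Int =
    try line.trim.toInt
    catch {
      case _: NumberFormatException =>
        throw BadRow(LazyCause(() => new RuntimeException(s"Malformed record: $line")))
    }

  // Consumer side: force the lazy cause only when a user-facing error must be raised.
  def main(args: Array[String]): Unit =
    try println(parse("oops"))
    catch {
      case e: BadRow =>
        e.getCause match {
          case w: LazyCause => throw w.cause() // heavy exception built only here
          case c            => throw c
        }
    }
}
```

The design point is that the hot path (throwing and catching) captures no stack traces and renders no error messages; only the FAILFAST-style slow path pays for constructing the user-facing exception.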

