This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 326dbb447873 [SPARK-48143][SQL] Use lightweight exceptions for 
control-flow between UnivocityParser and FailureSafeParser
326dbb447873 is described below

commit 326dbb4478732eb9b7a683511e69206f2b21bd37
Author: Vladimir Golubev <vladimir.golu...@databricks.com>
AuthorDate: Tue May 7 20:28:50 2024 +0800

    [SPARK-48143][SQL] Use lightweight exceptions for control-flow between 
UnivocityParser and FailureSafeParser
    
    # What changes were proposed in this pull request?
    New lightweight exception for control-flow between UnivocityParser and 
FalureSafeParser to speed-up malformed CSV parsing
    
    ### Why are the changes needed?
    Parsing in `PermissiveMode` is slow due to heavy exception construction 
(stacktrace filling + string template substitution in `SparkRuntimeException`)
    
    ### Does this PR introduce _any_ user-facing change?
    No, since `FailureSafeParser` unwraps `BadRecordException` and correctly 
rethrows user-facing exceptions in `FailFastMode`
    
    ### How was this patch tested?
    - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
    - Manually run csv benchmark on DB benchmark workspace
    - Manually checked correct and malformed csv in sherk-shell 
(org.apache.spark.SparkException is thrown with the stacktrace)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46400 from vladimirg-db/vladimirg-db/speed-up-csv-parser.
    
    Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 10 +++++-----
 .../spark/sql/catalyst/json/JacksonParser.scala    |  5 ++---
 .../sql/catalyst/util/BadRecordException.scala     | 23 ++++++++++++++++++----
 .../sql/catalyst/util/FailureSafeParser.scala      |  2 +-
 .../spark/sql/catalyst/xml/StaxXmlParser.scala     |  5 ++---
 5 files changed, 29 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index a5158d8a22c6..37d9143e5b5a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,17 +316,17 @@ class UnivocityParser(
       throw BadRecordException(
         () => getCurrentInput,
         () => Array.empty,
-        QueryExecutionErrors.malformedCSVRecordError(""))
+        () => QueryExecutionErrors.malformedCSVRecordError(""))
     }
 
     val currentInput = getCurrentInput
 
-    var badRecordException: Option[Throwable] = if (tokens.length != 
parsedSchema.length) {
+    var badRecordException: Option[() => Throwable] = if (tokens.length != 
parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it 
as a malformed record.
       // However, we still have chance to parse some of the tokens. It 
continues to parses the
       // tokens normally and sets null when `ArrayIndexOutOfBoundsException` 
occurs for missing
       // tokens.
-      Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+      Some(() => 
QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
     } else None
     // When the length of the returned tokens is identical to the length of 
the parsed schema,
     // we just need to:
@@ -348,7 +348,7 @@ class UnivocityParser(
       } catch {
         case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
-          badRecordException = badRecordException.orElse(Some(e))
+          badRecordException = badRecordException.orElse(Some(() => e))
           // Use the corresponding DEFAULT value associated with the column, 
if any.
           row.update(i, 
ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
       }
@@ -359,7 +359,7 @@ class UnivocityParser(
     } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
-          () => currentInput, () => Array(requiredRow.get), 
badRecordException.get)
+          () => currentInput, () => Array[InternalRow](requiredRow.get), 
badRecordException.get)
       } else {
         requiredRow
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index eadd0a4f8ab9..d1093a3b1be1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
         // JSON parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), () => 
Array.empty, e)
+        throw BadRecordException(() => recordLiteral(record), cause = e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -621,8 +621,7 @@ class JacksonParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => 
Array.empty,
-          wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), cause = 
wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 65a56c1064e4..84f183af7719 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -67,16 +67,31 @@ case class PartialResultArrayException(
   extends Exception(cause)
 
 /**
- * Exception thrown when the underlying parser meet a bad record and can't 
parse it.
+ * Exception thrown when the underlying parser meets a bad record and can't 
parse it. Used for
+ * control flow between wrapper and underlying parser without overhead of 
creating a full exception.
  * @param record a function to return the record that cause the parser to fail
  * @param partialResults a function that returns an row array, which is the 
partial results of
  *                      parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be 
parsed.
+ * @param cause a function to return the actual exception about why the record 
is bad and can't be
+ *                      parsed.
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
-    @transient partialResults: () => Array[InternalRow] = () => 
Array.empty[InternalRow],
-    cause: Throwable) extends Exception(cause)
+    @transient partialResults: () => Array[InternalRow],
+    @transient cause: () => Throwable)
+    extends Exception() {
+
+  override def getStackTrace(): Array[StackTraceElement] = new 
Array[StackTraceElement](0)
+  override def fillInStackTrace(): Throwable = this
+}
+
+object BadRecordException {
+  def apply(
+      record: () => UTF8String,
+      partialResults: () => Array[InternalRow] = () => 
Array.empty[InternalRow],
+      cause: Throwable): BadRecordException =
+    new BadRecordException(record, partialResults, () => cause)
+}
 
 /**
  * Exception thrown when the underlying parser parses a JSON array as a struct.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 10cd159c769b..b005563aa824 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -70,7 +70,7 @@ class FailureSafeParser[IN](
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
-          e.getCause match {
+          e.cause() match {
             case _: JsonArraysAsStructsException =>
               // SPARK-42298 we recreate the exception here to make sure the 
error message
               // have the record content.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index ab671e56a21e..2b30fe2bfab1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -148,7 +148,7 @@ class StaxXmlParser(
         // XML parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => xmlRecord, () => Array.empty, e)
+        throw BadRecordException(() => xmlRecord, cause = e)
       case e: CharConversionException if options.charset.isEmpty =>
         val msg =
           """XML parser cannot handle a character in its input.
@@ -156,8 +156,7 @@ class StaxXmlParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => xmlRecord, () => Array.empty,
-          wrappedCharException)
+        throw BadRecordException(() => xmlRecord, cause = wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => xmlRecord,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to