This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ba98b7ae53f8 [SPARK-47689][SQL] Do not wrap query execution error during data writing
ba98b7ae53f8 is described below

commit ba98b7ae53f8b106cb0cfd8f0b3fe1a8068c1ce6
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Apr 2 20:34:32 2024 +0800

    [SPARK-47689][SQL] Do not wrap query execution error during data writing
    
    ### What changes were proposed in this pull request?
    
    It's quite confusing to report a `TASK_WRITE_FAILED` error when the failure was actually caused by executing the input query. This PR updates the error wrapping code so that errors raised while executing the input query are no longer wrapped in `TASK_WRITE_FAILED`.
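    
    As an illustration of the idea (a minimal, self-contained sketch in plain Scala with hypothetical names, not the actual `FileFormatWriter` code): failures raised while producing input rows are tagged with a marker exception, and the task-level catch rethrows the original error instead of wrapping it.
    
        // Marker for errors that happened while evaluating the input query (hypothetical name).
        class QueryFailure(val cause: Throwable) extends Throwable
    
        // Re-throws any upstream error wrapped in the marker so it can be told apart
        // from errors thrown by the data writer itself.
        class FailureCapturingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
          override def hasNext: Boolean =
            try underlying.hasNext catch { case t: Throwable => throw new QueryFailure(t) }
          override def next(): T =
            try underlying.next() catch { case t: Throwable => throw new QueryFailure(t) }
        }
    
        def writeTask[T](rows: Iterator[T])(write: T => Unit): Unit = {
          val captured = new FailureCapturingIterator(rows)
          try {
            captured.foreach(write)
          } catch {
            // The error came from the input query: surface it as-is.
            case e: QueryFailure => throw e.cause
            // The error came from writing the data: keep wrapping it
            // (stand-in for the real TASK_WRITE_FAILED error).
            case e: Throwable => throw new RuntimeException("TASK_WRITE_FAILED", e)
          }
        }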
    
    ### Why are the changes needed?
    
    better error reporting
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will no longer see a `TASK_WRITE_FAILED` error when the failure comes from the input query.
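    
    For example (a hypothetical standalone sketch, assuming a Spark build that includes this change and ANSI mode enabled; the statement and error class mirror the updated `QueryExecutionAnsiErrorsSuite` below):
    
        import org.apache.spark.SparkArithmeticException
        import org.apache.spark.sql.SparkSession
    
        object WriteErrorDemo {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().master("local[1]").getOrCreate()
            spark.conf.set("spark.sql.ansi.enabled", "true")
            spark.sql("CREATE TABLE overflow_t(i INT) USING parquet")
            try {
              spark.sql("INSERT INTO overflow_t VALUES 12345678901234567890D")
            } catch {
              // Previously this surfaced as a SparkException with error class
              // TASK_WRITE_FAILED, and the arithmetic error was only reachable via
              // getCause; now the query-side error is thrown directly.
              case e: SparkArithmeticException =>
                assert(e.getErrorClass == "CAST_OVERFLOW_IN_TABLE_INSERT")
            } finally {
              spark.sql("DROP TABLE overflow_t")
              spark.stop()
            }
          }
        }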
    
    ### How was this patch tested?
    
    updated tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45797 from cloud-fan/write-error.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/FileFormatWriter.scala   |  29 ++-
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 256 ++++-----------------
 .../sql/errors/QueryExecutionAnsiErrorsSuite.scala |  16 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala |  40 ++--
 .../spark/sql/HiveCharVarcharTestSuite.scala       |  20 +-
 5 files changed, 89 insertions(+), 272 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 9dbadbd97ec7..1df63aa14b4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, 
SQLExecution, UnsafeExternalRowSorter}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, Utils}
 import org.apache.spark.util.ArrayImplicits._
 
 
@@ -401,9 +401,10 @@ object FileFormatWriter extends Logging {
       }
 
     try {
+      val queryFailureCapturedIterator = new QueryFailureCapturedIterator(iterator)
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         // Execute the task to write rows out and commit the task.
-        dataWriter.writeWithIterator(iterator)
+        dataWriter.writeWithIterator(queryFailureCapturedIterator)
         dataWriter.commit()
       })(catchBlock = {
         // If there is an error, abort the task
@@ -413,6 +414,8 @@ object FileFormatWriter extends Logging {
         dataWriter.close()
       })
     } catch {
+      case e: QueryFailureDuringWrite =>
+        throw e.queryFailure
       case e: FetchFailedException =>
         throw e
      case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
@@ -452,3 +455,25 @@ object FileFormatWriter extends Logging {
     }
   }
 }
+
+// An exception wrapper to indicate that the error was thrown when executing the query, not writing
+// the data
+private class QueryFailureDuringWrite(val queryFailure: Throwable) extends Throwable
+
+// An iterator wrapper to rethrow any error from the given iterator with `QueryFailureDuringWrite`.
+private class QueryFailureCapturedIterator(data: Iterator[InternalRow])
+  extends NextIterator[InternalRow] {
+
+  override protected def getNext(): InternalRow = try {
+    if (data.hasNext) {
+      data.next()
+    } else {
+      finished = true
+      null
+    }
+  } catch {
+    case t: Throwable => throw new QueryFailureDuringWrite(t)
+  }
+
+  override protected def close(): Unit = {}
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 7e845e69c772..013177425da7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
+import org.apache.spark.{SparkConf, SparkRuntimeException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -60,6 +60,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  def assertLengthCheckFailure(query: String): Unit = {
+    assertLengthCheckFailure(() => sql(query))
+  }
+
+  def assertLengthCheckFailure(func: () => Unit): Unit = {
+    checkError(
+      exception = intercept[SparkRuntimeException](func()),
+      errorClass = "EXCEED_LIMIT_LENGTH",
+      parameters = Map("limit" -> "5")
+    )
+  }
+
   test("apply char padding/trimming and varchar trimming: top-level columns") {
     Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
       withTable("t") {
@@ -147,28 +159,12 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       withTable("t") {
         sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY 
(c)")
         Seq("ADD", "DROP").foreach { op =>
-          checkError(
-            exception = intercept[SparkRuntimeException] {
-              sql(s"ALTER TABLE t $op PARTITION(c='abcdef')")
-            },
-            errorClass = "EXCEED_LIMIT_LENGTH",
-            parameters = Map("limit" -> "5")
-          )
+          assertLengthCheckFailure(s"ALTER TABLE t $op PARTITION(c='abcdef')")
         }
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql(s"ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION 
(c='2')")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql(s"ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION 
(c='abcdef')")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(
+          "ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')")
+        assertLengthCheckFailure(
+          "ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')")
       }
     }
   }
@@ -315,17 +311,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c $typeName(5)) USING $format")
       sql("INSERT INTO t VALUES (null)")
       checkAnswer(spark.table("t"), Row(null))
-      val ex = intercept[Exception] {
-        sql("INSERT INTO t VALUES ('123456')")
-      }
-      checkError(
-        exception = ex match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES ('123456')")
     }
   }
 
@@ -337,14 +323,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
         sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format 
PARTITIONED BY (c)")
         sql(s"INSERT INTO $tableName VALUES (1, null)")
         checkAnswer(spark.table(tableName), Row(1, null))
-        checkError(
-          exception = intercept[SparkException] {
-            sql(s"INSERT INTO $tableName VALUES (1, '123456')")
-          },
-          errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s".*$tableName.*"),
-          matchPVals = true
-        )
+        assertLengthCheckFailure(s"INSERT INTO $tableName VALUES (1, 
'123456')")
       }
     }
   }
@@ -354,13 +333,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c STRUCT<c: $typeName(5)>) USING $format")
       sql("INSERT INTO t SELECT struct(null)")
       checkAnswer(spark.table("t"), Row(Row(null)))
-      checkError(
-        exception = intercept[SparkRuntimeException] {
-          sql("INSERT INTO t SELECT struct('123456')")
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t SELECT struct('123456')")
     }
   }
 
@@ -369,32 +342,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c ARRAY<$typeName(5)>) USING $format")
       sql("INSERT INTO t VALUES (array(null))")
       checkAnswer(spark.table("t"), Row(Seq(null)))
-      val e = intercept[Exception] {
-        sql("INSERT INTO t VALUES (array('a', '123456'))")
-      }
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (array('a', '123456'))")
     }
   }
 
   test("length check for input string values: nested in map key") {
     testTableWrite { typeName =>
       sql(s"CREATE TABLE t(c MAP<$typeName(5), STRING>) USING $format")
-      val e = intercept[Exception](sql("INSERT INTO t VALUES (map('123456', 
'a'))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('123456', 'a'))")
     }
   }
 
@@ -403,39 +358,15 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c MAP<STRING, $typeName(5)>) USING $format")
       sql("INSERT INTO t VALUES (map('a', null))")
       checkAnswer(spark.table("t"), Row(Map("a" -> null)))
-      val e = intercept[Exception](sql("INSERT INTO t VALUES (map('a', 
'123456'))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('a', '123456'))")
     }
   }
 
   test("length check for input string values: nested in both map key and 
value") {
     testTableWrite { typeName =>
       sql(s"CREATE TABLE t(c MAP<$typeName(5), $typeName(5)>) USING $format")
-      val e1 = intercept[Exception](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
-      checkError(
-        exception = e1 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
-      val e2 = intercept[Exception](sql("INSERT INTO t VALUES (map('a', '123456'))"))
-      checkError(
-        exception = e2 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('123456', 'a'))")
+      assertLengthCheckFailure("INSERT INTO t VALUES (map('a', '123456'))")
     }
   }
 
@@ -444,15 +375,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c STRUCT<c: ARRAY<$typeName(5)>>) USING $format")
       sql("INSERT INTO t SELECT struct(array(null))")
       checkAnswer(spark.table("t"), Row(Row(Seq(null))))
-      val e = intercept[Exception](sql("INSERT INTO t SELECT 
struct(array('123456'))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t SELECT struct(array('123456'))")
     }
   }
 
@@ -461,15 +384,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c ARRAY<STRUCT<c: $typeName(5)>>) USING $format")
       sql("INSERT INTO t VALUES (array(struct(null)))")
       checkAnswer(spark.table("t"), Row(Seq(Row(null))))
-      val e = intercept[Exception](sql("INSERT INTO t VALUES 
(array(struct('123456')))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES 
(array(struct('123456')))")
     }
   }
 
@@ -478,15 +393,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c ARRAY<ARRAY<$typeName(5)>>) USING $format")
       sql("INSERT INTO t VALUES (array(array(null)))")
       checkAnswer(spark.table("t"), Row(Seq(Seq(null))))
-      val e = intercept[Exception](sql("INSERT INTO t VALUES 
(array(array('123456')))"))
-      checkError(
-        exception = e match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (array(array('123456')))")
     }
   }
 
@@ -506,24 +413,8 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c1 CHAR(5), c2 VARCHAR(5)) USING $format")
       sql("INSERT INTO t VALUES (1234, 1234)")
       checkAnswer(spark.table("t"), Row("1234 ", "1234"))
-      val e1 = intercept[Exception](sql("INSERT INTO t VALUES (123456, 1)"))
-      checkError(
-        exception = e1 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
-      val e2 = intercept[Exception](sql("INSERT INTO t VALUES (1, 123456)"))
-      checkError(
-        exception = e2 match {
-          case c: SparkRuntimeException => c
-          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-        },
-        errorClass = "EXCEED_LIMIT_LENGTH",
-        parameters = Map("limit" -> "5")
-      )
+      assertLengthCheckFailure("INSERT INTO t VALUES (123456, 1)")
+      assertLengthCheckFailure("INSERT INTO t VALUES (1, 123456)")
     }
   }
 
@@ -1046,23 +937,8 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
           sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
           checkAnswer(spark.table(tableName), Nil)
         }
-
-        checkError(
-          exception = intercept[SparkException] {
-            sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)")
-          },
-          errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s".*$tableName"),
-          matchPVals = true
-        )
-
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql("ALTER TABLE t DROP PARTITION(c=100000)")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(s"INSERT OVERWRITE $tableName VALUES ('1', 
100000)")
+        assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)")
       }
     }
   }
@@ -1087,24 +963,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
           sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
           checkAnswer(spark.table("t"), Nil)
         }
-
-        val e1 = intercept[Exception](sql(s"INSERT OVERWRITE t VALUES ('1', 
100000)"))
-        checkError(
-          exception = e1 match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
-
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql("ALTER TABLE t DROP PARTITION(c=100000)")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(s"INSERT OVERWRITE t VALUES ('1', 100000)")
+        assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)")
       }
     }
   }
@@ -1113,16 +973,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
     Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
       withTable("t") {
         sql(s"CREATE TABLE t(s STRUCT<n_c: $typ, n_i: INT>) USING $format")
-
         val inputDF = sql("SELECT named_struct('n_i', 1, 'n_c', '123456') AS 
s")
-
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            inputDF.writeTo("t").append()
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
       }
     }
   }
@@ -1131,18 +983,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
     Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
       withTable("t") {
         sql(s"CREATE TABLE t(a ARRAY<STRUCT<n_c: $typ, n_i: INT>>) USING 
$format")
-
         val inputDF = sql("SELECT array(named_struct('n_i', 1, 'n_c', 
'123456')) AS a")
-
-        val e = intercept[Exception](inputDF.writeTo("t").append())
-        checkError(
-          exception = e match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
       }
     }
   }
@@ -1151,18 +993,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
     Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
       withTable("t") {
         sql(s"CREATE TABLE t(m MAP<STRUCT<n_c: $typ, n_i: INT>, INT>) USING 
$format")
-
         val inputDF = sql("SELECT map(named_struct('n_i', 1, 'n_c', '123456'), 
1) AS m")
-
-        val e = intercept[Exception](inputDF.writeTo("t").append())
-        checkError(
-          exception = e match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
       }
     }
   }
@@ -1171,18 +1003,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
     Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
       withTable("t") {
         sql(s"CREATE TABLE t(m MAP<INT, STRUCT<n_c: $typ, n_i: INT>>) USING 
$format")
-
         val inputDF = sql("SELECT map(1, named_struct('n_i', 1, 'n_c', 
'123456')) AS m")
-
-        val e = intercept[Exception](inputDF.writeTo("t").append())
-        checkError(
-          exception = e match {
-            case c: SparkRuntimeException => c
-            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(() => inputDF.writeTo("t").append())
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala
index 3927d6373b4a..a0b4d345628e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala
@@ -251,12 +251,10 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest
       val tableName = "overflowTable"
       withTable(tableName) {
         sql(s"CREATE TABLE $tableName(i $targetType) USING parquet")
-        val ex = intercept[SparkException] {
-          sql(s"insert into $tableName values 12345678901234567890D")
-        }
-        assert(ex.getErrorClass == "TASK_WRITE_FAILED")
         checkError(
-          exception = ex.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into $tableName values 12345678901234567890D")
+          },
           errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
           parameters = Map(
             "sourceType" -> "\"DOUBLE\"",
@@ -289,12 +287,10 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest
       sql("CREATE TABLE t2 (x tinyint) USING parquet")
       val insertCmd = "insert into t2 select 0 - (case when x = 1.2345678901234567E19D " +
         "then 1.2345678901234567E19D else x end) from t1 where x = 1.2345678901234567E19D;"
-      val ex = intercept[SparkException] {
-        sql(insertCmd).collect()
-      }
-      assert(ex.getErrorClass == "TASK_WRITE_FAILED")
       checkError(
-        exception = ex.getCause.asInstanceOf[SparkArithmeticException],
+        exception = intercept[SparkArithmeticException] {
+          sql(insertCmd).collect()
+        },
         errorClass = "CAST_OVERFLOW",
         parameters = Map("value" -> "-1.2345678901234567E19D",
           "sourceType" -> "\"DOUBLE\"",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index c2b854777825..93698fdd7bc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -771,12 +771,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         sql("create table t(b int) using parquet")
 
         val outOfRangeValue1 = (Int.MaxValue + 1L).toString
-        val e1 = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue1)")
-        }
-        assert(e1.getErrorClass == "TASK_WRITE_FAILED")
         checkError(
-          exception = e1.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values($outOfRangeValue1)")
+          },
           errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
           parameters = Map(
             "sourceType" -> "\"BIGINT\"",
@@ -784,12 +782,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             "columnName" -> "`b`"))
 
         val outOfRangeValue2 = (Int.MinValue - 1L).toString
-        val e2 = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue2)")
-        }
-        assert(e2.getErrorClass == "TASK_WRITE_FAILED")
         checkError(
-          exception = e2.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values($outOfRangeValue2)")
+          },
           errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
           parameters = Map(
             "sourceType" -> "\"BIGINT\"",
@@ -806,12 +802,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         sql("create table t(b long) using parquet")
 
         val outOfRangeValue1 = Math.nextUp(Long.MaxValue)
-        val e1 = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue1}D)")
-        }
-        assert(e1.getErrorClass == "TASK_WRITE_FAILED")
         checkError(
-          exception = e1.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values(${outOfRangeValue1}D)")
+          },
           errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
           parameters = Map(
             "sourceType" -> "\"DOUBLE\"",
@@ -819,12 +813,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             "columnName" -> "`b`"))
 
         val outOfRangeValue2 = Math.nextDown(Long.MinValue)
-        val e2 = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue2}D)")
-        }
-        assert(e2.getErrorClass == "TASK_WRITE_FAILED")
         checkError(
-          exception = e2.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values(${outOfRangeValue2}D)")
+          },
           errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
           parameters = Map(
             "sourceType" -> "\"DOUBLE\"",
@@ -840,12 +832,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       withTable("t") {
         sql("create table t(b decimal(3,2)) using parquet")
         val outOfRangeValue = "123.45"
-        val ex = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue)")
-        }
-        assert(ex.getErrorClass == "TASK_WRITE_FAILED")
         checkError(
-          exception = ex.getCause.asInstanceOf[SparkArithmeticException],
+          exception = intercept[SparkArithmeticException] {
+            sql(s"insert into t values($outOfRangeValue)")
+          },
           errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
           parameters = Map(
             "sourceType" -> "\"DECIMAL(5,2)\"",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 7312bbb6238d..c12d727e5974 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -86,23 +85,8 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
           sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
           checkAnswer(spark.table(tableName), Nil)
         }
-
-        checkError(
-          exception = intercept[SparkException] {
-            sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)")
-          },
-          errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s".*$tableName.*"),
-          matchPVals = true
-        )
-
-        checkError(
-          exception = intercept[SparkRuntimeException] {
-            sql("ALTER TABLE t DROP PARTITION(c=100000)")
-          },
-          errorClass = "EXCEED_LIMIT_LENGTH",
-          parameters = Map("limit" -> "5")
-        )
+        assertLengthCheckFailure(s"INSERT OVERWRITE $tableName VALUES ('1', 
100000)")
+        assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)")
       }
     }
   }

