This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ba98b7ae53f8 [SPARK-47689][SQL] Do not wrap query execution error during data writing ba98b7ae53f8 is described below commit ba98b7ae53f8b106cb0cfd8f0b3fe1a8068c1ce6 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Apr 2 20:34:32 2024 +0800 [SPARK-47689][SQL] Do not wrap query execution error during data writing ### What changes were proposed in this pull request? It's quite confusing to report a `TASK_WRITE_FAILED` error when the error was caused by input query execution. This PR updates the error wrapping code so that it does not wrap the error with `TASK_WRITE_FAILED` if the error came from input query execution. ### Why are the changes needed? Better error reporting: the original query execution error is surfaced instead of being wrapped in a write failure. ### Does this PR introduce _any_ user-facing change? Yes, users will no longer see a `TASK_WRITE_FAILED` error if the error came from the input query. ### How was this patch tested? Updated existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45797 from cloud-fan/write-error. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../execution/datasources/FileFormatWriter.scala | 29 ++- .../apache/spark/sql/CharVarcharTestSuite.scala | 256 ++++----------------- .../sql/errors/QueryExecutionAnsiErrorsSuite.scala | 16 +- .../org/apache/spark/sql/sources/InsertSuite.scala | 40 ++-- .../spark/sql/HiveCharVarcharTestSuite.scala | 20 +- 5 files changed, 89 insertions(+), 272 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 9dbadbd97ec7..1df63aa14b4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.{NextIterator, SerializableConfiguration, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -401,9 +401,10 @@ object FileFormatWriter extends Logging { } try { + val queryFailureCapturedIterator = new QueryFailureCapturedIterator(iterator) Utils.tryWithSafeFinallyAndFailureCallbacks(block = { // Execute the task to write rows out and commit the task. 
- dataWriter.writeWithIterator(iterator) + dataWriter.writeWithIterator(queryFailureCapturedIterator) dataWriter.commit() })(catchBlock = { // If there is an error, abort the task @@ -413,6 +414,8 @@ object FileFormatWriter extends Logging { dataWriter.close() }) } catch { + case e: QueryFailureDuringWrite => + throw e.queryFailure case e: FetchFailedException => throw e case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput => @@ -452,3 +455,25 @@ object FileFormatWriter extends Logging { } } } + +// A exception wrapper to indicate that the error was thrown when executing the query, not writing +// the data +private class QueryFailureDuringWrite(val queryFailure: Throwable) extends Throwable + +// An iterator wrapper to rethrow any error from the given iterator with `QueryFailureDuringWrite`. +private class QueryFailureCapturedIterator(data: Iterator[InternalRow]) + extends NextIterator[InternalRow] { + + override protected def getNext(): InternalRow = try { + if (data.hasNext) { + data.next() + } else { + finished = true + null + } + } catch { + case t: Throwable => throw new QueryFailureDuringWrite(t) + } + + override protected def close(): Unit = {} +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala index 7e845e69c772..013177425da7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException} +import org.apache.spark.{SparkConf, SparkRuntimeException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.Project @@ -60,6 +60,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { } 
} + def assertLengthCheckFailure(query: String): Unit = { + assertLengthCheckFailure(() => sql(query)) + } + + def assertLengthCheckFailure(func: () => Unit): Unit = { + checkError( + exception = intercept[SparkRuntimeException](func()), + errorClass = "EXCEED_LIMIT_LENGTH", + parameters = Map("limit" -> "5") + ) + } + test("apply char padding/trimming and varchar trimming: top-level columns") { Seq("CHAR(5)", "VARCHAR(5)").foreach { typ => withTable("t") { @@ -147,28 +159,12 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { withTable("t") { sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)") Seq("ADD", "DROP").foreach { op => - checkError( - exception = intercept[SparkRuntimeException] { - sql(s"ALTER TABLE t $op PARTITION(c='abcdef')") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(s"ALTER TABLE t $op PARTITION(c='abcdef')") } - checkError( - exception = intercept[SparkRuntimeException] { - sql(s"ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) - checkError( - exception = intercept[SparkRuntimeException] { - sql(s"ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure( + "ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')") + assertLengthCheckFailure( + "ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')") } } } @@ -315,17 +311,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c $typeName(5)) USING $format") sql("INSERT INTO t VALUES (null)") checkAnswer(spark.table("t"), Row(null)) - val ex = intercept[Exception] { - sql("INSERT INTO t VALUES ('123456')") - } - checkError( - exception = ex match { - case c: SparkRuntimeException => c - case c: SparkException => 
c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES ('123456')") } } @@ -337,14 +323,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)") sql(s"INSERT INTO $tableName VALUES (1, null)") checkAnswer(spark.table(tableName), Row(1, null)) - checkError( - exception = intercept[SparkException] { - sql(s"INSERT INTO $tableName VALUES (1, '123456')") - }, - errorClass = "TASK_WRITE_FAILED", - parameters = Map("path" -> s".*$tableName.*"), - matchPVals = true - ) + assertLengthCheckFailure(s"INSERT INTO $tableName VALUES (1, '123456')") } } } @@ -354,13 +333,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c STRUCT<c: $typeName(5)>) USING $format") sql("INSERT INTO t SELECT struct(null)") checkAnswer(spark.table("t"), Row(Row(null))) - checkError( - exception = intercept[SparkRuntimeException] { - sql("INSERT INTO t SELECT struct('123456')") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t SELECT struct('123456')") } } @@ -369,32 +342,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c ARRAY<$typeName(5)>) USING $format") sql("INSERT INTO t VALUES (array(null))") checkAnswer(spark.table("t"), Row(Seq(null))) - val e = intercept[Exception] { - sql("INSERT INTO t VALUES (array('a', '123456'))") - } - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES (array('a', '123456'))") } } test("length check for input string values: nested in map key") { testTableWrite { typeName 
=> sql(s"CREATE TABLE t(c MAP<$typeName(5), STRING>) USING $format") - val e = intercept[Exception](sql("INSERT INTO t VALUES (map('123456', 'a'))")) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES (map('123456', 'a'))") } } @@ -403,39 +358,15 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c MAP<STRING, $typeName(5)>) USING $format") sql("INSERT INTO t VALUES (map('a', null))") checkAnswer(spark.table("t"), Row(Map("a" -> null))) - val e = intercept[Exception](sql("INSERT INTO t VALUES (map('a', '123456'))")) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES (map('a', '123456'))") } } test("length check for input string values: nested in both map key and value") { testTableWrite { typeName => sql(s"CREATE TABLE t(c MAP<$typeName(5), $typeName(5)>) USING $format") - val e1 = intercept[Exception](sql("INSERT INTO t VALUES (map('123456', 'a'))")) - checkError( - exception = e1 match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) - val e2 = intercept[Exception](sql("INSERT INTO t VALUES (map('a', '123456'))")) - checkError( - exception = e2 match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES (map('123456', 'a'))") + 
assertLengthCheckFailure("INSERT INTO t VALUES (map('a', '123456'))") } } @@ -444,15 +375,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c STRUCT<c: ARRAY<$typeName(5)>>) USING $format") sql("INSERT INTO t SELECT struct(array(null))") checkAnswer(spark.table("t"), Row(Row(Seq(null)))) - val e = intercept[Exception](sql("INSERT INTO t SELECT struct(array('123456'))")) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t SELECT struct(array('123456'))") } } @@ -461,15 +384,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c ARRAY<STRUCT<c: $typeName(5)>>) USING $format") sql("INSERT INTO t VALUES (array(struct(null)))") checkAnswer(spark.table("t"), Row(Seq(Row(null)))) - val e = intercept[Exception](sql("INSERT INTO t VALUES (array(struct('123456')))")) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES (array(struct('123456')))") } } @@ -478,15 +393,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c ARRAY<ARRAY<$typeName(5)>>) USING $format") sql("INSERT INTO t VALUES (array(array(null)))") checkAnswer(spark.table("t"), Row(Seq(Seq(null)))) - val e = intercept[Exception](sql("INSERT INTO t VALUES (array(array('123456')))")) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t 
VALUES (array(array('123456')))") } } @@ -506,24 +413,8 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE t(c1 CHAR(5), c2 VARCHAR(5)) USING $format") sql("INSERT INTO t VALUES (1234, 1234)") checkAnswer(spark.table("t"), Row("1234 ", "1234")) - val e1 = intercept[Exception](sql("INSERT INTO t VALUES (123456, 1)")) - checkError( - exception = e1 match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) - val e2 = intercept[Exception](sql("INSERT INTO t VALUES (1, 123456)")) - checkError( - exception = e2 match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure("INSERT INTO t VALUES (123456, 1)") + assertLengthCheckFailure("INSERT INTO t VALUES (1, 123456)") } } @@ -1046,23 +937,8 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)") checkAnswer(spark.table(tableName), Nil) } - - checkError( - exception = intercept[SparkException] { - sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)") - }, - errorClass = "TASK_WRITE_FAILED", - parameters = Map("path" -> s".*$tableName"), - matchPVals = true - ) - - checkError( - exception = intercept[SparkRuntimeException] { - sql("ALTER TABLE t DROP PARTITION(c=100000)") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)") + assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)") } } } @@ -1087,24 +963,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite sql(s"ALTER TABLE t DROP PARTITION(c=$v)") checkAnswer(spark.table("t"), Nil) } - - val e1 = 
intercept[Exception](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)")) - checkError( - exception = e1 match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) - - checkError( - exception = intercept[SparkRuntimeException] { - sql("ALTER TABLE t DROP PARTITION(c=100000)") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(s"INSERT OVERWRITE t VALUES ('1', 100000)") + assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)") } } } @@ -1113,16 +973,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite Seq("CHAR(5)", "VARCHAR(5)").foreach { typ => withTable("t") { sql(s"CREATE TABLE t(s STRUCT<n_c: $typ, n_i: INT>) USING $format") - val inputDF = sql("SELECT named_struct('n_i', 1, 'n_c', '123456') AS s") - - checkError( - exception = intercept[SparkRuntimeException] { - inputDF.writeTo("t").append() - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(() => inputDF.writeTo("t").append()) } } } @@ -1131,18 +983,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite Seq("CHAR(5)", "VARCHAR(5)").foreach { typ => withTable("t") { sql(s"CREATE TABLE t(a ARRAY<STRUCT<n_c: $typ, n_i: INT>>) USING $format") - val inputDF = sql("SELECT array(named_struct('n_i', 1, 'n_c', '123456')) AS a") - - val e = intercept[Exception](inputDF.writeTo("t").append()) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(() => inputDF.writeTo("t").append()) } } } @@ -1151,18 +993,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite Seq("CHAR(5)", "VARCHAR(5)").foreach { typ => withTable("t") { 
sql(s"CREATE TABLE t(m MAP<STRUCT<n_c: $typ, n_i: INT>, INT>) USING $format") - val inputDF = sql("SELECT map(named_struct('n_i', 1, 'n_c', '123456'), 1) AS m") - - val e = intercept[Exception](inputDF.writeTo("t").append()) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(() => inputDF.writeTo("t").append()) } } } @@ -1171,18 +1003,8 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite Seq("CHAR(5)", "VARCHAR(5)").foreach { typ => withTable("t") { sql(s"CREATE TABLE t(m MAP<INT, STRUCT<n_c: $typ, n_i: INT>>) USING $format") - val inputDF = sql("SELECT map(1, named_struct('n_i', 1, 'n_c', '123456')) AS m") - - val e = intercept[Exception](inputDF.writeTo("t").append()) - checkError( - exception = e match { - case c: SparkRuntimeException => c - case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException] - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(() => inputDF.writeTo("t").append()) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala index 3927d6373b4a..a0b4d345628e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionAnsiErrorsSuite.scala @@ -251,12 +251,10 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest val tableName = "overflowTable" withTable(tableName) { sql(s"CREATE TABLE $tableName(i $targetType) USING parquet") - val ex = intercept[SparkException] { - sql(s"insert into $tableName values 12345678901234567890D") - } - assert(ex.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = 
ex.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(s"insert into $tableName values 12345678901234567890D") + }, errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", parameters = Map( "sourceType" -> "\"DOUBLE\"", @@ -289,12 +287,10 @@ class QueryExecutionAnsiErrorsSuite extends QueryTest sql("CREATE TABLE t2 (x tinyint) USING parquet") val insertCmd = "insert into t2 select 0 - (case when x = 1.2345678901234567E19D " + "then 1.2345678901234567E19D else x end) from t1 where x = 1.2345678901234567E19D;" - val ex = intercept[SparkException] { - sql(insertCmd).collect() - } - assert(ex.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = ex.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(insertCmd).collect() + }, errorClass = "CAST_OVERFLOW", parameters = Map("value" -> "-1.2345678901234567E19D", "sourceType" -> "\"DOUBLE\"", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index c2b854777825..93698fdd7bc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -771,12 +771,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("create table t(b int) using parquet") val outOfRangeValue1 = (Int.MaxValue + 1L).toString - val e1 = intercept[SparkException] { - sql(s"insert into t values($outOfRangeValue1)") - } - assert(e1.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = e1.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(s"insert into t values($outOfRangeValue1)") + }, errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", parameters = Map( "sourceType" -> "\"BIGINT\"", @@ -784,12 +782,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession 
{ "columnName" -> "`b`")) val outOfRangeValue2 = (Int.MinValue - 1L).toString - val e2 = intercept[SparkException] { - sql(s"insert into t values($outOfRangeValue2)") - } - assert(e2.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = e2.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(s"insert into t values($outOfRangeValue2)") + }, errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", parameters = Map( "sourceType" -> "\"BIGINT\"", @@ -806,12 +802,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("create table t(b long) using parquet") val outOfRangeValue1 = Math.nextUp(Long.MaxValue) - val e1 = intercept[SparkException] { - sql(s"insert into t values(${outOfRangeValue1}D)") - } - assert(e1.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = e1.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(s"insert into t values(${outOfRangeValue1}D)") + }, errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", parameters = Map( "sourceType" -> "\"DOUBLE\"", @@ -819,12 +813,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { "columnName" -> "`b`")) val outOfRangeValue2 = Math.nextDown(Long.MinValue) - val e2 = intercept[SparkException] { - sql(s"insert into t values(${outOfRangeValue2}D)") - } - assert(e2.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = e2.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(s"insert into t values(${outOfRangeValue2}D)") + }, errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", parameters = Map( "sourceType" -> "\"DOUBLE\"", @@ -840,12 +832,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(b decimal(3,2)) using parquet") val outOfRangeValue = "123.45" - val ex = intercept[SparkException] { - sql(s"insert into t values($outOfRangeValue)") - } - 
assert(ex.getErrorClass == "TASK_WRITE_FAILED") checkError( - exception = ex.getCause.asInstanceOf[SparkArithmeticException], + exception = intercept[SparkArithmeticException] { + sql(s"insert into t values($outOfRangeValue)") + }, errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", parameters = Map( "sourceType" -> "\"DECIMAL(5,2)\"", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala index 7312bbb6238d..c12d727e5974 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql -import org.apache.spark.{SparkException, SparkRuntimeException} import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -86,23 +85,8 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)") checkAnswer(spark.table(tableName), Nil) } - - checkError( - exception = intercept[SparkException] { - sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)") - }, - errorClass = "TASK_WRITE_FAILED", - parameters = Map("path" -> s".*$tableName.*"), - matchPVals = true - ) - - checkError( - exception = intercept[SparkRuntimeException] { - sql("ALTER TABLE t DROP PARTITION(c=100000)") - }, - errorClass = "EXCEED_LIMIT_LENGTH", - parameters = Map("limit" -> "5") - ) + assertLengthCheckFailure(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)") + assertLengthCheckFailure("ALTER TABLE t DROP PARTITION(c=100000)") } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org