This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new aaee89a12fd  [SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054
aaee89a12fd is described below

commit aaee89a12fd9b8ca3c57fa4283a51ce229dd7b71
Author: itholic <haejoon....@databricks.com>
AuthorDate: Tue Jan 10 16:25:15 2023 +0300

    [SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054

    ### What changes were proposed in this pull request?

    This PR proposes to assign the name "TASK_WRITE_FAILED" to _LEGACY_ERROR_TEMP_2054.

    ### Why are the changes needed?

    We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`

    Closes #39394 from itholic/LEGACY_2054.

    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 10 +--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  6 +-
 .../execution/datasources/FileFormatWriter.scala   |  2 +-
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 82 +++++++++++++++-------
 .../org/apache/spark/sql/sources/InsertSuite.scala | 16 +++--
 .../spark/sql/HiveCharVarcharTestSuite.scala       | 27 +++++++
 6 files changed, 104 insertions(+), 39 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a3acb940585..edf46a0fe09 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1187,6 +1187,11 @@
     ],
     "sqlState" : "42000"
   },
+  "TASK_WRITE_FAILED" : {
+    "message" : [
+      "Task failed while writing rows to <path>."
+    ]
+  },
   "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS" : {
     "message" : [
       "Cannot create the temporary view <relationName> because it already exists.",
@@ -3728,11 +3733,6 @@
       "buildReader is not supported for <format>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2054" : {
-    "message" : [
-      "Task failed while writing rows. <message>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2055" : {
     "message" : [
       "<message>",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 17fc38812f8..9598933d941 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -782,10 +782,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("format" -> format))
   }
 
-  def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
+  def taskFailedWhileWritingRowsError(path: String, cause: Throwable): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2054",
-      messageParameters = Map("message" -> cause.getMessage),
+      errorClass = "TASK_WRITE_FAILED",
+      messageParameters = Map("path" -> path),
       cause = cause)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6285095c647..5c4d662c145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -423,7 +423,7 @@ object FileFormatWriter extends Logging {
         // We throw the exception and let Executor throw ExceptionFailure to abort the job.
         throw new TaskOutputFileAlreadyExistException(f)
       case t: Throwable =>
-        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
+        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 95c2e5085d9..c0ceebaa9a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -178,26 +178,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("char/varchar type values length check: partitioned columns of other types") {
-    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
-      withTable("t") {
-        sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
-        Seq(1, 10, 100, 1000, 10000).foreach { v =>
-          sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
-          checkPlainResult(spark.table("t"), typ, v.toString)
-          sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
-          checkAnswer(spark.table("t"), Nil)
-        }
-
-        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
-        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-
-        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-      }
-    }
-  }
-
   test("char type values should be padded: nested in struct") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c STRUCT<c: CHAR(5)>) USING $format")
@@ -332,12 +312,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
   test("length check for input string values: partitioned columns") {
     // DS V2 doesn't support partitioned table.
     if (!conf.contains(SQLConf.DEFAULT_CATALOG.key)) {
+      val tableName = "t"
       testTableWrite { typeName =>
-        sql(s"CREATE TABLE t(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)")
-        sql("INSERT INTO t VALUES (1, null)")
-        checkAnswer(spark.table("t"), Row(1, null))
-        val e = intercept[SparkException](sql("INSERT INTO t VALUES (1, '123456')"))
-        assert(e.getCause.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+        sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)")
+        sql(s"INSERT INTO $tableName VALUES (1, null)")
+        checkAnswer(spark.table(tableName), Row(1, null))
+        val e = intercept[SparkException](sql(s"INSERT INTO $tableName VALUES (1, '123456')"))
+        checkError(
+          exception = e.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName.*"),
+          matchPVals = true
+        )
       }
     }
   }
@@ -884,6 +870,32 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
       }
     }
   }
+
+  test("char/varchar type values length check: partitioned columns of other types") {
+    val tableName = "t"
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable(tableName) {
+        sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+          checkPlainResult(spark.table(tableName), typ, v.toString)
+          sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+          checkAnswer(spark.table(tableName), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
+        checkError(
+          exception = e1.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName"),
+          matchPVals = true
+        )
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
 class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
@@ -894,4 +906,24 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
     .set("spark.sql.catalog.testcat", classOf[InMemoryPartitionTableCatalog].getName)
     .set(SQLConf.DEFAULT_CATALOG.key, "testcat")
   }
+
+  test("char/varchar type values length check: partitioned columns of other types") {
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable("t") {
+        sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
+          checkPlainResult(spark.table("t"), typ, v.toString)
+          sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
+          checkAnswer(spark.table("t"), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
+        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5df9b2ae598..d544b5fde5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2027,27 +2027,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("Stop task set if FileAlreadyExistsException was thrown") {
+    val tableName = "t"
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
         "fs.file.impl.disable.cache" -> "true",
         SQLConf.FASTFAIL_ON_FILEFORMAT_OUTPUT.key -> fastFail.toString) {
-        withTable("t") {
+        withTable(tableName) {
           sql(
-            """
-              |CREATE TABLE t(i INT, part1 INT) USING PARQUET
+            s"""
+              |CREATE TABLE $tableName(i INT, part1 INT) USING PARQUET
               |PARTITIONED BY (part1)
             """.stripMargin)
           val df = Seq((1, 1)).toDF("i", "part1")
           val err = intercept[SparkException] {
-            df.write.mode("overwrite").format("parquet").insertInto("t")
+            df.write.mode("overwrite").format("parquet").insertInto(tableName)
           }
           if (fastFail) {
             assert(err.getMessage.contains("can not write to output file: " +
               "org.apache.hadoop.fs.FileAlreadyExistsException"))
           } else {
-            assert(err.getMessage.contains("Task failed while writing rows"))
+            checkError(
+              exception = err.getCause.asInstanceOf[SparkException],
+              errorClass = "TASK_WRITE_FAILED",
+              parameters = Map("path" -> s".*$tableName"),
+              matchPVals = true
+            )
           }
         }
       }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 182047a8c64..1e7820f0c19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -73,6 +74,32 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
       }
     }
   }
+
+  test("char/varchar type values length check: partitioned columns of other types") {
+    val tableName = "t"
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable(tableName) {
+        sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+          checkPlainResult(spark.table(tableName), typ, v.toString)
+          sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+          checkAnswer(spark.table(tableName), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
+        checkError(
+          exception = e1.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName.*"),
+          matchPVals = true
+        )
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
 class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSingleton {
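For readers following along outside the diff, below is a minimal sketch of how the renamed error surfaces after this change. It is an illustration, not part of the commit: the local session, the table name `t`, and the CHAR(5) partition column are assumptions chosen to mirror the updated `CharVarcharTestSuite` test.

```scala
// Hypothetical, self-contained illustration of the behavior covered by this commit:
// a write-task failure (here, a CHAR(5) length violation on a partition column) is
// wrapped in a SparkException whose error class is TASK_WRITE_FAILED and whose
// message names the output path, replacing the old _LEGACY_ERROR_TEMP_2054 text.
import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

object TaskWriteFailedExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    try {
      spark.sql("CREATE TABLE t(i INT, c CHAR(5)) USING parquet PARTITIONED BY (c)")
      // '123456' exceeds CHAR(5), so the length check fails inside the write task.
      spark.sql("INSERT INTO t VALUES (1, '123456')")
    } catch {
      case e: SparkException =>
        // The job-level exception wraps the task-level TASK_WRITE_FAILED error.
        val cause = e.getCause.asInstanceOf[SparkException]
        assert(cause.getErrorClass == "TASK_WRITE_FAILED")
        assert(cause.getMessage.contains("Task failed while writing rows to"))
    } finally {
      spark.sql("DROP TABLE IF EXISTS t")
      spark.stop()
    }
  }
}
```

Passing `description.path` into `taskFailedWhileWritingRowsError` is what lets the message point at the output location rather than echoing the raw cause message, which is why the updated tests match the `path` parameter against patterns like `.*$tableName.*`.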