This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aaee89a12fd [SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054
aaee89a12fd is described below

commit aaee89a12fd9b8ca3c57fa4283a51ce229dd7b71
Author: itholic <haejoon....@databricks.com>
AuthorDate: Tue Jan 10 16:25:15 2023 +0300

    [SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to assign the name "TASK_WRITE_FAILED" to _LEGACY_ERROR_TEMP_2054.
    
    ### Why are the changes needed?
    
    We should assign a proper name to each _LEGACY_ERROR_TEMP_* error class.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
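
    The updated suites also verify the new error class directly with `checkError`,
    following the pattern used throughout this diff; `matchPVals = true` matches the
    `path` parameter as a regex, since the concrete table location varies per run:

    ```scala
    checkError(
      exception = e.getCause.asInstanceOf[SparkException],
      errorClass = "TASK_WRITE_FAILED",
      parameters = Map("path" -> s".*$tableName.*"),  // regex match on the write path
      matchPVals = true)
    ```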
    
    Closes #39394 from itholic/LEGACY_2054.
    
    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 10 +--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  6 +-
 .../execution/datasources/FileFormatWriter.scala   |  2 +-
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 82 +++++++++++++++-------
 .../org/apache/spark/sql/sources/InsertSuite.scala | 16 +++--
 .../spark/sql/HiveCharVarcharTestSuite.scala       | 27 +++++++
 6 files changed, 104 insertions(+), 39 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a3acb940585..edf46a0fe09 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1187,6 +1187,11 @@
     ],
     "sqlState" : "42000"
   },
+  "TASK_WRITE_FAILED" : {
+    "message" : [
+      "Task failed while writing rows to <path>."
+    ]
+  },
   "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS" : {
     "message" : [
       "Cannot create the temporary view <relationName> because it already 
exists.",
@@ -3728,11 +3733,6 @@
       "buildReader is not supported for <format>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2054" : {
-    "message" : [
-      "Task failed while writing rows. <message>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2055" : {
     "message" : [
       "<message>",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 17fc38812f8..9598933d941 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -782,10 +782,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("format" -> format))
   }
 
-  def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
+  def taskFailedWhileWritingRowsError(path: String, cause: Throwable): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2054",
-      messageParameters = Map("message" -> cause.getMessage),
+      errorClass = "TASK_WRITE_FAILED",
+      messageParameters = Map("path" -> path),
       cause = cause)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6285095c647..5c4d662c145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -423,7 +423,7 @@ object FileFormatWriter extends Logging {
         // We throw the exception and let Executor throw ExceptionFailure to abort the job.
         throw new TaskOutputFileAlreadyExistException(f)
       case t: Throwable =>
-        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
+        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 95c2e5085d9..c0ceebaa9a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -178,26 +178,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("char/varchar type values length check: partitioned columns of other 
types") {
-    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
-      withTable("t") {
-        sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY 
(c)")
-        Seq(1, 10, 100, 1000, 10000).foreach { v =>
-          sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
-          checkPlainResult(spark.table("t"), typ, v.toString)
-          sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
-          checkAnswer(spark.table("t"), Nil)
-        }
-
-        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
-        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-
-        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-      }
-    }
-  }
-
   test("char type values should be padded: nested in struct") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c STRUCT<c: CHAR(5)>) USING $format")
@@ -332,12 +312,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
   test("length check for input string values: partitioned columns") {
     // DS V2 doesn't support partitioned table.
     if (!conf.contains(SQLConf.DEFAULT_CATALOG.key)) {
+      val tableName = "t"
       testTableWrite { typeName =>
-        sql(s"CREATE TABLE t(i INT, c $typeName(5)) USING $format PARTITIONED 
BY (c)")
-        sql("INSERT INTO t VALUES (1, null)")
-        checkAnswer(spark.table("t"), Row(1, null))
-        val e = intercept[SparkException](sql("INSERT INTO t VALUES (1, '123456')"))
-        assert(e.getCause.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+        sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format 
PARTITIONED BY (c)")
+        sql(s"INSERT INTO $tableName VALUES (1, null)")
+        checkAnswer(spark.table(tableName), Row(1, null))
+        val e = intercept[SparkException](sql(s"INSERT INTO $tableName VALUES (1, '123456')"))
+        checkError(
+          exception = e.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName.*"),
+          matchPVals = true
+        )
       }
     }
   }
@@ -884,6 +870,32 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
       }
     }
   }
+
+  test("char/varchar type values length check: partitioned columns of other 
types") {
+    val tableName = "t"
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable(tableName) {
+        sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format 
PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+          checkPlainResult(spark.table(tableName), typ, v.toString)
+          sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+          checkAnswer(spark.table(tableName), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
+        checkError(
+          exception = e1.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName"),
+          matchPVals = true
+        )
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
 class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
@@ -894,4 +906,24 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
       .set("spark.sql.catalog.testcat", 
classOf[InMemoryPartitionTableCatalog].getName)
       .set(SQLConf.DEFAULT_CATALOG.key, "testcat")
   }
+
+  test("char/varchar type values length check: partitioned columns of other 
types") {
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable("t") {
+        sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY 
(c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
+          checkPlainResult(spark.table("t"), typ, v.toString)
+          sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
+          checkAnswer(spark.table("t"), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
+        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5df9b2ae598..d544b5fde5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2027,27 +2027,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("Stop task set if FileAlreadyExistsException was thrown") {
+    val tableName = "t"
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> 
classOf[FileExistingTestFileSystem].getName,
         "fs.file.impl.disable.cache" -> "true",
         SQLConf.FASTFAIL_ON_FILEFORMAT_OUTPUT.key -> fastFail.toString) {
-        withTable("t") {
+        withTable(tableName) {
           sql(
-            """
-              |CREATE TABLE t(i INT, part1 INT) USING PARQUET
+            s"""
+              |CREATE TABLE $tableName(i INT, part1 INT) USING PARQUET
               |PARTITIONED BY (part1)
           """.stripMargin)
 
           val df = Seq((1, 1)).toDF("i", "part1")
           val err = intercept[SparkException] {
-            df.write.mode("overwrite").format("parquet").insertInto("t")
+            df.write.mode("overwrite").format("parquet").insertInto(tableName)
           }
 
           if (fastFail) {
             assert(err.getMessage.contains("can not write to output file: " +
               "org.apache.hadoop.fs.FileAlreadyExistsException"))
           } else {
-            assert(err.getMessage.contains("Task failed while writing rows"))
+            checkError(
+              exception = err.getCause.asInstanceOf[SparkException],
+              errorClass = "TASK_WRITE_FAILED",
+              parameters = Map("path" -> s".*$tableName"),
+              matchPVals = true
+            )
           }
         }
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 182047a8c64..1e7820f0c19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -73,6 +74,32 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
       }
     }
   }
+
+  test("char/varchar type values length check: partitioned columns of other 
types") {
+    val tableName = "t"
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable(tableName) {
+        sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format 
PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+          checkPlainResult(spark.table(tableName), typ, v.toString)
+          sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+          checkAnswer(spark.table(tableName), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
+        checkError(
+          exception = e1.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName.*"),
+          matchPVals = true
+        )
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
 class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSingleton {

