This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43adfa070a40 [SPARK-46850][SQL] Convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE`
43adfa070a40 is described below

commit 43adfa070a40832d8316be8db164e3aca8a4f593
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Thu Jan 25 18:04:17 2024 +0300

    [SPARK-46850][SQL] Convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE`
    
    ### What changes were proposed in this pull request?
    This PR aims to:
    - convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE` (see the sketch below);
    - remove some outdated comments.
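    
    A minimal hedged sketch of the call-site change (the `failOnUnsupported`
    wrapper is hypothetical; the error factory and exception names are taken
    from the diff below):
    
    ```scala
    import org.apache.spark.sql.errors.ExecutionErrors
    import org.apache.spark.sql.types.DataType
    
    object UnsupportedTypeSketch {
      // Before: QueryExecutionErrors.unsupportedTypeError(dataType) raised a
      // SparkException with error class _LEGACY_ERROR_TEMP_2102.
      // After: the shared helper raises a SparkUnsupportedOperationException
      // with error class UNSUPPORTED_DATATYPE.
      def failOnUnsupported(dataType: DataType): Nothing =
        throw ExecutionErrors.unsupportedDataTypeError(dataType)
    }
    ```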
    
    ### Why are the changes needed?
    Replacing the legacy error template with the standard `UNSUPPORTED_DATATYPE` error class improves the error framework.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Add new UT
    - Pass GA
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44871 from panbingkun/LEGACY_ERROR_TEMP_2102.
    
    Lead-authored-by: panbingkun <panbing...@baidu.com>
    Co-authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  5 -----
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  6 ++---
 .../spark/sql/catalyst/json/JacksonParser.scala    |  4 ++--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  7 ------
 .../spark/sql/execution/columnar/ColumnType.scala  |  4 ++--
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 26 +++++++++++++++++++++-
 .../sql/execution/columnar/ColumnTypeSuite.scala   | 14 +++++++-----
 7 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 1f3122a502c5..64d65fd4beed 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -5966,11 +5966,6 @@
       "Not support non-primitive type now."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2102" : {
-    "message" : [
-      "Unsupported type: <catalogString>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2103" : {
     "message" : [
       "Dictionary encoding should not be used because of dictionary overflow."
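
For context, the replacement error class `UNSUPPORTED_DATATYPE` is already defined elsewhere in this file; its message takes a `typeName` parameter, which the updated tests below assert against.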
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index b99ee630d4b2..eb7e120277bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -30,13 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, Gen
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-
 /**
  * Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
  *
@@ -273,8 +272,7 @@ class UnivocityParser(
     case udt: UserDefinedType[_] =>
       makeConverter(name, udt.sqlType, nullable)
 
-    // We don't actually hit this exception though, we keep it for understandability
-    case _ => throw QueryExecutionErrors.unsupportedTypeError(dataType)
+    case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
   }
 
   private def nullSafeDatum(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index eace96ac8729..36f37888b084 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -381,7 +381,7 @@ class JacksonParser(
       }
 
     // We don't actually hit this exception though, we keep it for understandability
-    case _ => throw QueryExecutionErrors.unsupportedTypeError(dataType)
+    case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 69794517f917..b09885c904a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1097,13 +1097,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       messageParameters = Map.empty)
   }
 
-  def unsupportedTypeError(dataType: DataType): Throwable = {
-    new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2102",
-      messageParameters = Map("catalogString" -> dataType.catalogString),
-      cause = null)
-  }
-
   def useDictionaryEncodingWhenDictionaryOverflowError(): Throwable = {
     new SparkException(
       errorClass = "_LEGACY_ERROR_TEMP_2103",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index 96c7b4103142..53cb568d2060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -25,7 +25,7 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, PhysicalBinaryType, PhysicalBooleanType, PhysicalByteType, PhysicalCalendarIntervalType, PhysicalDataType, PhysicalDecimalType, PhysicalDoubleType, PhysicalFloatType, PhysicalIntegerType, PhysicalLongType, PhysicalMapType, PhysicalNullType, PhysicalShortType, PhysicalStringType, PhysicalStructType}
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.ExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -829,7 +829,7 @@ private[columnar] object ColumnType {
       case map: MapType => MAP(PhysicalMapType(map.keyType, map.valueType, map.valueContainsNull))
       case struct: StructType => STRUCT(PhysicalStructType(struct.fields))
       case udt: UserDefinedType[_] => ColumnType(udt.sqlType)
-      case other => throw QueryExecutionErrors.unsupportedTypeError(other)
+      case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
     }
   }
 }
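
A usage sketch of the dispatch above: `ColumnType` is `private[columnar]`, so this hypothetical demo only compiles from inside that package.

```scala
package org.apache.spark.sql.execution.columnar

import org.apache.spark.sql.types.IntegerType

object ColumnTypeSketch {
  def main(args: Array[String]): Unit = {
    // A supported type resolves to a concrete column type (INT here)...
    println(ColumnType(IntegerType))
    // ...while a type that falls through the match now raises
    // SparkUnsupportedOperationException with error class UNSUPPORTED_DATATYPE
    // instead of a SparkException with _LEGACY_ERROR_TEMP_2102.
  }
}
```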
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index e7c1f0414b61..22a439bd179b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -23,7 +23,8 @@ import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -159,6 +160,29 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(Row(null)))
   }
 
+  test("from_csv with invalid datatype") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", Array(100L, 200L, null, 300L))))
+
+    val valueType = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("scores", ArrayType(LongType))))
+
+    val schema = StructType(Seq(StructField("key", LongType), StructField("value", valueType)))
+
+    val options = Map.empty[String, String]
+    val df = spark.createDataFrame(rows, schema)
+
+    checkError(
+      exception = intercept[SparkException] {
+        df.select(from_csv(to_csv($"value"), schema, options)).collect()
+      }.getCause.asInstanceOf[SparkUnsupportedOperationException],
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> toSQLType(valueType))
+    )
+  }
+
   test("from_csv with option (nanValue)") {
     val df = Seq("#").toDS()
     val schema = new StructType().add("float", FloatType)
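
A hedged spark-shell repro that mirrors the new test above (the `spark` session is assumed to be in scope; the exact rendering of the type name may vary by version):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, from_csv, to_csv}
import org.apache.spark.sql.types._

val valueType = StructType(Seq(StructField("age", LongType)))
val schema = StructType(Seq(StructField("value", valueType)))
val df = spark.createDataFrame(java.util.Arrays.asList(Row(Row(2L))), schema)

// from_csv cannot parse into a nested struct column, so collect() fails with a
// SparkException whose cause is a SparkUnsupportedOperationException carrying
// error class UNSUPPORTED_DATATYPE and typeName = "STRUCT<age: BIGINT>".
df.select(from_csv(to_csv(col("value")), schema, Map.empty[String, String])).collect()
```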
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index bea3555348d3..d79ac8dc3545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -21,7 +21,7 @@ import java.nio.{ByteBuffer, ByteOrder}
 import java.nio.charset.StandardCharsets
 import java.time.{Duration, Period}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
@@ -163,10 +163,12 @@ class ColumnTypeSuite extends SparkFunSuite {
         override def typeName: String = "invalid type name"
     }
 
-    val message = intercept[java.lang.Exception] {
-      ColumnType(invalidType)
-    }.getMessage
-
-    assert(message.contains("Unsupported type: invalid type name"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        ColumnType(invalidType)
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"INVALID TYPE NAME\"")
+    )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
