This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 43adfa070a40 [SPARK-46850][SQL] Convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE` 43adfa070a40 is described below commit 43adfa070a40832d8316be8db164e3aca8a4f593 Author: panbingkun <panbing...@baidu.com> AuthorDate: Thu Jan 25 18:04:17 2024 +0300 [SPARK-46850][SQL] Convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE` ### What changes were proposed in this pull request? The PR aims to - convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE`. - remove some outdated comments. ### Why are the changes needed? The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Add new UT - Pass GA ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44871 from panbingkun/LEGACY_ERROR_TEMP_2102. Lead-authored-by: panbingkun <panbing...@baidu.com> Co-authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 5 ----- .../spark/sql/catalyst/csv/UnivocityParser.scala | 6 ++--- .../spark/sql/catalyst/json/JacksonParser.scala | 4 ++-- .../spark/sql/errors/QueryExecutionErrors.scala | 7 ------ .../spark/sql/execution/columnar/ColumnType.scala | 4 ++-- .../org/apache/spark/sql/CsvFunctionsSuite.scala | 26 +++++++++++++++++++++- .../sql/execution/columnar/ColumnTypeSuite.scala | 14 +++++++----- 7 files changed, 39 insertions(+), 27 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 1f3122a502c5..64d65fd4beed 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -5966,11 +5966,6 @@ "Not support non-primitive type now." ] }, - "_LEGACY_ERROR_TEMP_2102" : { - "message" : [ - "Unsupported type: <catalogString>." 
- ] - }, "_LEGACY_ERROR_TEMP_2103" : { "message" : [ "Dictionary encoding should not be used because of dictionary overflow." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index b99ee630d4b2..eb7e120277bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -30,13 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, Gen import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String - /** * Constructs a parser for a given schema that translates CSV data to an [[InternalRow]]. 
* @@ -273,8 +272,7 @@ class UnivocityParser( case udt: UserDefinedType[_] => makeConverter(name, udt.sqlType, nullable) - // We don't actually hit this exception though, we keep it for understandability - case _ => throw QueryExecutionErrors.unsupportedTypeError(dataType) + case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType) } private def nullSafeDatum( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index eace96ac8729..36f37888b084 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ @@ -381,7 +381,7 @@ class JacksonParser( } // We don't actually hit this exception though, we keep it for understandability - case _ => throw QueryExecutionErrors.unsupportedTypeError(dataType) + case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 69794517f917..b09885c904a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1097,13 +1097,6 @@ 
private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE messageParameters = Map.empty) } - def unsupportedTypeError(dataType: DataType): Throwable = { - new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2102", - messageParameters = Map("catalogString" -> dataType.catalogString), - cause = null) - } - def useDictionaryEncodingWhenDictionaryOverflowError(): Throwable = { new SparkException( errorClass = "_LEGACY_ERROR_TEMP_2103", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala index 96c7b4103142..53cb568d2060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala @@ -25,7 +25,7 @@ import scala.annotation.tailrec import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, PhysicalBinaryType, PhysicalBooleanType, PhysicalByteType, PhysicalCalendarIntervalType, PhysicalDataType, PhysicalDecimalType, PhysicalDoubleType, PhysicalFloatType, PhysicalIntegerType, PhysicalLongType, PhysicalMapType, PhysicalNullType, PhysicalShortType, PhysicalStringType, PhysicalStructType} -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.ExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -829,7 +829,7 @@ private[columnar] object ColumnType { case map: MapType => MAP(PhysicalMapType(map.keyType, map.valueType, map.valueContainsNull)) case struct: StructType => STRUCT(PhysicalStructType(struct.fields)) case udt: UserDefinedType[_] => ColumnType(udt.sqlType) - case other => throw QueryExecutionErrors.unsupportedTypeError(other) + case _ => throw 
ExecutionErrors.unsupportedDataTypeError(dataType) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index e7c1f0414b61..22a439bd179b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -23,7 +23,8 @@ import java.util.Locale import scala.jdk.CollectionConverters._ -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} +import org.apache.spark.sql.errors.DataTypeErrors.toSQLType import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -159,6 +160,29 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { Row(Row(null))) } + test("from_csv with invalid datatype") { + val rows = new java.util.ArrayList[Row]() + rows.add(Row(1L, Row(2L, "Alice", Array(100L, 200L, null, 300L)))) + + val valueType = StructType(Seq( + StructField("age", LongType), + StructField("name", StringType), + StructField("scores", ArrayType(LongType)))) + + val schema = StructType(Seq(StructField("key", LongType), StructField("value", valueType))) + + val options = Map.empty[String, String] + val df = spark.createDataFrame(rows, schema) + + checkError( + exception = intercept[SparkException] { + df.select(from_csv(to_csv($"value"), schema, options)).collect() + }.getCause.asInstanceOf[SparkUnsupportedOperationException], + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> toSQLType(valueType)) + ) + } + test("from_csv with option (nanValue)") { val df = Seq("#").toDS() val schema = new StructType().add("float", FloatType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala 
index bea3555348d3..d79ac8dc3545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala @@ -21,7 +21,7 @@ import java.nio.{ByteBuffer, ByteOrder} import java.nio.charset.StandardCharsets import java.time.{Duration, Period} -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} @@ -163,10 +163,12 @@ class ColumnTypeSuite extends SparkFunSuite { override def typeName: String = "invalid type name" } - val message = intercept[java.lang.Exception] { - ColumnType(invalidType) - }.getMessage - - assert(message.contains("Unsupported type: invalid type name")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + ColumnType(invalidType) + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"INVALID TYPE NAME\"") + ) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org