This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ccdc19d587c3 [SPARK-46654][SQL][PYTHON] Make `to_csv` explicitly indicate that it does not support some types of data
ccdc19d587c3 is described below

commit ccdc19d587c35be227df7ad24118bddfcd77495b
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Mar 13 17:15:52 2024 +0500

    [SPARK-46654][SQL][PYTHON] Make `to_csv` explicitly indicate that it does not support some types of data

    ### What changes were proposed in this pull request?
    This PR makes `to_csv` explicitly indicate that it does not support some types of data, namely:
    - Struct
    - Array
    - Map
    - Variant
    - Binary

    ### Why are the changes needed?
    - The CSV specification has no standard for these types, so such values cannot be read back through `from_csv`.
    - For this case:
      Before:
      <img width="1000" alt="image" src="https://github.com/apache/spark/assets/15246973/f298738d-8817-473c-b759-c3f83026ff33">
      After:
      <img width="1412" alt="image" src="https://github.com/apache/spark/assets/15246973/f32c1282-13b4-4268-8ccb-60ae1f04c358">

    ### Does this PR introduce _any_ user-facing change?
    Yes, `to_csv` will explicitly indicate that it does not support some types of data.

    ### How was this patch tested?
    - Added & updated UTs.
    - Passed GA.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44665 from panbingkun/SPARK-46654.

    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 docs/sql-migration-guide.md                        |   1 +
 python/pyspark/sql/functions/builtin.py            |  18 +-
 .../sql/catalyst/expressions/csvExpressions.scala  |  36 +++-
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 225 ++++++++++++++++++++-
 4 files changed, 252 insertions(+), 28 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 9f92d6fc8347..325e58bf753d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -39,6 +39,7 @@ license: |
 - Since Spark 4.0, the default value of `spark.sql.orc.compression.codec` is changed from `snappy` to `zstd`. To restore the previous behavior, set `spark.sql.orc.compression.codec` to `snappy`.
 - Since Spark 4.0, `SELECT (*)` is equivalent to `SELECT struct(*)` instead of `SELECT *`. To restore the previous behavior, set `spark.sql.legacy.ignoreParenthesesAroundStar` to `true`.
 - Since Spark 4.0, the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc.
+- Since Spark 4.0, the function `to_csv` no longer supports input with the data types `STRUCT`, `ARRAY`, `MAP`, `VARIANT` and `BINARY` (the CSV specification has no standard for these data types, and such values cannot be read back using `from_csv`); Spark throws a `DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE` exception instead.

 ## Upgrading from Spark SQL 3.4 to 3.5
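To make the migration-guide entry above concrete, here is a minimal spark-shell sketch of the new behavior. It is hand-written against this patch, not captured shell output, and the struct deliberately nests an `ARRAY` field, one of the now-rejected types:

```scala
import org.apache.spark.sql.functions._
// In spark-shell, spark.implicits._ is already in scope for toDF and $"..."

// `value` becomes STRUCT<_1: STRING, _2: ARRAY<INT>> -- previously serialized
// with a non-standard encoding, now rejected at analysis time.
val df = Seq((1, ("Alice", Seq(100, 200, 300)))).toDF("key", "value")

df.select(to_csv($"value")).show()
// AnalysisException: [DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE] ...
```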
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index 6320f9b922ee..d55080111592 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15491,8 +15491,6 @@ def schema_of_csv(csv: Union[Column, str], options: Optional[Dict[str, str]] = N
     return _invoke_function("schema_of_csv", col, _options_to_str(options))
 
 
-# TODO(SPARK-46654) Re-enable the `Example 2` test after fixing the display
-# difference between Regular Spark and Spark Connect on `df.show`.
 @_try_remote_functions
 def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
     """
@@ -15534,19 +15532,7 @@ def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Col
     | 2,Alice|
     +-------------+
 
-    Example 2: Converting a complex StructType to a CSV string
-
-    >>> from pyspark.sql import Row, functions as sf
-    >>> data = [(1, Row(age=2, name='Alice', scores=[100, 200, 300]))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(sf.to_csv(df.value)).show(truncate=False)  # doctest: +SKIP
-    +-----------------------+
-    |to_csv(value)          |
-    +-----------------------+
-    |2,Alice,"[100,200,300]"|
-    +-----------------------+
-
-    Example 3: Converting a StructType with null values to a CSV string
+    Example 2: Converting a StructType with null values to a CSV string
 
     >>> from pyspark.sql import Row, functions as sf
     >>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
@@ -15566,7 +15552,7 @@ def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Col
     | ,Alice|
     +-------------+
 
-    Example 4: Converting a StructType with different data types to a CSV string
+    Example 3: Converting a StructType with different data types to a CSV string
 
     >>> from pyspark.sql import Row, functions as sf
     >>> data = [(1, Row(age=2, name='Alice', isStudent=True))]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index a2d17617a10f..ff696ebaf60d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -19,15 +19,18 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.io.CharArrayWriter
 
+import scala.annotation.tailrec
+
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.{SparkException, SparkIllegalArgumentException}
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.csv._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -260,16 +263,33 @@ case class StructsToCsv(
     child = child,
     timeZoneId = None)
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+    child.dataType match {
+      case schema: StructType if schema.map(_.dataType).forall(
+        dt => isSupportedDataType(dt)) => TypeCheckSuccess
+      case _ => DataTypeMismatch(
+        errorSubClass = "UNSUPPORTED_INPUT_TYPE",
+        messageParameters = Map(
+          "functionName" -> toSQLId(prettyName),
+          "dataType" -> toSQLType(child.dataType)
+        )
+      )
+    }
+  }
+
+  @tailrec
+  private def isSupportedDataType(dataType: DataType): Boolean = dataType match {
+    case _: VariantType | BinaryType => false
+    case _: AtomicType | CalendarIntervalType => true
+    case udt: UserDefinedType[_] => isSupportedDataType(udt.sqlType)
+    case _ => false
+  }
+
   @transient
   lazy val writer = new CharArrayWriter()
 
   @transient
-  lazy val inputSchema: StructType = child.dataType match {
-    case st: StructType => st
-    case other => throw new SparkIllegalArgumentException(
-      errorClass = "_LEGACY_ERROR_TEMP_3234",
-      messageParameters = Map("other" -> other.catalogString))
-  }
+  lazy val inputSchema: StructType = child.dataType.asInstanceOf[StructType]
 
   @transient
   lazy val gen = new UnivocityGenerator(
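The new `checkInputDataTypes`/`isSupportedDataType` pair above is the core of the change. For application code that would rather pre-validate a schema than catch the `AnalysisException`, here is a user-side sketch of the same rule. It is illustrative only, not Spark API, and it omits the internal `UserDefinedType` unwrapping (`udt.sqlType`) for brevity:

```scala
import org.apache.spark.sql.types._

// A struct is CSV-serializable only if every top-level field has a flat,
// standard type; nested, variant, binary, and void fields are rejected.
def csvSerializable(schema: StructType): Boolean =
  schema.fields.map(_.dataType).forall {
    case BinaryType | VariantType | NullType => false
    case _: StructType | _: ArrayType | _: MapType => false
    case _ => true // numeric, string, boolean, date/time, interval types, ...
  }
```

A pre-check like this is only a convenience; the authoritative check is the analysis-time error introduced by this patch.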
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 5c6891f6a7f5..c3d69b34ff99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import java.nio.charset.StandardCharsets
 import java.text.SimpleDateFormat
 import java.time.{Duration, LocalDateTime, Period}
 import java.util.Locale
@@ -31,6 +32,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
 import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
+import org.apache.spark.unsafe.types._
 
 class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
@@ -176,7 +178,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
 
     checkError(
       exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(to_csv($"value"), schema, options)).collect()
+        df.select(from_csv(lit("any value"), schema, options)).collect()
       },
       errorClass = "UNSUPPORTED_DATATYPE",
       parameters = Map("typeName" -> toSQLType(valueType))
@@ -294,10 +296,19 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("to_csv with option (nullValue)") {
-    val df = Seq(Tuple1(Tuple1(null))).toDF("a")
-    val options = Map("nullValue" -> "-").asJava
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", null)))
 
-    checkAnswer(df.select(to_csv($"a", options)), Row("-") :: Nil)
+    val valueType = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("score", IntegerType)))
+
+    val schema = StructType(Seq(StructField("key", LongType), StructField("value", valueType)))
+    val df = spark.createDataFrame(rows, schema)
+
+    val options = Map("nullValue" -> "-").asJava
+    checkAnswer(df.select(to_csv($"value", options)), Row("2,Alice,-") :: Nil)
   }
 
   test("to_csv with option (dateFormat)") {
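For callers who relied on the old, non-standard serialization of nested fields (as the rewritten `nullValue` test above shows, flat structs still work), one possible migration path is to re-encode the offending fields as strings before calling `to_csv`. A hedged sketch: `to_json`, `base64`, and `struct` are existing Spark functions, but `df` and the column names are illustrative, matching the tests below:

```scala
import org.apache.spark.sql.functions._
// assumes spark.implicits._ (or the suite's testImplicits._) is in scope for $"..."

// Re-encode unsupported fields as strings, then serialize the flat struct:
//   arrays/maps/structs -> JSON text via to_json, binary -> base64 text.
val csvDf = df.select(
  to_csv(struct(
    $"value.age",
    $"value.name",
    to_json($"value.scores").as("scores") // was ARRAY<BIGINT>
  )).as("csv"))
```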
ARRAY<BIGINT>>\"", + "sqlExpr" -> "\"to_csv(value)\""), + context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern) + ) + + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect() + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"ARRAY<BIGINT>\"") + ) + } + + test("SPARK-46654: from_csv/to_csv does not support MapType data") { + val rows = new java.util.ArrayList[Row]() + rows.add(Row(1L, Row(2L, "Alice", + Map("math" -> 100L, "english" -> 200L, "science" -> null)))) + + val valueSchema = StructType(Seq( + StructField("age", LongType), + StructField("name", StringType), + StructField("scores", MapType(StringType, LongType)))) + val schema = StructType(Seq( + StructField("key", LongType), + StructField("value", valueSchema))) + + val df = spark.createDataFrame(rows, schema) + + checkError( + exception = intercept[AnalysisException] { + df.select(to_csv($"value")).collect() + }, + errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE", + parameters = Map( + "functionName" -> "`to_csv`", + "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, scores: MAP<STRING, BIGINT>>\"", + "sqlExpr" -> "\"to_csv(value)\""), + context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern) + ) + + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect() + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"MAP<STRING, BIGINT>\"") + ) + } + + test("SPARK-46654: from_csv/to_csv does not support StructType data") { + val rows = new java.util.ArrayList[Row]() + rows.add(Row(1L, Row(2L, "Alice", Row(100L, 200L, null)))) + + val valueSchema = StructType(Seq( + StructField("age", LongType), + StructField("name", StringType), + StructField("scores", StructType(Seq( + StructField("id1", LongType), + StructField("id2", LongType), + StructField("id3", LongType)))))) + val schema = StructType(Seq( + StructField("key", LongType), + StructField("value", valueSchema))) + + val df = spark.createDataFrame(rows, schema) + + checkError( + exception = intercept[AnalysisException] { + df.select(to_csv($"value")).collect() + }, + errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE", + parameters = Map( + "functionName" -> "`to_csv`", + "dataType" -> ("\"STRUCT<age: BIGINT, name: STRING, " + + "scores: STRUCT<id1: BIGINT, id2: BIGINT, id3: BIGINT>>\""), + "sqlExpr" -> "\"to_csv(value)\""), + context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern) + ) + + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect() + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"STRUCT<id1: BIGINT, id2: BIGINT, id3: BIGINT>\"") + ) + } + + test("SPARK-46654: from_csv/to_csv does not support VariantType data") { + val rows = new java.util.ArrayList[Row]() + rows.add(Row(1L, Row(2L, "Alice", new VariantVal(Array[Byte](1, 2, 3), Array[Byte](4, 5))))) + + val valueSchema = StructType(Seq( + StructField("age", LongType), + StructField("name", StringType), + StructField("v", VariantType))) + val schema = StructType(Seq( + StructField("key", LongType), + StructField("value", valueSchema))) + + val df = spark.createDataFrame(rows, schema) + + checkError( + exception = intercept[AnalysisException] { + 
df.select(to_csv($"value")).collect() + }, + errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE", + parameters = Map( + "functionName" -> "`to_csv`", + "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, v: VARIANT>\"", + "sqlExpr" -> "\"to_csv(value)\""), + context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern) + ) + + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect() + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"VARIANT\"") + ) + } + + test("SPARK-46654: from_csv/to_csv does not support BinaryType data") { + val rows = new java.util.ArrayList[Row]() + rows.add(Row(1L, Row(2L, "Alice", "b".getBytes(StandardCharsets.UTF_8)))) + + val valueSchema = StructType(Seq( + StructField("age", LongType), + StructField("name", StringType), + StructField("b", BinaryType))) + val schema = StructType(Seq( + StructField("key", LongType), + StructField("value", valueSchema))) + + val df = spark.createDataFrame(rows, schema) + + checkError( + exception = intercept[AnalysisException] { + df.select(to_csv($"value")).collect() + }, + errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE", + parameters = Map( + "functionName" -> "`to_csv`", + "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, b: BINARY>\"", + "sqlExpr" -> "\"to_csv(value)\""), + context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern) + ) + + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect() + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"BINARY\"") + ) + } + + test("SPARK-46654: from_csv/to_csv does not support NullType data") { + val df = Seq(Tuple1(Tuple1(null))).toDF("value") + val valueSchema = df.schema + val options = Map("nullValue" -> "-") + + checkError( + exception = intercept[AnalysisException] { + df.select(to_csv($"value", options.asJava)).collect() + }, + errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE", + parameters = Map( + "functionName" -> "`to_csv`", + "dataType" -> "\"STRUCT<_1: VOID>\"", + "sqlExpr" -> "\"to_csv(value)\""), + context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern) + ) + + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.select(from_csv(lit("data"), valueSchema, options)).collect() + }, + errorClass = "UNSUPPORTED_DATATYPE", + parameters = Map("typeName" -> "\"STRUCT<_1: VOID>\"") + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org