This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ccdc19d587c3 [SPARK-46654][SQL][PYTHON] Make `to_csv` explicitly indicate that it does not support some types of data
ccdc19d587c3 is described below

commit ccdc19d587c35be227df7ad24118bddfcd77495b
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Mar 13 17:15:52 2024 +0500

    [SPARK-46654][SQL][PYTHON] Make `to_csv` explicitly indicate that it does not support some types of data
    
    ### What changes were proposed in this pull request?
    This PR makes `to_csv` explicitly indicate that it does not support the following data types:
    - Struct
    - Array
    - Map
    - Variant
    - Binary
    
    ### Why are the changes needed?
    - The `CSV specification` has no standard for these `types`, so such values cannot be read back through `from_csv`.

    - For this case (a text sketch follows the screenshots):
      Before:
      <img width="1000" alt="image" src="https://github.com/apache/spark/assets/15246973/f298738d-8817-473c-b759-c3f83026ff33">

      After:
      <img width="1412" alt="image" src="https://github.com/apache/spark/assets/15246973/f32c1282-13b4-4268-8ccb-60ae1f04c358">
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `to_csv` now explicitly indicates that it does not support these data types.
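
    Structs whose fields are atomic types keep working as before, e.g. (a sketch under the same `spark-shell` assumption):

    ```scala
    import org.apache.spark.sql.functions.{struct, to_csv}

    // Atomic field types (numeric, string, date/time, ...) are still accepted.
    Seq((2L, "Alice")).toDF("age", "name")
      .select(to_csv(struct($"age", $"name")).as("csv"))
      .show()
    // +-------+
    // |    csv|
    // +-------+
    // |2,Alice|
    // +-------+
    ```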
    
    ### How was this patch tested?
    - Add & Update UT.
    - Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44665 from panbingkun/SPARK-46654.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 docs/sql-migration-guide.md                        |   1 +
 python/pyspark/sql/functions/builtin.py            |  18 +-
 .../sql/catalyst/expressions/csvExpressions.scala  |  36 +++-
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 225 ++++++++++++++++++++-
 4 files changed, 252 insertions(+), 28 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 9f92d6fc8347..325e58bf753d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -39,6 +39,7 @@ license: |
 - Since Spark 4.0, the default value of `spark.sql.orc.compression.codec` is changed from `snappy` to `zstd`. To restore the previous behavior, set `spark.sql.orc.compression.codec` to `snappy`.
 - Since Spark 4.0, `SELECT (*)` is equivalent to `SELECT struct(*)` instead of `SELECT *`. To restore the previous behavior, set `spark.sql.legacy.ignoreParenthesesAroundStar` to `true`.
 - Since Spark 4.0, the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc.
+- Since Spark 4.0, the function `to_csv` no longer supports input with the data types `STRUCT`, `ARRAY`, `MAP`, `VARIANT` and `BINARY` (because the `CSV specification` does not have standards for these data types and they cannot be read back using `from_csv`), and Spark will throw a `DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE` exception.
 
 ## Upgrading from Spark SQL 3.4 to 3.5
 
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index 6320f9b922ee..d55080111592 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15491,8 +15491,6 @@ def schema_of_csv(csv: Union[Column, str], options: Optional[Dict[str, str]] = N
 
 
-# TODO(SPARK-46654) Re-enable the `Example 2` test after fixing the display
-#  difference between Regular Spark and Spark Connect on `df.show`.
 @_try_remote_functions
 def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
     """
@@ -15534,19 +15532,7 @@ def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Col
     |      2,Alice|
     +-------------+
 
-    Example 2: Converting a complex StructType to a CSV string
-
-    >>> from pyspark.sql import Row, functions as sf
-    >>> data = [(1, Row(age=2, name='Alice', scores=[100, 200, 300]))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(sf.to_csv(df.value)).show(truncate=False) # doctest: +SKIP
-    +-----------------------+
-    |to_csv(value)          |
-    +-----------------------+
-    |2,Alice,"[100,200,300]"|
-    +-----------------------+
-
-    Example 3: Converting a StructType with null values to a CSV string
+    Example 2: Converting a StructType with null values to a CSV string
 
     >>> from pyspark.sql import Row, functions as sf
     >>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
@@ -15566,7 +15552,7 @@ def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Col
     |       ,Alice|
     +-------------+
 
-    Example 4: Converting a StructType with different data types to a CSV string
+    Example 3: Converting a StructType with different data types to a CSV string
 
     >>> from pyspark.sql import Row, functions as sf
     >>> data = [(1, Row(age=2, name='Alice', isStudent=True))]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index a2d17617a10f..ff696ebaf60d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -19,15 +19,18 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.io.CharArrayWriter
 
+import scala.annotation.tailrec
+
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.{SparkException, SparkIllegalArgumentException}
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.csv._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -260,16 +263,33 @@ case class StructsToCsv(
       child = child,
       timeZoneId = None)
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+    child.dataType match {
+      case schema: StructType if schema.map(_.dataType).forall(
+        dt => isSupportedDataType(dt)) => TypeCheckSuccess
+      case _ => DataTypeMismatch(
+        errorSubClass = "UNSUPPORTED_INPUT_TYPE",
+        messageParameters = Map(
+          "functionName" -> toSQLId(prettyName),
+          "dataType" -> toSQLType(child.dataType)
+        )
+      )
+    }
+  }
+
+  @tailrec
+  private def isSupportedDataType(dataType: DataType): Boolean = dataType match {
+    case _: VariantType | BinaryType => false
+    case _: AtomicType | CalendarIntervalType => true
+    case udt: UserDefinedType[_] => isSupportedDataType(udt.sqlType)
+    case _ => false
+  }
+
   @transient
   lazy val writer = new CharArrayWriter()
 
   @transient
-  lazy val inputSchema: StructType = child.dataType match {
-    case st: StructType => st
-    case other => throw new SparkIllegalArgumentException(
-      errorClass = "_LEGACY_ERROR_TEMP_3234",
-      messageParameters = Map("other" -> other.catalogString))
-  }
+  lazy val inputSchema: StructType = child.dataType.asInstanceOf[StructType]
 
   @transient
   lazy val gen = new UnivocityGenerator(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 5c6891f6a7f5..c3d69b34ff99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import java.nio.charset.StandardCharsets
 import java.text.SimpleDateFormat
 import java.time.{Duration, LocalDateTime, Period}
 import java.util.Locale
@@ -31,6 +32,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
 import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
+import org.apache.spark.unsafe.types._
 
 class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
@@ -176,7 +178,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
 
     checkError(
       exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(to_csv($"value"), schema, options)).collect()
+        df.select(from_csv(lit("any value"), schema, options)).collect()
       },
       errorClass = "UNSUPPORTED_DATATYPE",
       parameters = Map("typeName" -> toSQLType(valueType))
@@ -294,10 +296,19 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("to_csv with option (nullValue)") {
-    val df = Seq(Tuple1(Tuple1(null))).toDF("a")
-    val options = Map("nullValue" -> "-").asJava
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", null)))
 
-    checkAnswer(df.select(to_csv($"a", options)), Row("-") :: Nil)
+    val valueType = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("score", IntegerType)))
+
+    val schema = StructType(Seq(StructField("key", LongType), StructField("value", valueType)))
+    val df = spark.createDataFrame(rows, schema)
+
+    val options = Map("nullValue" -> "-").asJava
+    checkAnswer(df.select(to_csv($"value", options)), Row("2,Alice,-") :: Nil)
   }
 
   test("to_csv with option (dateFormat)") {
@@ -603,4 +614,210 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       $"csv", schema_of_csv("1,2\n2"), Map.empty[String, String].asJava))
     checkAnswer(actual, Row(Row(1, "2\n2")))
   }
+
+  test("SPARK-46654: from_csv/to_csv does not support ArrayType data") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", Array(100L, 200L, null, 300L))))
+
+    val valueSchema = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("scores", ArrayType(LongType))))
+    val schema = StructType(Seq(
+      StructField("key", LongType),
+      StructField("value", valueSchema)))
+
+    val df = spark.createDataFrame(rows, schema)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(to_csv($"value")).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE",
+      parameters = Map(
+        "functionName" -> "`to_csv`",
+        "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, scores: 
ARRAY<BIGINT>>\"",
+        "sqlExpr" -> "\"to_csv(value)\""),
+      context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
+    )
+
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"ARRAY<BIGINT>\"")
+    )
+  }
+
+  test("SPARK-46654: from_csv/to_csv does not support MapType data") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice",
+      Map("math" -> 100L, "english" -> 200L, "science" -> null))))
+
+    val valueSchema = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("scores", MapType(StringType, LongType))))
+    val schema = StructType(Seq(
+      StructField("key", LongType),
+      StructField("value", valueSchema)))
+
+    val df = spark.createDataFrame(rows, schema)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(to_csv($"value")).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE",
+      parameters = Map(
+        "functionName" -> "`to_csv`",
+        "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, scores: MAP<STRING, 
BIGINT>>\"",
+        "sqlExpr" -> "\"to_csv(value)\""),
+      context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
+    )
+
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"MAP<STRING, BIGINT>\"")
+    )
+  }
+
+  test("SPARK-46654: from_csv/to_csv does not support StructType data") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", Row(100L, 200L, null))))
+
+    val valueSchema = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("scores", StructType(Seq(
+        StructField("id1", LongType),
+        StructField("id2", LongType),
+        StructField("id3", LongType))))))
+    val schema = StructType(Seq(
+      StructField("key", LongType),
+      StructField("value", valueSchema)))
+
+    val df = spark.createDataFrame(rows, schema)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(to_csv($"value")).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE",
+      parameters = Map(
+        "functionName" -> "`to_csv`",
+        "dataType" -> ("\"STRUCT<age: BIGINT, name: STRING, " +
+          "scores: STRUCT<id1: BIGINT, id2: BIGINT, id3: BIGINT>>\""),
+        "sqlExpr" -> "\"to_csv(value)\""),
+      context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
+    )
+
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"STRUCT<id1: BIGINT, id2: BIGINT, id3: 
BIGINT>\"")
+    )
+  }
+
+  test("SPARK-46654: from_csv/to_csv does not support VariantType data") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", new VariantVal(Array[Byte](1, 2, 3), 
Array[Byte](4, 5)))))
+
+    val valueSchema = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("v", VariantType)))
+    val schema = StructType(Seq(
+      StructField("key", LongType),
+      StructField("value", valueSchema)))
+
+    val df = spark.createDataFrame(rows, schema)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(to_csv($"value")).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE",
+      parameters = Map(
+        "functionName" -> "`to_csv`",
+        "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, v: VARIANT>\"",
+        "sqlExpr" -> "\"to_csv(value)\""),
+      context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
+    )
+
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"VARIANT\"")
+    )
+  }
+
+  test("SPARK-46654: from_csv/to_csv does not support BinaryType data") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(1L, Row(2L, "Alice", "b".getBytes(StandardCharsets.UTF_8))))
+
+    val valueSchema = StructType(Seq(
+      StructField("age", LongType),
+      StructField("name", StringType),
+      StructField("b", BinaryType)))
+    val schema = StructType(Seq(
+      StructField("key", LongType),
+      StructField("value", valueSchema)))
+
+    val df = spark.createDataFrame(rows, schema)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(to_csv($"value")).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE",
+      parameters = Map(
+        "functionName" -> "`to_csv`",
+        "dataType" -> "\"STRUCT<age: BIGINT, name: STRING, b: BINARY>\"",
+        "sqlExpr" -> "\"to_csv(value)\""),
+      context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
+    )
+
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"BINARY\"")
+    )
+  }
+
+  test("SPARK-46654: from_csv/to_csv does not support NullType data") {
+    val df = Seq(Tuple1(Tuple1(null))).toDF("value")
+    val valueSchema = df.schema
+    val options = Map("nullValue" -> "-")
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(to_csv($"value", options.asJava)).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE",
+      parameters = Map(
+        "functionName" -> "`to_csv`",
+        "dataType" -> "\"STRUCT<_1: VOID>\"",
+        "sqlExpr" -> "\"to_csv(value)\""),
+      context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
+    )
+
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        df.select(from_csv(lit("data"), valueSchema, options)).collect()
+      },
+      errorClass = "UNSUPPORTED_DATATYPE",
+      parameters = Map("typeName" -> "\"STRUCT<_1: VOID>\"")
+    )
+  }
 }

