This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b14abb3a2ed0 [SPARK-48241][SQL] CSV parsing failure with char/varchar type columns b14abb3a2ed0 is described below commit b14abb3a2ed086d2ff8f340f60c0dc1e460c7a59 Author: joey.ljy <joey....@alibaba-inc.com> AuthorDate: Mon May 13 22:42:31 2024 +0800 [SPARK-48241][SQL] CSV parsing failure with char/varchar type columns ### What changes were proposed in this pull request? CSV table containing char and varchar columns will result in the following error when selecting from the CSV table: ``` spark-sql (default)> show create table test_csv; CREATE TABLE default.test_csv ( id INT, name CHAR(10)) USING csv ``` ``` java.lang.IllegalArgumentException: requirement failed: requiredSchema (struct<id:int,name:string>) should be the subset of dataSchema (struct<id:int,name:string>). at scala.Predef$.require(Predef.scala:281) at org.apache.spark.sql.catalyst.csv.UnivocityParser.<init>(UnivocityParser.scala:56) at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:127) at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155) at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125) ``` ### Why are the changes needed? For char and varchar types, Spark will convert them to `StringType` in `CharVarcharUtils.replaceCharVarcharWithStringInSchema` and record `__CHAR_VARCHAR_TYPE_STRING` in the metadata. 
The reason for the above error is that the `StringType` columns in the `dataSchema` and `requiredSchema` of `UnivocityParser` are not consistent: the `StringType` field in the `dataSchema` carries the char/varchar metadata, while the metadata of the corresponding field in the `requiredSchema` is empty, so the subset check fails even though both schemas print identically. We need to retain the metadata when resolving the schema. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a new test case in `CSVSuite`. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46537 from liujiayi771/csv-char. Authored-by: joey.ljy <joey....@alibaba-inc.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 4 +++- sql/core/src/test/resources/test-data/char.csv | 4 ++++ .../sql/execution/datasources/csv/CSVSuite.scala | 24 ++++++++++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index b989233da674..98e91585c2a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -118,7 +118,9 @@ abstract class LogicalPlan def resolve(schema: StructType, resolver: Resolver): Seq[Attribute] = { schema.map { field => resolve(field.name :: Nil, resolver).map { - case a: AttributeReference => a + case a: AttributeReference => + // Keep the metadata in given schema. 
+ a.withMetadata(field.metadata) case _ => throw QueryExecutionErrors.resolveCannotHandleNestedSchema(this) }.getOrElse { throw QueryCompilationErrors.cannotResolveAttributeError( diff --git a/sql/core/src/test/resources/test-data/char.csv b/sql/core/src/test/resources/test-data/char.csv new file mode 100644 index 000000000000..d2be68a15fc1 --- /dev/null +++ b/sql/core/src/test/resources/test-data/char.csv @@ -0,0 +1,4 @@ +color,name +pink,Bob +blue,Mike +grey,Tom diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 22ea133ee19a..0e58b96531da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -80,6 +80,7 @@ abstract class CSVSuite private val valueMalformedFile = "test-data/value-malformed.csv" private val badAfterGoodFile = "test-data/bad_after_good.csv" private val malformedRowFile = "test-data/malformedRow.csv" + private val charFile = "test-data/char.csv" /** Verifies data and schema. */ private def verifyCars( @@ -3342,6 +3343,29 @@ abstract class CSVSuite expected) } } + + test("SPARK-48241: CSV parsing failure with char/varchar type columns") { + withTable("charVarcharTable") { + spark.sql( + s""" + |CREATE TABLE charVarcharTable( + | color char(4), + | name varchar(10)) + |USING csv + |OPTIONS ( + | header "true", + | path "${testFile(charFile)}" + |) + """.stripMargin) + val expected = Seq( + Row("pink", "Bob"), + Row("blue", "Mike"), + Row("grey", "Tom")) + checkAnswer( + sql("SELECT * FROM charVarcharTable"), + expected) + } + } } class CSVv1Suite extends CSVSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org