This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bc51c9fea364 [SPARK-46862][SQL][FOLLOWUP] Fix column pruning without 
schema enforcing in V1 CSV datasource
bc51c9fea364 is described below

commit bc51c9fea3645c6ae1d9e1e83b0f94f8b849be20
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sat Jan 27 19:22:52 2024 +0300

    [SPARK-46862][SQL][FOLLOWUP] Fix column pruning without schema enforcing in 
V1 CSV datasource
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to invoke `CSVOptons.isColumnPruningEnabled` 
introduced by https://github.com/apache/spark/pull/44872 while matching of CSV 
header to a schema in the V1 CSV datasource.
    
    ### Why are the changes needed?
    To fix the failure when column pruning happens and a schema is not enforced:
    ```scala
    scala> spark.read.
         | option("multiLine", true).
         | option("header", true).
         | option("escape", "\"").
         | option("enforceSchema", false).
         | csv("/Users/maximgekk/tmp/es-939111-data.csv").
         | count()
    24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
    java.lang.IllegalArgumentException: Number of column in CSV header is not 
equal to number of fields in the schema:
     Header length: 4, schema size: 0
    CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *CSVv1Suite"
    $ build/sbt "test:testOnly *CSVv2Suite"
    $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite"
    $ build/sbt "testOnly *.CsvFunctionsSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44910 from MaxGekk/check-header-column-pruning.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/execution/datasources/csv/CSVFileFormat.scala     |  6 +++---
 .../spark/sql/execution/datasources/csv/CSVSuite.scala    | 15 +++++++++------
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 32370562003f..9516a7729481 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -100,12 +100,12 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = 
{
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning
     val parsedOptions = new CSVOptions(
       options,
-      columnPruning,
+      sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled
 
     // Check a field requirement for corrupt records here to throw an 
exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, 
parsedOptions.columnNameOfCorruptRecord)
@@ -125,7 +125,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
         actualRequiredSchema,
         parsedOptions,
         actualFilters)
-      val schema = if (columnPruning) actualRequiredSchema else 
actualDataSchema
+      val schema = if (isColumnPruningEnabled) actualRequiredSchema else 
actualDataSchema
       val isStartOfFile = file.start == 0
       val headerChecker = new CSVHeaderChecker(
         schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", 
isStartOfFile)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2ec9e1086b92..c7f25c633e0b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -3237,12 +3237,15 @@ abstract class CSVSuite
 
     withTempPath { path =>
       Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
-      val df = spark.read
-        .option("multiline", "true")
-        .option("header", "true")
-        .option("escape", "\"")
-        .csv(path.getCanonicalPath)
-      assert(df.count() === 5)
+      Seq(true, false).foreach { enforceSchema =>
+        val df = spark.read
+          .option("multiLine", true)
+          .option("header", true)
+          .option("escape", "\"")
+          .option("enforceSchema", enforceSchema)
+          .csv(path.getCanonicalPath)
+        assert(df.count() === 5)
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to