This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 113ca5132f3c [SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode
113ca5132f3c is described below

commit 113ca5132f3c252105b2a344193fbdad612d93e7
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri Jan 26 11:02:15 2024 +0300

    [SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to disable the column pruning feature in the CSV datasource for the `multiLine` mode.
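
    For illustration, a minimal sketch of the resulting behavior (the file path and column name are placeholders, not part of this patch):
    ```scala
    // With multiLine enabled, column pruning is now off by default, so the
    // parser materializes full rows even when only one column is selected.
    val df = spark.read
      .option("header", "true")
      .option("multiLine", "true")
      .csv("/tmp/people.csv") // hypothetical input path
      .select("name")

    // The new "columnPruning" CSV option introduced here can force pruning
    // back on, overriding the multiLine default:
    val pruned = spark.read
      .option("header", "true")
      .option("multiLine", "true")
      .option("columnPruning", "true")
      .csv("/tmp/people.csv")
      .select("name")
    ```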
    
    ### Why are the changes needed?
    To work around an issue in the `uniVocity` parser used by the CSV datasource: https://github.com/uniVocity/univocity-parsers/issues/529
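
    A condensed reproduction, assuming input like the sample in the new regression test below (quoted fields that contain delimiters and escaped quotes); the path is hypothetical:
    ```scala
    // Sketch: before this change, operations that prune columns inside the
    // parser (e.g. count(), which prunes to zero columns) could return a
    // wrong row count in multiLine mode due to the uniVocity selection bug.
    val cnt = spark.read
      .option("multiline", "true")
      .option("header", "true")
      .option("escape", "\"")
      .csv("/tmp/issue529.csv") // hypothetical path
      .count() // expected 5 for the test's sample; could differ before the fix
    ```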
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *CSVv1Suite"
    $ build/sbt "test:testOnly *CSVv2Suite"
    $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite"
    $ build/sbt "testOnly *.CsvFunctionsSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44872 from MaxGekk/csv-disable-column-pruning.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit 829e742df8251c6f5e965cb08ad454ac3ee1a389)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 +++++++++
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +-
 .../v2/csv/CSVPartitionReaderFactory.scala         |  2 +-
 .../sql/execution/datasources/csv/CSVSuite.scala   | 25 +++++++++++++++++++++-
 4 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 1a9de5bc35ed..72382daf2760 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -274,6 +274,15 @@ class CSVOptions(
   val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
     .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
 
+  /**
+   * The column pruning feature can be enabled either via the CSV option `columnPruning` or
+   * in non-multiline mode via initialization of CSV options by the SQL config:
+   * `spark.sql.csv.parser.columnPruning.enabled`.
+   * The feature is disabled in the `multiLine` mode because of the issue:
+   * https://github.com/uniVocity/univocity-parsers/issues/529
+   */
+  val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning)
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
@@ -373,4 +382,5 @@ object CSVOptions extends DataSourceOptions {
   val SEP = "sep"
   val DELIMITER = "delimiter"
   newOption(SEP, DELIMITER)
+  val COLUMN_PRUNING = newOption("columnPruning")
 }
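
For clarity, a minimal standalone sketch of the precedence encoded by `getBool(COLUMN_PRUNING, !multiLine && columnPruning)` above (the helper name and signature are illustrative, not part of the patch):

```scala
// An explicit "columnPruning" CSV option wins; otherwise fall back to
// "SQL conf enabled AND not multiLine".
def effectiveColumnPruning(
    explicitOption: Option[Boolean], // the "columnPruning" CSV option, if set
    multiLine: Boolean,              // the "multiLine" CSV option
    sqlConfPruning: Boolean          // spark.sql.csv.parser.columnPruning.enabled
): Boolean = explicitOption.getOrElse(!multiLine && sqlConfPruning)

// effectiveColumnPruning(None, multiLine = true, sqlConfPruning = true)       == false
// effectiveColumnPruning(Some(true), multiLine = true, sqlConfPruning = true) == true
```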
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index b58649da61c0..59b2857f6b60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -72,7 +72,7 @@ class UnivocityParser(
   // positions. Generally assigned by input configuration options, except when input column(s) have
   // default values, in which case we omit the explicit indexes in order to know how many tokens
   // were present in each line instead.
-  private def columnPruning: Boolean = options.columnPruning &&
+  private def columnPruning: Boolean = options.isColumnPruningEnabled &&
     !requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
 
   // When column pruning is enabled, the parser only parses the required columns based on
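
The parser adds a second gate on top of the options: pruning is also skipped when any required column carries an exists-default metadata entry, because default-valued columns need the full per-line token count. A minimal sketch of that predicate (the method name is illustrative; the string key assumes the value of EXISTS_DEFAULT_COLUMN_METADATA_KEY):

```scala
import org.apache.spark.sql.types.StructType

// Prune only if the options allow it AND no required column records a
// default value in its metadata.
def shouldPrune(optionsAllowPruning: Boolean, requiredSchema: StructType): Boolean =
  optionsAllowPruning &&
    !requiredSchema.exists(_.metadata.contains("EXISTS_DEFAULT")) // assumed key value
```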
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index 37f6ae4aaa9f..cef5a71ca9c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -58,7 +58,7 @@ case class CSVPartitionReaderFactory(
       actualReadDataSchema,
       options,
       filters)
-    val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema
+    val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
       schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d27d94546807..b3fb9effebca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2069,6 +2069,7 @@ abstract class CSVSuite
             .option("header", true)
             .option("enforceSchema", false)
             .option("multiLine", multiLine)
+            .option("columnPruning", true)
             .load(dir)
             .select("columnA"),
             Row("a"))
@@ -2079,6 +2080,7 @@ abstract class CSVSuite
             .option("header", true)
             .option("enforceSchema", false)
             .option("multiLine", multiLine)
+            .option("columnPruning", true)
             .load(dir)
             .count() === 1L)
         }
@@ -3102,7 +3104,7 @@ abstract class CSVSuite
   }
 
   test("SPARK-40667: validate CSV Options") {
-    assert(CSVOptions.getAllOptions.size == 38)
+    assert(CSVOptions.getAllOptions.size == 39)
     // Please add validation on any new CSV options here
     assert(CSVOptions.isValidOption("header"))
     assert(CSVOptions.isValidOption("inferSchema"))
@@ -3142,6 +3144,7 @@ abstract class CSVSuite
     assert(CSVOptions.isValidOption("codec"))
     assert(CSVOptions.isValidOption("sep"))
     assert(CSVOptions.isValidOption("delimiter"))
+    assert(CSVOptions.isValidOption("columnPruning"))
     // Please add validation on any new parquet options with alternative here
     assert(CSVOptions.getAlternativeOption("sep").contains("delimiter"))
     assert(CSVOptions.getAlternativeOption("delimiter").contains("sep"))
@@ -3151,6 +3154,26 @@ abstract class CSVSuite
     assert(CSVOptions.getAlternativeOption("codec").contains("compression"))
     assert(CSVOptions.getAlternativeOption("preferDate").isEmpty)
   }
+
+  test("SPARK-46862: column pruning in the multi-line mode") {
+    val data =
+      """"jobID","Name","City","Active"
+        |"1","DE","","Yes"
+        |"5",",","",","
+        |"3","SA","","No"
+        |"10","abcd""efgh"" \ndef","",""
+        |"8","SE","","No"""".stripMargin
+
+    withTempPath { path =>
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+      val df = spark.read
+        .option("multiline", "true")
+        .option("header", "true")
+        .option("escape", "\"")
+        .csv(path.getCanonicalPath)
+      assert(df.count() === 5)
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {

