This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 16ac82092bb7 [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning
16ac82092bb7 is described below

commit 16ac82092bb775aafd010e2fb02b7ddc1eceea73
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Sat Feb 3 08:50:44 2024 +0300

    [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a CSV parsing bug with existence default values and column pruning (https://issues.apache.org/jira/browse/SPARK-46890).
    
    The bug fix includes disabling column pruning specifically when checking the CSV header schema against the required schema expected by Catalyst. This makes the expected schema match what the CSV parser provides, since later we also instruct the CSV parser to disable column pruning and instead read each entire row, in order to correctly assign the default value(s) during execution.
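
    As a rough sketch of the gating logic (mirroring the new `CSVOptions.isColumnPruningEnabled` in the patch below; the standalone helper and the literal metadata key value here are illustrative assumptions, not exact Spark internals):

    ```
    import org.apache.spark.sql.types._

    // Assumed marker key for existence defaults; in Spark it comes from
    // ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY.
    val existsDefaultKey = "EXISTS_DEFAULT"

    // Pruning stays enabled only if the option is on AND no column in the
    // required schema carries an existence default in its metadata.
    def isColumnPruningEnabled(pruningOptionEnabled: Boolean, schema: StructType): Boolean =
      pruningOptionEnabled && !schema.exists(_.metadata.contains(existsDefaultKey))

    // Example: a column like `price FLOAT DEFAULT 0.0` records an existence
    // default, so pruning is disabled for any required schema containing it.
    val priceMeta = new MetadataBuilder().putString(existsDefaultKey, "0.0").build()
    val required = StructType(Seq(StructField("price", FloatType, nullable = true, metadata = priceMeta)))
    assert(!isColumnPruningEnabled(pruningOptionEnabled = true, schema = required))
    ```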
    
    ### Why are the changes needed?
    
    Before this change, queries that selected a subset of the columns of a CSV table whose `CREATE TABLE` statement contained default values would fail with an internal exception. For example:
    
    ```
    CREATE TABLE IF NOT EXISTS products (
      product_id INT,
      name STRING,
      price FLOAT default 0.0,
      quantity INT default 0
    )
    USING CSV
    OPTIONS (
      header 'true',
      inferSchema 'false',
      enforceSchema 'false',
      path '/Users/maximgekk/tmp/products.csv'
    );
    ```
    
    The CSV file products.csv:
    
    ```
    product_id,name,price,quantity
    1,Apple,0.50,100
    2,Banana,0.25,200
    3,Orange,0.75,50
    ```
    
    The query fails:
    
    ```
    spark-sql (default)> SELECT price FROM products;
    24/01/28 11:43:09 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6)
    java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema:
     Header length: 4, schema size: 1
    CSV file: file:///Users/Daniel.Tenedorio/tmp/products.csv
    ```
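
    With the fix, the same query runs and returns only the requested column (expected output shown for illustration; exact float formatting may vary):

    ```
    spark-sql (default)> SELECT price FROM products;
    0.5
    0.25
    0.75
    ```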
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR adds test coverage: a new test in `CSVSuite` selects a single column from a CSV table that has column defaults, with `enforceSchema` disabled.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44939 from dtenedor/debug-csv-default.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 15 ++++++++-
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  4 +--
 .../execution/datasources/csv/CSVFileFormat.scala  |  5 ++-
 .../v2/csv/CSVPartitionReaderFactory.scala         |  6 +++-
 .../sql/execution/datasources/csv/CSVSuite.scala   | 38 ++++++++++++++++++++++
 5 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index c5a6bf5076de..f4ade722791c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -26,8 +26,10 @@ import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, Unescape
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.types.StructType
 
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
@@ -278,13 +280,24 @@ class CSVOptions(
     .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
 
   /**
+   * Returns true if column pruning is enabled and there are no existence column default values in
+   * the [[schema]].
+   *
    * The column pruning feature can be enabled either via the CSV option `columnPruning` or
    * in non-multiline mode via initialization of CSV options by the SQL config:
    * `spark.sql.csv.parser.columnPruning.enabled`.
    * The feature is disabled in the `multiLine` mode because of the issue:
    * https://github.com/uniVocity/univocity-parsers/issues/529
+   *
+   * We disable column pruning when there are any column defaults, instead preferring to read in
+   * each row and then post-process it to substitute the default values after.
    */
-  val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning)
+  def isColumnPruningEnabled(schema: StructType): Boolean =
+    isColumnPruningOptionEnabled &&
+      !schema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+
+  private val isColumnPruningOptionEnabled: Boolean =
+    getBool(COLUMN_PRUNING, !multiLine && columnPruning)
 
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 34a8b3d09047..06057626461b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
@@ -71,8 +70,7 @@ class UnivocityParser(
   // positions. Generally assigned by input configuration options, except when input column(s) have
   // default values, in which case we omit the explicit indexes in order to know how many tokens
   // were present in each line instead.
-  private def columnPruning: Boolean = options.isColumnPruningEnabled &&
-    !requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+  private def columnPruning: Boolean = options.isColumnPruningEnabled(requiredSchema)
 
   // When column pruning is enabled, the parser only parses the required columns based on
   // their positions in the data schema.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9516a7729481..3338006b7bf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -105,7 +105,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-    val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled
+    val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled(requiredSchema)
 
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
@@ -125,6 +125,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         actualRequiredSchema,
         parsedOptions,
         actualFilters)
+      // Use column pruning when specified by Catalyst, except when one or more columns have
+      // existence default value(s), since in that case we instruct the CSV parser to disable column
+      // pruning and instead read each entire row in order to correctly assign the default value(s).
       val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
       val isStartOfFile = file.start == 0
       val headerChecker = new CSVHeaderChecker(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index cef5a71ca9c6..65eff0647ee2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -58,7 +58,11 @@ case class CSVPartitionReaderFactory(
       actualReadDataSchema,
       options,
       filters)
-    val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema
+    val schema = if (options.isColumnPruningEnabled(readDataSchema)) {
+      actualReadDataSchema
+    } else {
+      actualDataSchema
+    }
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
       schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index c7f25c633e0b..12a141944609 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -56,6 +56,7 @@ abstract class CSVSuite
   override protected def dataSourceFormat = "csv"
 
   protected val carsFile = "test-data/cars.csv"
+  protected val productsFile = "test-data/products.csv"
   private val carsMalformedFile = "test-data/cars-malformed.csv"
   private val carsFile8859 = "test-data/cars_iso-8859-1.csv"
   private val carsTsvFile = "test-data/cars.tsv"
@@ -3248,6 +3249,43 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-46890: CSV fails on a column with default and without enforcing 
schema") {
+    withTable("CarsTable") {
+      spark.sql(
+        s"""
+           |CREATE TABLE CarsTable(
+           |  year INT,
+           |  make STRING,
+           |  model STRING,
+           |  comment STRING DEFAULT '',
+           |  blank STRING DEFAULT '')
+           |USING csv
+           |OPTIONS (
+           |  header "true",
+           |  inferSchema "false",
+           |  enforceSchema "false",
+           |  path "${testFile(carsFile)}"
+           |)
+       """.stripMargin)
+      val expected = Seq(
+        Row("No comment"),
+        Row("Go get one now they are going fast"))
+      checkAnswer(
+        sql("SELECT comment FROM CarsTable WHERE year < 2014"),
+        expected)
+      checkAnswer(
+        spark.read.format("csv")
+          .options(Map(
+            "header" -> "true",
+            "inferSchema" -> "true",
+            "enforceSchema" -> "false"))
+          .load(testFile(carsFile))
+          .select("comment")
+          .where("year < 2014"),
+        expected)
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
