This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d616da74a4e [SPARK-40468][SQL] Fix column pruning in CSV when _corrupt_record is selected
d616da74a4e is described below
commit d616da74a4ed7202b3480ebc234eb109dcc86fb9
Author: Ivan Sadikov
AuthorDate: Sat Sep 17 10:59:32 2022 +0300
[SPARK-40468][SQL] Fix column pruning in CSV when _corrupt_record is selected
### What changes were proposed in this pull request?
The PR fixes an issue where, depending on the name of the `_corrupt_record`
field, column pruning would behave differently for a record that has no parsing
errors.
For example, with a CSV file like this (c1 and c2 columns):
```
1,a
```
Before the patch, the following query would return:
```scala
val df = spark.read
  .schema("c1 int, c2 string, x string, _corrupt_record string")
  .csv("file:/tmp/file.csv")
  .withColumn("x", lit("A"))

Result:
+---+---+---+---------------+
|c1 |c2 |x  |_corrupt_record|
+---+---+---+---------------+
|1  |a  |A  |1,a            |
+---+---+---+---------------+
```
However, if you rename the corrupt record column, the result is different
(this is the original, arguably correct, behaviour before
https://github.com/apache/spark/commit/959694271e30879c944d7fd5de2740571012460a):
```scala
val df = spark.read
  .option("columnNameOfCorruptRecord", "corrupt_record")
  .schema("c1 int, c2 string, x string, corrupt_record string")
  .csv("file:/tmp/file.csv")
  .withColumn("x", lit("A"))

+---+---+---+--------------+
|c1 |c2 |x  |corrupt_record|
+---+---+---+--------------+
|1  |a  |A  |null          |
+---+---+---+--------------+
```
This patch fixes the former case so that both queries return `null` for the
corrupt record, since there are no parsing issues. Note that
https://issues.apache.org/jira/browse/SPARK-38523 remains fixed and works
correctly.
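To make the expected behaviour concrete, here is a sketch that extends the example above with a malformed row (the two-row file and the expected output are illustrative assumptions, not part of the patch):
```scala
// Assume file:/tmp/file.csv now contains a well-formed and a malformed row:
//   1,a
//   bad_row
val df = spark.read
  .schema("c1 int, c2 string, x string, _corrupt_record string")
  .csv("file:/tmp/file.csv")
  .withColumn("x", lit("A"))
df.show(false)
// Expected after this patch: _corrupt_record stays null for the clean row
// and holds the raw line for the malformed one (the SPARK-38523 behaviour).
// +----+----+---+---------------+
// |c1  |c2  |x  |_corrupt_record|
// +----+----+---+---------------+
// |1   |a   |A  |null           |
// |null|null|A  |bad_row        |
// +----+----+---+---------------+
```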
### Why are the changes needed?
Fixes a bug where the corrupt record column would be non-null even though the
record has no parsing errors.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes the output of the corrupt record column when additional columns
are provided by the user. Behaviour is unchanged outside of that scenario.
### How was this patch tested?
I added a unit test that reproduces the issue.
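A minimal sketch of such a test, modelled on the examples above (names and helpers are illustrative and follow `CSVSuite` conventions; this is not the exact test added):
```scala
test("SPARK-40468: corrupt record stays null when extra columns are selected") {
  withTempPath { path =>
    // A single well-formed row: there are no parsing errors,
    // so the corrupt record column must be null.
    Seq("1,a").toDF().write.text(path.getAbsolutePath)
    val df = spark.read
      .schema("c1 int, c2 string, x string, _corrupt_record string")
      .csv(path.getAbsolutePath)
      .withColumn("x", lit("A"))
    checkAnswer(df, Row(1, "a", "A", null))
  }
}
```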
Closes #37909 from sadikovi/SPARK-40468.
Authored-by: Ivan Sadikov
Signed-off-by: Max Gekk
(cherry picked from commit 0776f9e7bcb10612eb977ed4884e9848aea86c33)
Signed-off-by: Max Gekk
---
 .../execution/datasources/csv/CSVFileFormat.scala  |  4 +---
 .../sql/execution/datasources/v2/csv/CSVScan.scala |  3 +--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 26 ++++++++++++++++++++++++++
 3 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 8d952507840..93679516a8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -100,8 +100,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning &&
-      !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning
     val parsedOptions = new CSVOptions(
       options,
       columnPruning,
@@ -154,4 +153,3 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 5c33a1047a1..d81223b48a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -44,8 +44,7 @@ case class CSVScan(
     dataFilters: Seq[Expression] = Seq.empty)
   extends TextBasedFileScan(sparkSession, options) {
-  val columnPruning = sparkSession.sessionState.conf.csvColumnPruning &&
-    !readDataSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)