[spark] branch master updated (4f8cc999b15 -> 0776f9e7bcb)

2022-09-17 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4f8cc999b15 [SPARK-40436][BUILD] Upgrade Scala to 2.12.17
 add 0776f9e7bcb [SPARK-40468][SQL] Fix column pruning in CSV when _corrupt_record is selected

No new revisions were added by this update.

Summary of changes:
 .../execution/datasources/csv/CSVFileFormat.scala  |  4 +---
 .../sql/execution/datasources/v2/csv/CSVScan.scala |  3 +--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 26 ++
 3 files changed, 28 insertions(+), 5 deletions(-)


[spark] branch branch-3.3 updated: [SPARK-40468][SQL] Fix column pruning in CSV when _corrupt_record is selected

2022-09-17 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new d616da74a4e [SPARK-40468][SQL] Fix column pruning in CSV when _corrupt_record is selected
d616da74a4e is described below

commit d616da74a4ed7202b3480ebc234eb109dcc86fb9
Author: Ivan Sadikov 
AuthorDate: Sat Sep 17 10:59:32 2022 +0300

[SPARK-40468][SQL] Fix column pruning in CSV when _corrupt_record is selected

### What changes were proposed in this pull request?

The PR fixes an issue where column pruning would behave differently depending on the name of the `_corrupt_record` field, even for records that have no parsing errors.

For example, with a CSV file like this (c1 and c2 columns):
```
1,a
```

Before the patch, the following query would return:
```scala
import org.apache.spark.sql.functions.lit

val df = spark.read
  .schema("c1 int, c2 string, x string, _corrupt_record string")
  .csv("file:/tmp/file.csv")
  .withColumn("x", lit("A"))

df.show(false)

Result:

+---+---+---+---------------+
|c1 |c2 |x  |_corrupt_record|
+---+---+---+---------------+
|1  |a  |A  |1,a            |
+---+---+---+---------------+
```

However, if you rename the corrupt record column, the result is different (the original, arguably correct, behaviour before https://github.com/apache/spark/commit/959694271e30879c944d7fd5de2740571012460a):

```scala
import org.apache.spark.sql.functions.lit

val df = spark.read
  .option("columnNameOfCorruptRecord", "corrupt_record")
  .schema("c1 int, c2 string, x string, corrupt_record string")
  .csv("file:/tmp/file.csv")
  .withColumn("x", lit("A"))

df.show(false)

+---+---+---+--------------+
|c1 |c2 |x  |corrupt_record|
+---+---+---+--------------+
|1  |a  |A  |null          |
+---+---+---+--------------+
```

This patch fixes the former case so that both queries return `null` for the corrupt record column, since there are no parsing issues. Note that https://issues.apache.org/jira/browse/SPARK-38523 remains fixed and works correctly.
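
For illustration, a minimal sanity check of the fixed behaviour (hypothetical, not part of the patch), reusing `df` from the first example above:

```scala
// Hypothetical check: after the fix, the corrupt record column is null
// because the single input row "1,a" parses cleanly. Collecting all columns
// avoids the restriction on querying only the internal corrupt record column.
val rows = df.collect()
assert(rows.forall(_.getAs[String]("_corrupt_record") == null))
```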

### Why are the changes needed?

Fixes a bug where the corrupt record column would be non-null even though the record has no parsing errors.
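
For context, CSV column pruning is gated by the internal flag `spark.sql.csv.parser.columnPruning.enabled` (surfaced as `csvColumnPruning` in the diff below). A sketch of disabling it, which can sidestep the inconsistency on unpatched builds:

```scala
// Workaround sketch, assuming the internal flag is available in your build:
// disabling CSV column pruning avoids the corrupt-record inconsistency
// entirely, at the cost of parsing every column of every row.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
```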

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the output of the corrupt record column when additional columns are provided by the user. Everything outside of that scenario is unchanged.

### How was this patch tested?

I added a unit test that reproduces the issue.
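
A hypothetical sketch of such a regression test (the actual test lives in `CSVSuite.scala`; the names and helpers here assume Spark's `QueryTest`/`SharedSparkSession` test harness and its `testImplicits`):

```scala
test("SPARK-40468: corrupt record column stays null with extra columns") {
  withTempPath { path =>
    // Write the single clean row "1,a" and read it back with a schema that
    // includes both an extra column `x` and the corrupt record column.
    Seq("1,a").toDF().write.text(path.getAbsolutePath)
    val df = spark.read
      .schema("c1 int, c2 string, x string, _corrupt_record string")
      .csv(path.getAbsolutePath)
      .withColumn("x", lit("A"))
    // The row parses cleanly, so _corrupt_record must be null.
    checkAnswer(df, Row(1, "a", "A", null))
  }
}
```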

Closes #37909 from sadikovi/SPARK-40468.

Authored-by: Ivan Sadikov 
Signed-off-by: Max Gekk 
(cherry picked from commit 0776f9e7bcb10612eb977ed4884e9848aea86c33)
Signed-off-by: Max Gekk 
---
 .../execution/datasources/csv/CSVFileFormat.scala  |  4 +---
 .../sql/execution/datasources/v2/csv/CSVScan.scala |  3 +--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 26 ++
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 8d952507840..93679516a8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -100,8 +100,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning &&
-      !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning
     val parsedOptions = new CSVOptions(
       options,
       columnPruning,
@@ -154,4 +153,3 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
   }
 
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 5c33a1047a1..d81223b48a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -44,8 +44,7 @@ case class CSVScan(
     dataFilters: Seq[Expression] = Seq.empty)
   extends TextBasedFileScan(sparkSession, options) {
 
-  val columnPruning = sparkSession.sessionState.conf.csvColumnPruning &&
-    !readDataSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)