Repository: spark
Updated Branches:
  refs/heads/branch-2.1 16d4bd4a2 -> af12a21ca


[SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources

## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case where the pushed-down filter is `Literal(null)`: the literal is dropped from the Spark-side post-filter.

For example, the code below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows that the `null` literal produced by the optimizer is kept in the physical plan:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is kept
+- LocalTableScan [_1#17]
```

However, when the same data is written to and read back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

the `null` literal is dropped from the post-filter:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes this so that the `null` literal is kept as a Spark-side post-filter. In more detail:

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

This keeps the `null` literal in `partitionKeyFilters`, because a `Literal` never has `children`, so its `references` set is empty, and the empty set is always a subset of `partitionSet`.
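
As a minimal sketch against Catalyst's expression API (the `partitionCol` and `partitionSet` values below are hypothetical stand-ins for the strategy's variables):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.BooleanType

// A null literal is a leaf expression: it has no children, so its
// `references` set is empty.
val nullFilter = Literal(null)
assert(nullFilter.references.isEmpty)

// The empty set is a subset of every AttributeSet, so the null literal
// always passes `_.references.subsetOf(partitionSet)` and lands in
// `partitionKeyFilters`.
val partitionCol = AttributeReference("part", BooleanType)()
val partitionSet = AttributeSet(partitionCol :: Nil)
assert(nullFilter.references.subsetOf(partitionSet))
```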

Then, in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

the `null` literal is always subtracted out of the post-filter, because it is already in `partitionKeyFilters`. So, filters whose `references` are empty should be kept and applied to the data columns as well.
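
To illustrate the fix, a small sketch assuming a filter set that contains only the `null` literal (the variable names mirror the strategy's locals):

```scala
import org.apache.spark.sql.catalyst.expressions.{ExpressionSet, Literal}

// Hypothetical sets mirroring the strategy's local variables: the null
// literal sits in both `filterSet` and `partitionKeyFilters`.
val nullFilter = Literal(null)
val filterSet = ExpressionSet(nullFilter :: Nil)
val partitionKeyFilters = ExpressionSet(nullFilter :: Nil)

// Before the fix: subtracting all of `partitionKeyFilters` drops the null
// literal, so it never reaches the Spark-side post-filter.
assert((filterSet -- partitionKeyFilters).isEmpty)

// After the fix: only filters that actually reference partition columns are
// subtracted, so the null literal survives in `afterScanFilters`.
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
assert(afterScanFilters.contains(nullFilter))
```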

After this PR, the plan becomes:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #16184 from HyukjinKwon/SPARK-18753.

(cherry picked from commit 89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af12a21c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af12a21c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af12a21c

Branch: refs/heads/branch-2.1
Commit: af12a21ca7145751acdec400134b1bd5c8168f74
Parents: 16d4bd4
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Dec 14 11:29:11 2016 -0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Dec 14 11:29:23 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/FileSourceStrategy.scala   |  2 +-
 .../execution/datasources/FileSourceStrategySuite.scala  | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af12a21c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 55ca4f1..ead3233 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -86,7 +86,7 @@ object FileSourceStrategy extends Strategy with Logging {
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
-      val afterScanFilters = filterSet -- partitionKeyFilters
+      val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
       logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
 
       val filterAttributes = AttributeSet(afterScanFilters)

http://git-wip-us.apache.org/repos/asf/spark/blob/af12a21c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index d900ce7..f361628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -476,6 +476,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }
 
+  test("[SPARK-18753] keep pushed-down null literal as a filter in Spark-side post-filter") {
+    val ds = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDS()
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      ds.write.parquet(path)
+      val readBack = spark.read.parquet(path).filter($"_1" === "true")
+      val filtered = ds.filter($"_1" === "true").toDF()
+      checkAnswer(readBack, filtered)
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =

