Repository: spark
Updated Branches:
  refs/heads/branch-2.2 db821fe55 -> 8b0cb3a7b


[SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns whose names contain dots. It takes a different approach from https://github.com/apache/spark/pull/17680.

The downside of this PR is that it does not push down filters on columns with dots in their names at all (neither at the record level nor at the row-group level), whereas the downside of the approach in that PR is that it does not use Parquet's API properly, relying on a hack to support this case.

I assume we prefer the safe route here of using the Parquet API properly, but this does close that PR since here we are simply avoiding the pushdown.

This is a simple workaround and is probably fine given that the problem is arguably a rather narrow corner case (although it might end up reading whole row groups under the hood; neither option is ideal).

Currently, if there are dots in the column name, predicate pushdown appears to fail in Parquet.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.
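
The new test (shown in full in the diff below) runs the reproduction above under both Parquet reader paths with pushdown enabled; condensed here, relying on the suite's `withSQLConf` and `withTempPath` helpers and `import testImplicits._`:

```scala
// Condensed from the new ParquetFilterSuite test (full version in the diff
// below): with pushdown enabled, an IS NOT NULL filter on a dotted column
// must still return the non-null row under both the vectorized and
// non-vectorized readers.
Seq(true, false).foreach { vectorized =>
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
    withTempPath { path =>
      Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
      val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
      assert(readBack.count() == 1)
    }
  }
}
```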

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #18000 from HyukjinKwon/SPARK-20364-workaround.

(cherry picked from commit 8fb3d5c6da30922458091837eec17ccca502098a)
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b0cb3a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b0cb3a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b0cb3a7

Branch: refs/heads/branch-2.2
Commit: 8b0cb3a7be138d0f2059731ed4bbd8d01f599497
Parents: db821fe
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu May 18 10:52:23 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu May 18 10:52:32 2017 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala    | 57 +++++++++++---------
 .../parquet/ParquetFilterSuite.scala            | 15 ++++++
 2 files changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b0cb3a7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index a6a6cef..763841e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameToType = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
 
     // NOTE:
     //
@@ -184,30 +191,30 @@ private[parquet] object ParquetFilters {
     // Probably I missed something and obviously this should be changed.
 
     predicate match {
-      case sources.IsNull(name) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
-
-      case sources.EqualTo(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.LessThan(name, value) if dataTypeOf.contains(name) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.GreaterThan(name, value) if dataTypeOf.contains(name) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.IsNull(name) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, null))
+      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, null))
+
+      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.LessThan(name, value) if canMakeFilterOn(name) =>
+        makeLt.lift(nameToType(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeLtEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
+        makeGt.lift(nameToType(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeGtEq.lift(nameToType(name)).map(_(name, value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the

http://git-wip-us.apache.org/repos/asf/spark/blob/8b0cb3a7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index dd53b56..98427cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots 
in the names") {
+    import testImplicits._
+
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+          SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
+        withTempPath { path =>
+          Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
+          val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
+          assert(readBack.count() == 1)
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {

