Repository: spark
Updated Branches:
  refs/heads/branch-1.6 df0231952 -> 1dc71ec77


[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API

JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push down nested AND expressions partially; converting only one side of a nested AND is unsafe. The sketch below illustrates why.
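
A minimal sketch in plain Scala (no Spark APIs; the rows and predicates are illustrative stand-ins for the data and pushed-down filters used in the tests below):

    // Rows of (a, b), mirroring the test data: (1,"1"), (2,"0"), ..., (5,"1")
    val rows = (1 to 5).map(i => (i, (i % 2).toString))

    // Intended predicate: NOT(a = 2 AND b = '1')
    val correct = rows.filter { case (a, b) => !(a == 2 && b == "1") }

    // Buggy partial conversion: the unconvertible b = '1' conjunct was
    // dropped, leaving NOT(a = 2)
    val buggy = rows.filter { case (a, _) => !(a == 2) }

    assert(correct.size == 5) // no row has both a = 2 and b = '1'
    assert(buggy.size == 4)   // row (2, "0") is wrongly filtered out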

Author: Yin Huai <yh...@databricks.com>

Closes #10362 from yhuai/SPARK-12218.

(cherry picked from commit 41ee7c57abd9f52065fd7ffb71a8af229603371d)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dc71ec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dc71ec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dc71ec7

Branch: refs/heads/branch-1.6
Commit: 1dc71ec777ff7cac5d3d7adb13f2d63ffe8909b6
Parents: df02319
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Dec 18 10:52:14 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Dec 18 10:53:31 2015 -0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala    | 12 ++++++++++-
 .../parquet/ParquetFilterSuite.scala            | 19 +++++++++++++++++
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 22 +++++++++-----------
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 20 ++++++++++++++++++
 4 files changed, 60 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 0771432..883013b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -257,7 +257,17 @@ private[sql] object ParquetFilters {
         makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
       case sources.And(lhs, rhs) =>
-        (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+        // It is not safe to convert just one side of an AND if we cannot
+        // convert the other side. Here is an example that explains why.
+        // Say we have NOT(a = 2 AND b in ('1')) and we do not know how to
+        // convert b in ('1'). If we converted only a = 2, we would end up
+        // with the filter NOT(a = 2), which produces wrong results.
+        // Pushing down one side of an AND is only safe at the top level.
+        // See ParquetRelation's initializeLocalJobFunc method for an example.
+        for {
+          lhsFilter <- createFilter(schema, lhs)
+          rhsFilter <- createFilter(schema, rhs)
+        } yield FilterApi.and(lhsFilter, rhsFilter)
 
       case sources.Or(lhs, rhs) =>
         for {

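The fix hinges on how the two Options are combined. A toy sketch of the old and new strategies (the string filters and `and` combinator below are illustrative stand-ins, not Spark's FilterApi):

    def and(l: String, r: String): String = s"($l AND $r)"

    val lhs: Option[String] = Some("a = 2")
    val rhs: Option[String] = None // the other side failed to convert

    // Old behavior: concatenating the Options and reducing silently keeps
    // whichever side converted, dropping the other conjunct.
    val partial = (lhs ++ rhs).reduceOption(and) // Some("a = 2")

    // New behavior: the for-comprehension is all-or-nothing; it yields None
    // unless BOTH sides convert.
    val whole = for { l <- lhs; r <- rhs } yield and(l, r) // None
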
http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index cc5aae0..ed5a352 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -364,4 +364,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 27193f5..ebfb175 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -74,22 +74,20 @@ private[orc] object OrcFilters extends Logging {
 
     expression match {
       case And(left, right) =>
-        val tryLeft = buildSearchArgument(left, newBuilder)
-        val tryRight = buildSearchArgument(right, newBuilder)
-
-        val conjunction = for {
-          _ <- tryLeft
-          _ <- tryRight
+        // It is not safe to convert just one side of an AND if we cannot
+        // convert the other side. Here is an example that explains why.
+        // Say we have NOT(a = 2 AND b in ('1')) and we do not know how to
+        // convert b in ('1'). If we converted only a = 2, we would end up
+        // with the filter NOT(a = 2), which produces wrong results.
+        // Pushing down one side of an AND is only safe at the top level.
+        // See ParquetRelation's initializeLocalJobFunc method for an example.
+        for {
+          _ <- buildSearchArgument(left, newBuilder)
+          _ <- buildSearchArgument(right, newBuilder)
           lhs <- buildSearchArgument(left, builder.startAnd())
           rhs <- buildSearchArgument(right, lhs)
         } yield rhs.end()
 
-        // For filter `left AND right`, we can still push down `left` even if `right` is not
-        // convertible, and vice versa.
-        conjunction
-          .orElse(tryLeft.flatMap(_ => buildSearchArgument(left, builder)))
-          .orElse(tryRight.flatMap(_ => buildSearchArgument(right, builder)))
-
       case Or(left, right) =>
         for {
           _ <- buildSearchArgument(left, newBuilder)

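By contrast, splitting an AND is safe at the top level, because the set of filters handed to a data source is itself an implicit conjunction and Spark re-evaluates the full predicate on the returned rows. A rough sketch of that reasoning (pushable and the string filters are hypothetical, not Spark's actual code):

    // Pretend IN is the one predicate the source cannot handle.
    def pushable(f: String): Option[String] =
      if (f.contains(" in ")) None else Some(f)

    // Top-level filters are implicitly ANDed: f1 AND f2 AND f3.
    val filters = Seq("a = 2", "b in ('1')", "c > 0")
    val pushed = filters.flatMap(pushable) // Seq("a = 2", "c > 0")

    // Safe: rows matching (a = 2 AND c > 0) are a superset of rows matching
    // all three filters, and Spark still applies the full predicate on top.
    // Under a NOT, dropping a conjunct instead *strengthens* the predicate,
    // which is why the nested case must be all-or-nothing.
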
http://git-wip-us.apache.org/repos/asf/spark/blob/1dc71ec7/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 92043d6..e8a6112 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.orc
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{Row, SQLConf}
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._
 
@@ -60,4 +61,23 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
           "dataSchema" -> 
dataSchemaWithPartition.json)).format(dataSourceName).load())
     }
   }
+
+  test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)
+
+        checkAnswer(
+          sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }

