Repository: spark
Updated Branches:
  refs/heads/branch-1.6 57f281c1a -> 83906411c


[SPARK-11661][SQL] Still pushdown filters returned by unhandledFilters.

https://issues.apache.org/jira/browse/SPARK-11661

Author: Yin Huai <yh...@databricks.com>

Closes #9634 from yhuai/unhandledFilters.

(cherry picked from commit 14cf753704ea60f358cb870b018cbcf73654f198)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83906411
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83906411
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83906411

Branch: refs/heads/branch-1.6
Commit: 83906411cd3143b7eb8f0997ff630c441087509f
Parents: 57f281c
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Nov 12 16:47:00 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Nov 12 16:47:20 2015 +0800

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala        | 15 ++++++--
 .../apache/spark/sql/sources/interfaces.scala   |  8 ++--
 .../parquet/ParquetFilterSuite.scala            | 25 +++++++++++++
 .../spark/sql/sources/FilteredScanSuite.scala   | 39 +++++++++++++-------
 .../SimpleTextHadoopFsRelationSuite.scala       |  8 ++--
 5 files changed, 71 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83906411/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7265d6a..d7c01b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -453,8 +453,8 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
    *
    * @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element 
contains all Catalyst
    *         predicate [[Expression]]s that are either not convertible or 
cannot be handled by
-   *         `relation`. The second element contains all converted data source 
[[Filter]]s that can
-   *        be handled by `relation`.
+   *         `relation`. The second element contains all converted data source 
[[Filter]]s that
+   *         will be pushed down to the data source.
    */
   protected[sql] def selectFilters(
     relation: BaseRelation,
@@ -476,7 +476,9 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
     // Catalyst predicate expressions that cannot be translated to data source 
filters.
     val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
 
-    // Data source filters that cannot be handled by `relation`
+    // Data source filters that cannot be handled by `relation`. The semantic 
of a unhandled filter
+    // at here is that a data source may not be able to apply this filter to 
every row
+    // of the underlying dataset.
     val unhandledFilters = 
relation.unhandledFilters(translatedMap.values.toArray).toSet
 
     val (unhandled, handled) = translated.partition {
@@ -491,6 +493,11 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
     // Translated data source filters that can be handled by `relation`
     val (_, handledFilters) = handled.unzip
 
-    (unrecognizedPredicates ++ unhandledPredicates, handledFilters)
+    // translated contains all filters that have been converted to the public 
Filter interface.
+    // We should always push them to the data source no matter whether the 
data source can apply
+    // a filter to every row or not.
+    val (_, translatedFilters) = translated.unzip
+
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83906411/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 48de693..2be6cd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -235,9 +235,11 @@ abstract class BaseRelation {
   def needConversion: Boolean = true
 
   /**
-   * Given an array of [[Filter]]s, returns an array of [[Filter]]s that this 
data source relation
-   * cannot handle.  Spark SQL will apply all returned [[Filter]]s against 
rows returned by this
-   * data source relation.
+   * Returns the list of [[Filter]]s that this datasource may not be able to 
handle.
+   * These returned [[Filter]]s will be evaluated by Spark SQL after data is 
output by a scan.
+   * By default, this function will return all filters, as it is always safe to
+   * double evaluate a [[Filter]]. However, specific implementations can 
override this function to
+   * avoid double filtering when they are capable of processing a filter 
internally.
    *
    * @since 1.6.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/83906411/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 579dabf..2ac87ad 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -336,4 +336,29 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+        val df = sqlContext.read.parquet(path).filter("a = 2")
+
+        // This is the source RDD without Spark-side filtering.
+        val childRDD =
+          df
+            .queryExecution
+            .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+            .child
+            .execute()
+
+        // The result should be single row.
+        // When a filter is pushed to Parquet, Parquet can apply it to every 
row.
+        // So, we can check the number of rows returned from the Parquet
+        // to make sure our filter pushdown work.
+        assert(childRDD.count == 1)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83906411/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2cad964..398b8a1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -254,7 +254,11 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext with Predic
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)", 3, 
Set("a", "b", "c"))
 
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a = 20", 0, Set("a", "b", 
"c"))
-  testPushDown("SELECT * FROM oneToTenFiltered WHERE b = 1", 10, Set("a", "b", 
"c"))
+  testPushDown(
+    "SELECT * FROM oneToTenFiltered WHERE b = 1",
+    10,
+    Set("a", "b", "c"),
+    Set(EqualTo("b", 1)))
 
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1", 3, 
Set("a", "b", "c"))
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8", 4, 
Set("a", "b", "c"))
@@ -283,12 +287,23 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext with Predic
       | WHERE a + b > 9
       |   AND b < 16
       |   AND c IN ('bbbbbBBBBB', 'cccccCCCCC', 'dddddDDDDD', 'foo')
-    """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))
+    """.stripMargin.split("\n").map(_.trim).mkString(" "),
+    3,
+    Set("a", "b"),
+    Set(LessThan("b", 16)))
 
   def testPushDown(
-      sqlString: String,
-      expectedCount: Int,
-      requiredColumnNames: Set[String]): Unit = {
+    sqlString: String,
+    expectedCount: Int,
+    requiredColumnNames: Set[String]): Unit = {
+    testPushDown(sqlString, expectedCount, requiredColumnNames, 
Set.empty[Filter])
+  }
+
+  def testPushDown(
+    sqlString: String,
+    expectedCount: Int,
+    requiredColumnNames: Set[String],
+    expectedUnhandledFilters: Set[Filter]): Unit = {
     test(s"PushDown Returns $expectedCount: $sqlString") {
       val queryExecution = sql(sqlString).queryExecution
       val rawPlan = queryExecution.executedPlan.collect {
@@ -300,15 +315,13 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext with Predic
       val rawCount = rawPlan.execute().count()
       assert(ColumnsRequired.set === requiredColumnNames)
 
-      assert {
-        val table = caseInsensitiveContext.table("oneToTenFiltered")
-        val relation = table.queryExecution.logical.collectFirst {
-          case LogicalRelation(r, _) => r
-        }.get
+      val table = caseInsensitiveContext.table("oneToTenFiltered")
+      val relation = table.queryExecution.logical.collectFirst {
+        case LogicalRelation(r, _) => r
+      }.get
 
-        // `relation` should be able to handle all pushed filters
-        relation.unhandledFilters(FiltersPushed.list.toArray).isEmpty
-      }
+      assert(
+        relation.unhandledFilters(FiltersPushed.list.toArray).toSet === 
expectedUnhandledFilters)
 
       if (rawCount != expectedCount) {
         fail(

http://git-wip-us.apache.org/repos/asf/spark/blob/83906411/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9251a69..81af684 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -248,7 +248,7 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest with Predicat
     projections = Seq('c, 'p),
     filter = 'a < 3 && 'p > 0,
     requiredColumns = Seq("c", "a"),
-    pushedFilters = Nil,
+    pushedFilters = Seq(LessThan("a", 3)),
     inconvertibleFilters = Nil,
     unhandledFilters = Seq('a < 3),
     partitioningFilters = Seq('p > 0)
@@ -327,7 +327,7 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest with Predicat
     projections = Seq('b, 'p),
     filter = 'c > "val_7" && 'b < 18 && 'p > 0,
     requiredColumns = Seq("b"),
-    pushedFilters = Seq(GreaterThan("c", "val_7")),
+    pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
     inconvertibleFilters = Nil,
     unhandledFilters = Seq('b < 18),
     partitioningFilters = Seq('p > 0)
@@ -344,7 +344,7 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest with Predicat
     projections = Seq('b, 'p),
     filter = 'a % 2 === 0 && 'c > "val_7" && 'b < 18 && 'p > 0,
     requiredColumns = Seq("b", "a"),
-    pushedFilters = Seq(GreaterThan("c", "val_7")),
+    pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
     inconvertibleFilters = Seq('a % 2 === 0),
     unhandledFilters = Seq('b < 18),
     partitioningFilters = Seq('p > 0)
@@ -361,7 +361,7 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest with Predicat
     projections = Seq('b, 'p),
     filter = 'a > 7 && 'a < 9,
     requiredColumns = Seq("b", "a"),
-    pushedFilters = Seq(GreaterThan("a", 7)),
+    pushedFilters = Seq(GreaterThan("a", 7), LessThan("a", 9)),
     inconvertibleFilters = Nil,
     unhandledFilters = Seq('a < 9),
     partitioningFilters = Nil


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to