Repository: spark
Updated Branches:
  refs/heads/master 784fcd532 -> 71a0d40eb


[SPARK-6554] [SQL] Don't push down predicates which reference partition 
column(s)

There are two cases for the new Parquet data source:

1. Partition columns exist in the Parquet data files

   We don't need to push-down these predicates since partition pruning already 
handles them.

1. Partition columns don't exist in the Parquet data files

   We can't push-down these predicates since they are considered as invalid 
columns by Parquet.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png"; height=40 alt="Review on 
Reviewable"/>](https://reviewable.io/reviews/apache/spark/5210)
<!-- Reviewable:end -->

Author: Cheng Lian <l...@databricks.com>

Closes #5210 from liancheng/spark-6554 and squashes the following commits:

4f7ec03 [Cheng Lian] Adds comments
e134ced [Cheng Lian] Don't push down predicates which reference partition 
column(s)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71a0d40e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71a0d40e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71a0d40e

Branch: refs/heads/master
Commit: 71a0d40ebd37c80d8020e184366778b57c762285
Parents: 784fcd5
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Mar 26 13:11:37 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Mar 26 13:11:37 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/parquet/newParquet.scala  | 17 ++++++++++++-----
 .../spark/sql/parquet/ParquetFilterSuite.scala     | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71a0d40e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 3516cfe..0d68810 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -435,11 +435,18 @@ private[sql] case class ParquetRelation2(
     // Push down filters when possible. Notice that not all filters can be 
converted to Parquet
     // filter predicate. Here we try to convert each individual predicate and 
only collect those
     // convertible ones.
-    predicates
-      .flatMap(ParquetFilters.createFilter)
-      .reduceOption(FilterApi.and)
-      .filter(_ => sqlContext.conf.parquetFilterPushDown)
-      .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    if (sqlContext.conf.parquetFilterPushDown) {
+      predicates
+        // Don't push down predicates which reference partition columns
+        .filter { pred =>
+          val partitionColNames = partitionColumns.map(_.name).toSet
+          val referencedColNames = pred.references.map(_.name).toSet
+          referencedColNames.intersect(partitionColNames).isEmpty
+        }
+        .flatMap(ParquetFilters.createFilter)
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    }
 
     if (isPartitioned) {
       logInfo {

http://git-wip-us.apache.org/repos/asf/spark/blob/71a0d40e/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 4d32e84..6a2c2a7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -321,6 +321,23 @@ class ParquetDataSourceOnFilterSuite extends 
ParquetFilterSuiteBase with BeforeA
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, 
originalConf.toString)
   }
+
+  test("SPARK-6554: don't push down predicates which reference partition 
columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", 
"b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an 
exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.parquetFile(path).filter("part = 1"),
+          (1 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }
 
 class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with 
BeforeAndAfterAll {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to