Repository: spark
Updated Branches:
  refs/heads/master 8cf4a1f02 -> ec1003219


[SPARK-5465] [SQL] Fixes filter push-down for Parquet data source

Not all Catalyst filter expressions can be converted to Parquet filter 
predicates. Previously, all predicates were first combined into a single `And` 
expression and converted as a whole, so one unconvertible predicate prevented 
the entire filter from being pushed down. We should instead try to convert each 
individual predicate and only collect those convertible ones.
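
A minimal sketch of the pattern, using hypothetical stand-in types in place of
Catalyst's Expression and Parquet's FilterPredicate (the real code uses
ParquetFilters.createFilter and FilterApi.and):

    // Illustrative stand-ins only, not the actual Catalyst/Parquet APIs.
    object FilterPushDownSketch {
      sealed trait Expression
      case class GtExpr(col: String, v: Int) extends Expression
      case class UdfExpr(name: String) extends Expression  // no Parquet equivalent

      sealed trait FilterPredicate
      case class Gt(col: String, v: Int) extends FilterPredicate
      case class AndPred(l: FilterPredicate, r: FilterPredicate) extends FilterPredicate

      // Mirrors the role of ParquetFilters.createFilter: returns None for
      // expressions that have no Parquet filter equivalent.
      def createFilter(e: Expression): Option[FilterPredicate] = e match {
        case GtExpr(c, v) => Some(Gt(c, v))
        case _            => None
      }

      def main(args: Array[String]): Unit = {
        val predicates = Seq[Expression](GtExpr("a", 1), UdfExpr("f"), GtExpr("b", 2))

        // Old order: combine all predicates first, then convert once. The
        // combined expression contains UdfExpr, so the single conversion
        // fails and nothing is pushed down.

        // Fixed order: convert each predicate individually, keep the
        // convertible ones, then AND them together (FilterApi.and in the
        // real code).
        val pushed = predicates.flatMap(createFilter).reduceOption(AndPred(_, _))
        println(pushed)  // Some(AndPred(Gt(a,1),Gt(b,2)))
      }
    }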


Author: Cheng Lian <l...@databricks.com>

Closes #4255 from liancheng/spark-5465 and squashes the following commits:

14ccd37 [Cheng Lian] Fixes filter push-down for Parquet data source


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec100321
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec100321
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec100321

Branch: refs/heads/master
Commit: ec1003219b8978291abca2fc409ee61b1bb40a38
Parents: 8cf4a1f
Author: Cheng Lian <l...@databricks.com>
Authored: Sun Feb 1 18:52:39 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Sun Feb 1 18:52:39 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/parquet/newParquet.scala | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec100321/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1b50afb..1e794ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -20,26 +20,26 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
+import parquet.filter2.predicate.FilterApi
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
 
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option 
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option
 * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
     }
 
-    // Push down filters when possible
+    // Push down filters when possible. Notice that not all filters can be converted to Parquet
+    // filter predicate. Here we try to convert each individual predicate and only collect those
+    // convertible ones.
     predicates
-      .reduceOption(And)
       .flatMap(ParquetFilters.createFilter)
+      .reduceOption(FilterApi.and)
       .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
 

