Repository: spark
Updated Branches:
  refs/heads/master 8cf4a1f02 -> ec1003219
[SPARK-5465] [SQL] Fixes filter push-down for Parquet data source

Not all Catalyst filter expressions can be converted to Parquet filter predicates. We should try to convert each individual predicate and then collect those convertible ones.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4255)
<!-- Reviewable:end -->

Author: Cheng Lian <l...@databricks.com>

Closes #4255 from liancheng/spark-5465 and squashes the following commits:

14ccd37 [Cheng Lian] Fixes filter push-down for Parquet data source

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec100321
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec100321
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec100321

Branch: refs/heads/master
Commit: ec1003219b8978291abca2fc409ee61b1bb40a38
Parents: 8cf4a1f
Author: Cheng Lian <l...@databricks.com>
Authored: Sun Feb 1 18:52:39 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Sun Feb 1 18:52:39 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/parquet/newParquet.scala | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ec100321/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1b50afb..1e794ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -20,26 +20,26 @@ import java.util.{List => JList}
 import scala.collection.JavaConversions._
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
+import parquet.filter2.predicate.FilterApi
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
 /**
 * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
 * required is `path`, which should be the location of a collection of, optionally partitioned,
 * parquet files.
 */
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
 }
- // Push down filters when possible
+ // Push down filters when possible. Notice that not all filters can be converted to Parquet
+ // filter predicate. Here we try to convert each individual predicate and only collect those
+ // convertible ones.
 predicates
- .reduceOption(And)
 .flatMap(ParquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
 .filter(_ => sqlContext.conf.parquetFilterPushDown)
 .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org