spark git commit: [SPARK-5296] [SQL] Add more filter types for data sources API
Repository: spark
Updated Branches:
  refs/heads/master b4d7c7032 -> 6f54dee66

[SPARK-5296] [SQL] Add more filter types for data sources API

This PR adds the following filter types for the data sources API:

- `IsNull`
- `IsNotNull`
- `Not`
- `And`
- `Or`

The code that converts Catalyst predicate expressions to data source filters is very similar to the filter conversion logic in `ParquetFilters`, which converts Catalyst predicates to Parquet filter predicates. This way we can support nested AND/OR/NOT predicates without changing the current `BaseScan` type hierarchy.

Author: Cheng Lian <l...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mich...@databricks.com>

Closes #4623 from liancheng/more-fiters and squashes the following commits:

1b296f4 [Cheng Lian] Add more filter types for data sources API

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f54dee6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f54dee6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f54dee6

Branch: refs/heads/master
Commit: 6f54dee66100e5e58f6649158db257eb5009bd6a
Parents: b4d7c70
Author: Cheng Lian <l...@databricks.com>
Authored: Mon Feb 16 12:48:55 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Feb 16 12:48:55 2015 -0800

--
 .../scala/org/apache/spark/sql/SQLContext.scala |  9 ++-
 .../apache/spark/sql/parquet/newParquet.scala   |  5 +-
 .../spark/sql/sources/DataSourceStrategy.scala  | 81 ++--
 .../org/apache/spark/sql/sources/filters.scala  |  5 ++
 .../spark/sql/sources/FilteredScanSuite.scala   | 34 +++-
 5 files changed, 103 insertions(+), 31 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6f54dee6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b42a52e..1442250 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -28,16 +28,16 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.catalyst.{ScalaReflection, expressions}
+import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
@@ -867,7 +867,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val projectSet = AttributeSet(projectList.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-    val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)
+    val filterCondition =
+      prunePushedDownFilters(filterPredicates).reduceLeftOption(expressions.And)
     // Right now we still use a projection even if the only evaluation is applying an alias
     // to a column. Since this is a no-op, it could be avoided. However, using this

http://git-wip-us.apache.org/repos/asf/spark/blob/6f54dee6/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9279f5a..9bb34e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.Writable
 import
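Editor's note: the translation this commit adds to `DataSourceStrategy` can be pictured as a small recursive function over Catalyst predicates. The sketch below is illustrative only, not the exact code from the commit; the function name `toSourceFilter` and the reduced set of cases are assumptions. It shows why nested AND/OR/NOT predicates need no change to the scan interfaces: the new `And`/`Or`/`Not` filters simply wrap other `Filter`s.

```scala
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.sources._

// Illustrative sketch: turn a Catalyst predicate into a data source Filter.
// Returns None when (part of) the predicate cannot be expressed, in which
// case Spark keeps evaluating it on top of the scan. Catalyst is referenced
// through the `expressions` qualifier because sources.And/Or/Not would
// otherwise clash with expressions.And/Or/Not, the same clash that makes
// the diff above switch SQLContext to `expressions.And`.
def toSourceFilter(predicate: expressions.Expression): Option[Filter] = predicate match {
  case expressions.EqualTo(a: expressions.Attribute, expressions.Literal(v, _)) =>
    Some(EqualTo(a.name, v))
  case expressions.IsNull(a: expressions.Attribute) =>
    Some(IsNull(a.name))
  case expressions.IsNotNull(a: expressions.Attribute) =>
    Some(IsNotNull(a.name))
  case expressions.And(left, right) =>
    // Both sides must translate: pushing only half of a conjunction as an
    // And filter would misrepresent the predicate to the data source.
    for (l <- toSourceFilter(left); r <- toSourceFilter(right)) yield And(l, r)
  case expressions.Or(left, right) =>
    for (l <- toSourceFilter(left); r <- toSourceFilter(right)) yield Or(l, r)
  case expressions.Not(child) =>
    toSourceFilter(child).map(Not)
  case _ =>
    None // other comparisons, In, etc. omitted for brevity
}
```

Because every compound case recurses, arbitrarily nested predicates fall out of the same few lines, which is what the commit message means by supporting them "without changing the current `BaseScan` type hierarchy".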
spark git commit: [SPARK-5296] [SQL] Add more filter types for data sources API
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0368494c5 -> 363a9a7d5

[SPARK-5296] [SQL] Add more filter types for data sources API

This PR adds the following filter types for the data sources API:

- `IsNull`
- `IsNotNull`
- `Not`
- `And`
- `Or`

The code that converts Catalyst predicate expressions to data source filters is very similar to the filter conversion logic in `ParquetFilters`, which converts Catalyst predicates to Parquet filter predicates. This way we can support nested AND/OR/NOT predicates without changing the current `BaseScan` type hierarchy.

Author: Cheng Lian <l...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mich...@databricks.com>

Closes #4623 from liancheng/more-fiters and squashes the following commits:

1b296f4 [Cheng Lian] Add more filter types for data sources API

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/363a9a7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/363a9a7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/363a9a7d

Branch: refs/heads/branch-1.3
Commit: 363a9a7d5ad682f828288f792a836c2c0b5e2f89
Parents: 0368494
Author: Cheng Lian <l...@databricks.com>
Authored: Mon Feb 16 12:48:55 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Feb 16 12:50:10 2015 -0800

--
 .../scala/org/apache/spark/sql/SQLContext.scala |  9 ++-
 .../apache/spark/sql/parquet/newParquet.scala   |  5 +-
 .../spark/sql/sources/DataSourceStrategy.scala  | 81 ++--
 .../org/apache/spark/sql/sources/filters.scala  |  5 ++
 .../spark/sql/sources/FilteredScanSuite.scala   | 34 +++-
 5 files changed, 103 insertions(+), 31 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/363a9a7d/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b42a52e..1442250 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -28,16 +28,16 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.catalyst.{ScalaReflection, expressions}
+import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
@@ -867,7 +867,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val projectSet = AttributeSet(projectList.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-    val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)
+    val filterCondition =
+      prunePushedDownFilters(filterPredicates).reduceLeftOption(expressions.And)
     // Right now we still use a projection even if the only evaluation is applying an alias
     // to a column. Since this is a no-op, it could be avoided. However, using this

http://git-wip-us.apache.org/repos/asf/spark/blob/363a9a7d/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9279f5a..9bb34e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.Writable
 import
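Editor's note: on the receiving side, a relation that mixes in `PrunedFilteredScan` gets these filters in its `buildScan`. The helper below is hypothetical (the record-as-Map representation, the `evaluate` name, and the choice to treat absent columns as null are all assumptions), but it shows that the new compound filters reduce to plain recursion. Pushed-down filters in this API are advisory, so letting an unhandled filter pass is safe: as the SQLContext hunk above suggests, Spark still applies the remaining `filterCondition` on top of the scan.

```scala
import org.apache.spark.sql.sources._

// Hypothetical helper for a custom relation: evaluate a pushed-down Filter
// against one record, represented here as a column-name -> value map.
def evaluate(filter: Filter, record: Map[String, Any]): Boolean = filter match {
  case EqualTo(attr, value) => record.get(attr).exists(_ == value)
  case IsNull(attr)         => record.get(attr).forall(_ == null) // absent column counts as null here
  case IsNotNull(attr)      => record.get(attr).exists(_ != null)
  case And(left, right)     => evaluate(left, record) && evaluate(right, record)
  case Or(left, right)      => evaluate(left, record) || evaluate(right, record)
  case Not(child)           => !evaluate(child, record)
  case _                    => true // advisory: pass unhandled filters through; Spark re-checks
}

// Example: WHERE a IS NOT NULL AND (b = 1 OR b IS NULL)
val f: Filter = And(IsNotNull("a"), Or(EqualTo("b", 1), IsNull("b")))
assert(evaluate(f, Map("a" -> "x", "b" -> null)))
```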