Repository: spark Updated Branches: refs/heads/master 56775571c -> f07e71406
[SPARK-6625][SQL] Add common string filters to data sources. Filters such as startsWith, endsWith, contains will be very useful for data sources that provide search functionality, e.g. Succinct, Elastic Search, Solr. I also took this chance to improve documentation for the data source filters. Author: Reynold Xin <r...@databricks.com> Closes #5285 from rxin/ds-string-filters and squashes the following commits: f021727 [Reynold Xin] Fixed grammar. 7695a52 [Reynold Xin] [SPARK-6625][SQL] Add common string filters to data sources. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f07e7140 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f07e7140 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f07e7140 Branch: refs/heads/master Commit: f07e714062f02feadff10a45f9b9061444bb8ec5 Parents: 5677557 Author: Reynold Xin <r...@databricks.com> Authored: Tue Mar 31 00:19:51 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Mar 31 00:19:51 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/sources/DataSourceStrategy.scala | 10 +++ .../org/apache/spark/sql/sources/filters.scala | 69 ++++++++++++++++++ .../apache/spark/sql/sources/interfaces.scala | 3 + .../spark/sql/sources/FilteredScanSuite.scala | 73 ++++++++++++++------ 4 files changed, 133 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 67f3507..83b603a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{Row, Strategy, execution, sources} /** @@ -166,6 +167,15 @@ private[sql] object DataSourceStrategy extends Strategy { case expressions.Not(child) => translate(child).map(sources.Not) + case expressions.StartsWith(a: Attribute, Literal(v: String, StringType)) => + Some(sources.StringStartsWith(a.name, v)) + + case expressions.EndsWith(a: Attribute, Literal(v: String, StringType)) => + Some(sources.StringEndsWith(a.name, v)) + + case expressions.EndsWith(a: Attribute, Literal(v: String, StringType)) => + Some(sources.StringContains(a.name, v)) + case _ => None } http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 1e4505e..791046e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -17,16 +17,85 @@ package org.apache.spark.sql.sources +/** + * A filter predicate for data sources. + */ abstract class Filter +/** + * A filter that evaluates to `true` iff the attribute evaluates to a value + * equal to `value`. + */ case class EqualTo(attribute: String, value: Any) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to a value + * greater than `value`. + */ case class GreaterThan(attribute: String, value: Any) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to a value + * greater than or equal to `value`. + */ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to a value + * less than `value`. + */ case class LessThan(attribute: String, value: Any) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to a value + * less than or equal to `value`. + */ case class LessThanOrEqual(attribute: String, value: Any) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to one of the values in the array. + */ case class In(attribute: String, values: Array[Any]) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to null. + */ case class IsNull(attribute: String) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to a non-null value. + */ case class IsNotNull(attribute: String) extends Filter + +/** + * A filter that evaluates to `true` iff both `left` or `right` evaluate to `true`. + */ case class And(left: Filter, right: Filter) extends Filter + +/** + * A filter that evaluates to `true` iff at least one of `left` or `right` evaluates to `true`. + */ case class Or(left: Filter, right: Filter) extends Filter + +/** + * A filter that evaluates to `true` iff `child` is evaluated to `false`. + */ case class Not(child: Filter) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to + * a string that starts with `value`. + */ +case class StringStartsWith(attribute: String, value: String) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to + * a string that starts with `value`. + */ +case class StringEndsWith(attribute: String, value: String) extends Filter + +/** + * A filter that evaluates to `true` iff the attribute evaluates to + * a string that contains the string `value`. + */ +case class StringContains(attribute: String, value: String) extends Filter http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index a046a48..8f9946a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -152,6 +152,9 @@ trait PrunedScan { * A BaseRelation that can eliminate unneeded columns and filter using selected * predicates before producing an RDD containing all matching tuples as Row objects. * + * The actual filter should be the conjunction of all `filters`, + * i.e. they should be "and" together. + * * The pushed down filters are currently purely an optimization as they will all be evaluated * again. This means it is safe to use them with methods that produce false positives such * as filtering partitions based on a bloom filter. http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index ffeccf0..72ddc0e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -35,20 +35,23 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL extends BaseRelation with PrunedFilteredScan { - override def schema = + override def schema: StructType = StructType( StructField("a", IntegerType, nullable = false) :: - StructField("b", IntegerType, nullable = false) :: Nil) + StructField("b", IntegerType, nullable = false) :: + StructField("c", StringType, nullable = false) :: Nil) override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = { val rowBuilders = requiredColumns.map { case "a" => (i: Int) => Seq(i) case "b" => (i: Int) => Seq(i * 2) + case "c" => (i: Int) => Seq((i - 1 + 'a').toChar.toString * 10) } FiltersPushed.list = filters - def translateFilter(filter: Filter): Int => Boolean = filter match { + // Predicate test on integer column + def translateFilterOnA(filter: Filter): Int => Boolean = filter match { case EqualTo("a", v) => (a: Int) => a == v case LessThan("a", v: Int) => (a: Int) => a < v case LessThanOrEqual("a", v: Int) => (a: Int) => a <= v @@ -57,13 +60,27 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL case In("a", values) => (a: Int) => values.map(_.asInstanceOf[Int]).toSet.contains(a) case IsNull("a") => (a: Int) => false // Int can't be null case IsNotNull("a") => (a: Int) => true - case Not(pred) => (a: Int) => !translateFilter(pred)(a) - case And(left, right) => (a: Int) => translateFilter(left)(a) && translateFilter(right)(a) - case Or(left, right) => (a: Int) => translateFilter(left)(a) || translateFilter(right)(a) + case Not(pred) => (a: Int) => !translateFilterOnA(pred)(a) + case And(left, right) => (a: Int) => + translateFilterOnA(left)(a) && translateFilterOnA(right)(a) + case Or(left, right) => (a: Int) => + translateFilterOnA(left)(a) || translateFilterOnA(right)(a) case _ => (a: Int) => true } - def eval(a: Int) = !filters.map(translateFilter(_)(a)).contains(false) + // Predicate test on string column + def translateFilterOnC(filter: Filter): String => Boolean = filter match { + case StringStartsWith("c", v) => _.startsWith(v) + case StringEndsWith("c", v) => _.endsWith(v) + case StringContains("c", v) => _.contains(v) + case _ => (c: String) => true + } + + def eval(a: Int) = { + val c = (a - 1 + 'a').toChar.toString * 10 + !filters.map(translateFilterOnA(_)(a)).contains(false) && + !filters.map(translateFilterOnC(_)(c)).contains(false) + } sqlContext.sparkContext.parallelize(from to to).filter(eval).map(i => Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty))) @@ -93,7 +110,7 @@ class FilteredScanSuite extends DataSourceTest { sqlTest( "SELECT * FROM oneToTenFiltered", - (1 to 10).map(i => Row(i, i * 2)).toSeq) + (1 to 10).map(i => Row(i, i * 2, (i - 1 + 'a').toChar.toString * 10)).toSeq) sqlTest( "SELECT a, b FROM oneToTenFiltered", @@ -128,41 +145,53 @@ class FilteredScanSuite extends DataSourceTest { (2 to 10 by 2).map(i => Row(i, i)).toSeq) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE a = 1", - Seq(1).map(i => Row(i, i * 2)).toSeq) + "SELECT a, b FROM oneToTenFiltered WHERE a = 1", + Seq(1).map(i => Row(i, i * 2))) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)", - Seq(1,3,5).map(i => Row(i, i * 2)).toSeq) + "SELECT a, b FROM oneToTenFiltered WHERE a IN (1,3,5)", + Seq(1,3,5).map(i => Row(i, i * 2))) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE A = 1", - Seq(1).map(i => Row(i, i * 2)).toSeq) + "SELECT a, b FROM oneToTenFiltered WHERE A = 1", + Seq(1).map(i => Row(i, i * 2))) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE b = 2", - Seq(1).map(i => Row(i, i * 2)).toSeq) + "SELECT a, b FROM oneToTenFiltered WHERE b = 2", + Seq(1).map(i => Row(i, i * 2))) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE a IS NULL", + "SELECT a, b FROM oneToTenFiltered WHERE a IS NULL", Seq.empty[Row]) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE a IS NOT NULL", + "SELECT a, b FROM oneToTenFiltered WHERE a IS NOT NULL", (1 to 10).map(i => Row(i, i * 2)).toSeq) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1", + "SELECT a, b FROM oneToTenFiltered WHERE a < 5 AND a > 1", (2 to 4).map(i => Row(i, i * 2)).toSeq) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8", - Seq(1, 2, 9, 10).map(i => Row(i, i * 2)).toSeq) + "SELECT a, b FROM oneToTenFiltered WHERE a < 3 OR a > 8", + Seq(1, 2, 9, 10).map(i => Row(i, i * 2))) sqlTest( - "SELECT * FROM oneToTenFiltered WHERE NOT (a < 6)", + "SELECT a, b FROM oneToTenFiltered WHERE NOT (a < 6)", (6 to 10).map(i => Row(i, i * 2)).toSeq) + sqlTest( + "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'c%'", + Seq(Row(3, 3 * 2, "c" * 10))) + + sqlTest( + "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'd%'", + Seq(Row(4, 4 * 2, "d" * 10))) + + sqlTest( + "SELECT a, b, c FROM oneToTenFiltered WHERE c like '%e%'", + Seq(Row(5, 5 * 2, "e" * 10))) + testPushDown("SELECT * FROM oneToTenFiltered WHERE A = 1", 1) testPushDown("SELECT a FROM oneToTenFiltered WHERE A = 1", 1) testPushDown("SELECT b FROM oneToTenFiltered WHERE A = 1", 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org