Repository: spark
Updated Branches:
  refs/heads/master 56775571c -> f07e71406


[SPARK-6625][SQL] Add common string filters to data sources.

Filters such as startsWith, endsWith, and contains will be very useful for data
sources that provide search functionality, e.g. Succinct, Elasticsearch, Solr.
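
For illustration, a minimal sketch (table, DataFrame, and column names are
hypothetical) of predicates that can now reach a data source as pushed-down
filters, assuming the optimizer's usual simplification of simple LIKE patterns:

    // Each predicate below should arrive in buildScan as a source filter:
    sqlContext.sql("SELECT * FROM people WHERE name LIKE 'ab%'") // StringStartsWith("name", "ab")
    df.filter(df("name").endsWith("son"))                        // StringEndsWith("name", "son")
    df.filter(df("name").contains("ann"))                        // StringContains("name", "ann")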

I also took this chance to improve documentation for the data source filters.

Author: Reynold Xin <r...@databricks.com>

Closes #5285 from rxin/ds-string-filters and squashes the following commits:

f021727 [Reynold Xin] Fixed grammar.
7695a52 [Reynold Xin] [SPARK-6625][SQL] Add common string filters to data sources.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f07e7140
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f07e7140
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f07e7140

Branch: refs/heads/master
Commit: f07e714062f02feadff10a45f9b9061444bb8ec5
Parents: 5677557
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Mar 31 00:19:51 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Mar 31 00:19:51 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/sources/DataSourceStrategy.scala  | 10 +++
 .../org/apache/spark/sql/sources/filters.scala  | 69 ++++++++++++++++++
 .../apache/spark/sql/sources/interfaces.scala   |  3 +
 .../spark/sql/sources/FilteredScanSuite.scala   | 73 ++++++++++++++------
 4 files changed, 133 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 67f3507..83b603a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, Strategy, execution, sources}
 
 /**
@@ -166,6 +167,15 @@ private[sql] object DataSourceStrategy extends Strategy {
       case expressions.Not(child) =>
         translate(child).map(sources.Not)
 
+      case expressions.StartsWith(a: Attribute, Literal(v: String, StringType)) =>
+        Some(sources.StringStartsWith(a.name, v))
+
+      case expressions.EndsWith(a: Attribute, Literal(v: String, StringType)) =>
+        Some(sources.StringEndsWith(a.name, v))
+
+      case expressions.Contains(a: Attribute, Literal(v: String, StringType)) =>
+        Some(sources.StringContains(a.name, v))
+
       case _ => None
     }
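
Note that the translation above only fires when the search pattern is a string
literal. A hedged sketch of the asymmetry (df and its columns are hypothetical):

    // Pushed down: the literal pattern matches Literal(v: String, StringType)
    df.filter(df("c").startsWith("ab"))
    // Not pushed down: a non-literal pattern falls through to `case _ => None`,
    // so Spark evaluates the predicate after the scan instead
    df.filter(df("c").startsWith(df("prefix")))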
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 1e4505e..791046e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,16 +17,85 @@
 
 package org.apache.spark.sql.sources
 
+/**
+ * A filter predicate for data sources.
+ */
 abstract class Filter
 
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * equal to `value`.
+ */
 case class EqualTo(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * greater than `value`.
+ */
 case class GreaterThan(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * greater than or equal to `value`.
+ */
 case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * less than `value`.
+ */
 case class LessThan(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a value
+ * less than or equal to `value`.
+ */
 case class LessThanOrEqual(attribute: String, value: Any) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to one of the values in the array.
+ */
 case class In(attribute: String, values: Array[Any]) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to null.
+ */
 case class IsNull(attribute: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to a non-null value.
+ */
 case class IsNotNull(attribute: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff both `left` and `right` evaluate to `true`.
+ */
 case class And(left: Filter, right: Filter) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff at least one of `left` or `right` evaluates to `true`.
+ */
 case class Or(left: Filter, right: Filter) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff `child` evaluates to `false`.
+ */
 case class Not(child: Filter) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to
+ * a string that starts with `value`.
+ */
+case class StringStartsWith(attribute: String, value: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to
+ * a string that ends with `value`.
+ */
+case class StringEndsWith(attribute: String, value: String) extends Filter
+
+/**
+ * A filter that evaluates to `true` iff the attribute evaluates to
+ * a string that contains the string `value`.
+ */
+case class StringContains(attribute: String, value: String) extends Filter
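
To show how a connector might consume these new case classes, here is a
self-contained sketch that compiles them into a made-up search query syntax
(not modeled on any real Succinct / Elasticsearch / Solr client API); filters
it does not understand yield None and are simply not pushed down:

    import org.apache.spark.sql.sources._

    def compile(f: Filter): Option[String] = f match {
      case StringStartsWith(attr, v) => Some(s"$attr:$v*")
      case StringEndsWith(attr, v)   => Some(s"$attr:*$v")
      case StringContains(attr, v)   => Some(s"$attr:*$v*")
      case And(l, r) => for (cl <- compile(l); cr <- compile(r)) yield s"($cl AND $cr)"
      case _ => None // unsupported: Spark re-evaluates the predicate anyway
    }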

http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a046a48..8f9946a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -152,6 +152,9 @@ trait PrunedScan {
  * A BaseRelation that can eliminate unneeded columns and filter using selected
 * predicates before producing an RDD containing all matching tuples as Row objects.
  *
+ * The actual filter should be the conjunction of all `filters`,
+ * i.e. they should be ANDed together.
+ *
 * The pushed down filters are currently purely an optimization as they will all be evaluated
 * again.  This means it is safe to use them with methods that produce false positives such
 * as filtering partitions based on a bloom filter.
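
To make the false-positive guarantee concrete, a hedged sketch of a relation
that applies only the filters it understands and may return a superset of the
matching rows (`rows` is a hypothetical backing RDD[Row] inside a BaseRelation
with PrunedFilteredScan; column pruning is omitted for brevity):

    override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
      rows.filter { row =>
        // Keep a row when every filter either passes or is not understood;
        // Spark re-evaluates all predicates afterwards, so extra rows are safe.
        filters.forall {
          case StringStartsWith("name", prefix) => row.getString(0).startsWith(prefix)
          case _ => true // not understood: treat as passing
        }
      }
    }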

http://git-wip-us.apache.org/repos/asf/spark/blob/f07e7140/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index ffeccf0..72ddc0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -35,20 +35,23 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL
   extends BaseRelation
   with PrunedFilteredScan {
 
-  override def schema =
+  override def schema: StructType =
     StructType(
       StructField("a", IntegerType, nullable = false) ::
-      StructField("b", IntegerType, nullable = false) :: Nil)
+      StructField("b", IntegerType, nullable = false) ::
+      StructField("c", StringType, nullable = false) :: Nil)
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
     val rowBuilders = requiredColumns.map {
       case "a" => (i: Int) => Seq(i)
       case "b" => (i: Int) => Seq(i * 2)
+      case "c" => (i: Int) => Seq((i - 1 + 'a').toChar.toString * 10)
     }
 
     FiltersPushed.list = filters
 
-    def translateFilter(filter: Filter): Int => Boolean = filter match {
+    // Predicate test on integer column
+    def translateFilterOnA(filter: Filter): Int => Boolean = filter match {
       case EqualTo("a", v) => (a: Int) => a == v
       case LessThan("a", v: Int) => (a: Int) => a < v
       case LessThanOrEqual("a", v: Int) => (a: Int) => a <= v
@@ -57,13 +60,27 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL
       case In("a", values) => (a: Int) => values.map(_.asInstanceOf[Int]).toSet.contains(a)
       case IsNull("a") => (a: Int) => false // Int can't be null
       case IsNotNull("a") => (a: Int) => true
-      case Not(pred) => (a: Int) => !translateFilter(pred)(a)
-      case And(left, right) => (a: Int) => translateFilter(left)(a) && translateFilter(right)(a)
-      case Or(left, right) => (a: Int) => translateFilter(left)(a) || translateFilter(right)(a)
+      case Not(pred) => (a: Int) => !translateFilterOnA(pred)(a)
+      case And(left, right) => (a: Int) =>
+        translateFilterOnA(left)(a) && translateFilterOnA(right)(a)
+      case Or(left, right) => (a: Int) =>
+        translateFilterOnA(left)(a) || translateFilterOnA(right)(a)
       case _ => (a: Int) => true
     }
 
-    def eval(a: Int) = !filters.map(translateFilter(_)(a)).contains(false)
+    // Predicate test on string column
+    def translateFilterOnC(filter: Filter): String => Boolean = filter match {
+      case StringStartsWith("c", v) => _.startsWith(v)
+      case StringEndsWith("c", v) => _.endsWith(v)
+      case StringContains("c", v) => _.contains(v)
+      case _ => (c: String) => true
+    }
+
+    def eval(a: Int) = {
+      val c = (a - 1 + 'a').toChar.toString * 10
+      !filters.map(translateFilterOnA(_)(a)).contains(false) &&
+        !filters.map(translateFilterOnC(_)(c)).contains(false)
+    }
 
     sqlContext.sparkContext.parallelize(from to to).filter(eval).map(i =>
      Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty)))
@@ -93,7 +110,7 @@ class FilteredScanSuite extends DataSourceTest {
 
   sqlTest(
     "SELECT * FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i, i * 2)).toSeq)
+    (1 to 10).map(i => Row(i, i * 2, (i - 1 + 'a').toChar.toString * 10)).toSeq)
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered",
@@ -128,41 +145,53 @@ class FilteredScanSuite extends DataSourceTest {
     (2 to 10 by 2).map(i => Row(i, i)).toSeq)
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE a = 1",
-    Seq(1).map(i => Row(i, i * 2)).toSeq)
+    "SELECT a, b FROM oneToTenFiltered WHERE a = 1",
+    Seq(1).map(i => Row(i, i * 2)))
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)",
-    Seq(1,3,5).map(i => Row(i, i * 2)).toSeq)
+    "SELECT a, b FROM oneToTenFiltered WHERE a IN (1,3,5)",
+    Seq(1,3,5).map(i => Row(i, i * 2)))
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE A = 1",
-    Seq(1).map(i => Row(i, i * 2)).toSeq)
+    "SELECT a, b FROM oneToTenFiltered WHERE A = 1",
+    Seq(1).map(i => Row(i, i * 2)))
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE b = 2",
-    Seq(1).map(i => Row(i, i * 2)).toSeq)
+    "SELECT a, b FROM oneToTenFiltered WHERE b = 2",
+    Seq(1).map(i => Row(i, i * 2)))
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE a IS NULL",
+    "SELECT a, b FROM oneToTenFiltered WHERE a IS NULL",
     Seq.empty[Row])
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE a IS NOT NULL",
+    "SELECT a, b FROM oneToTenFiltered WHERE a IS NOT NULL",
     (1 to 10).map(i => Row(i, i * 2)).toSeq)
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1",
+    "SELECT a, b FROM oneToTenFiltered WHERE a < 5 AND a > 1",
     (2 to 4).map(i => Row(i, i * 2)).toSeq)
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8",
-    Seq(1, 2, 9, 10).map(i => Row(i, i * 2)).toSeq)
+    "SELECT a, b FROM oneToTenFiltered WHERE a < 3 OR a > 8",
+    Seq(1, 2, 9, 10).map(i => Row(i, i * 2)))
 
   sqlTest(
-    "SELECT * FROM oneToTenFiltered WHERE NOT (a < 6)",
+    "SELECT a, b FROM oneToTenFiltered WHERE NOT (a < 6)",
     (6 to 10).map(i => Row(i, i * 2)).toSeq)
 
+  sqlTest(
+    "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'c%'",
+    Seq(Row(3, 3 * 2, "c" * 10)))
+
+  sqlTest(
+    "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'd%'",
+    Seq(Row(4, 4 * 2, "d" * 10)))
+
+  sqlTest(
+    "SELECT a, b, c FROM oneToTenFiltered WHERE c like '%e%'",
+    Seq(Row(5, 5 * 2, "e" * 10)))
+
   testPushDown("SELECT * FROM oneToTenFiltered WHERE A = 1", 1)
   testPushDown("SELECT a FROM oneToTenFiltered WHERE A = 1", 1)
   testPushDown("SELECT b FROM oneToTenFiltered WHERE A = 1", 1)

