Repository: spark
Updated Branches:
  refs/heads/master f71e8da5e -> 03545ce6d


[SPARK-24638][SQL] StringStartsWith support push down

## What changes were proposed in this pull request?

Add Parquet filter push-down support for `StringStartsWith` predicates, which saves roughly half (or more) of the compute time in the benchmark below.
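
For context, both the Dataset `startsWith` column function and a `LIKE 'prefix%'` predicate are planned as a `sources.StringStartsWith` filter, which this change can now push down to Parquet. A minimal sketch of exercising the new path (the path and column name are borrowed from the benchmark below; the session setup is assumed):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Data written by the benchmark script below.
val df = spark.read.parquet("/tmp/spark/parquet/string/")

// With spark.sql.parquet.filterPushdown.string.startsWith=true (the default),
// both predicates are checked against Parquet row-group statistics, so
// non-matching row groups are skipped before any rows are decoded.
df.filter($"id".startsWith("999998")).count()
df.where("id like '999998%'").count()
```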

## How was this patch tested?
Unit tests, manual tests, and a performance test:
```scala
cat <<EOF > SPARK-24638.scala
def benchmark(func: () => Unit): Long = {
  val start = System.currentTimeMillis()
  for(i <- 0 until 100) { func() }
  val end = System.currentTimeMillis()
  end - start
}
val path = "/tmp/spark/parquet/string/"
spark.range(10000000).selectExpr("concat(id, 'str', id) as id").coalesce(1).write.mode("overwrite").option("parquet.block.size", 1048576).parquet(path)
val df = spark.read.parquet(path)

spark.sql("set spark.sql.parquet.filterPushdown.string.startsWith=true")
val pushdownEnable = benchmark(() => df.where("id like '999998%'").count())

spark.sql("set spark.sql.parquet.filterPushdown.string.startsWith=false")
val pushdownDisable = benchmark(() => df.where("id like '999998%'").count())

val improvements = pushdownDisable - pushdownEnable
println(s"improvements: $improvements")
EOF

bin/spark-shell -i SPARK-24638.scala
```
result:
```scala
Loading SPARK-24638.scala...
benchmark: (func: () => Unit)Long
path: String = /tmp/spark/parquet/string/
df: org.apache.spark.sql.DataFrame = [id: string]
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
pushdownEnable: Long = 11608
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]
pushdownDisable: Long = 31981
improvements: Long = 20373
```

Author: Yuming Wang <yumw...@ebay.com>

Closes #21623 from wangyum/SPARK-24638.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03545ce6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03545ce6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03545ce6

Branch: refs/heads/master
Commit: 03545ce6de08bd0ad685c5f59b73bc22dfc40887
Parents: f71e8da
Author: Yuming Wang <yumw...@ebay.com>
Authored: Sat Jun 30 13:58:50 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Jun 30 13:58:50 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 11 +++
 .../datasources/parquet/ParquetFileFormat.scala |  4 +-
 .../datasources/parquet/ParquetFilters.scala    | 35 +++++++-
 .../parquet/ParquetFilterSuite.scala            | 84 +++++++++++++++++++-
 4 files changed, 130 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03545ce6/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e1752ff..da1c34c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -378,6 +378,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
+    .doc("If true, enables Parquet filter push-down optimization for string 
startsWith function. " +
+      "This configuration only has an effect when 
'spark.sql.parquet.filterPushdown' is enabled.")
+    .internal()
+    .booleanConf
+    .createWithDefault(true)
+
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
     .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
       "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1459,6 +1467,9 @@ class SQLConf extends Serializable with Logging {
 
   def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
 
+  def parquetFilterPushDownStringStartWith: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
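
A small sketch of toggling the new flag per session through the conf API, equivalent to the SQL `SET` statements used in the benchmark above (the flag is internal, defaults to true, and only takes effect while `spark.sql.parquet.filterPushdown` is also enabled):
```scala
// Enable both the general Parquet push-down and the startsWith-specific path.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "true")

// Disable only the startsWith push-down, e.g. to compare scanned row groups.
spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "false")
```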

http://git-wip-us.apache.org/repos/asf/spark/blob/03545ce6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9602a08..93de1fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -348,6 +348,7 @@ class ParquetFileFormat
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -358,7 +359,8 @@ class ParquetFileFormat
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
+          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
+          .createFilter(requiredSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None

http://git-wip-us.apache.org/repos/asf/spark/blob/03545ce6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 3106261..21c9e2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,16 +22,18 @@ import java.sql.Date
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.PrimitiveComparator
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean) {
+private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
 
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
@@ -270,6 +272,37 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
       case sources.Not(pred) =>
         createFilter(schema, pred).map(FilterApi.not)
 
+      case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
+        Option(prefix).map { v =>
+          FilterApi.userDefined(binaryColumn(name),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
+              private val size = strToBinary.length
+
+              override def canDrop(statistics: Statistics[Binary]): Boolean = {
+                val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+                val max = statistics.getMax
+                val min = statistics.getMin
+                comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 ||
+                  comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0
+              }
+
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
+                val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+                val max = statistics.getMax
+                val min = statistics.getMin
+                comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) == 0 &&
+                  comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) == 0
+              }
+
+              override def keep(value: Binary): Boolean = {
+                UTF8String.fromBytes(value.getBytes).startsWith(
+                  UTF8String.fromBytes(strToBinary.getBytes))
+              }
+            }
+          )
+        }
+
       case _ => None
     }
   }
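
The row-group skipping above boils down to: truncate the chunk's min/max statistics to the prefix length, compare them to the prefix bytewise and unsigned, and drop the group when the truncated max sorts below the prefix or the truncated min sorts above it. A standalone sketch of that check on raw byte arrays (helper names are illustrative; Parquet types are omitted):
```scala
import java.util.Arrays

// Unsigned lexicographic byte-array comparison, mirroring Parquet's
// PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.
def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
  var i = 0
  while (i < math.min(a.length, b.length)) {
    val cmp = (a(i) & 0xff) - (b(i) & 0xff)
    if (cmp != 0) return cmp
    i += 1
  }
  a.length - b.length
}

// True when the [min, max] range of a row group, truncated to the prefix
// length, lies entirely below or entirely above the prefix.
def canDropRowGroup(prefix: Array[Byte], min: Array[Byte], max: Array[Byte]): Boolean = {
  val truncMax = Arrays.copyOf(max, math.min(prefix.length, max.length))
  val truncMin = Arrays.copyOf(min, math.min(prefix.length, min.length))
  compareUnsigned(truncMax, prefix) < 0 || compareUnsigned(truncMin, prefix) > 0
}

// Example: a row group with min = "1str1" and max = "1str4" cannot contain
// any value starting with "2str", so the whole group is dropped.
canDropRowGroup("2str".getBytes, "1str1".getBytes, "1str4".getBytes)  // true
```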

http://git-wip-us.apache.org/repos/asf/spark/blob/03545ce6/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 90da7eb..d9ae585 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -55,7 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
-  private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
+  private lazy val parquetFilters =
+    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -82,6 +83,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     withSQLConf(
       SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
       SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
       SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val query = df
           .select(output.map(e => Column(e)): _*)
@@ -140,6 +142,31 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
+  // Checks that the given filter actually goes through `canDrop` and `inverseCanDrop`.
+  private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      dataFrame.write.option("parquet.block.size", 512).parquet(path)
+      Seq(true, false).foreach { pushDown =>
+        withSQLConf(
+          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> pushDown.toString) {
+          val accu = new NumRowGroupsAcc
+          sparkContext.register(accu)
+
+          val df = spark.read.parquet(path).filter(filter)
+          df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
+          if (pushDown) {
+            assert(accu.value == 0)
+          } else {
+            assert(accu.value > 0)
+          }
+
+          AccumulatorContext.remove(accu.id)
+        }
+      }
+    }
+  }
+
   test("filter pushdown - boolean") {
     withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -574,7 +601,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
             val df = spark.read.parquet(path).filter("a < 100")
             df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
-            df.collect
 
             if (enablePushDown) {
               assert(accu.value == 0)
@@ -660,6 +686,60 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.where("col > 0").count() === 2)
     }
   }
+
+  test("filter pushdown - StringStartsWith") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
+      checkFilterPredicate(
+        '_1.startsWith("").asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]],
+        Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
+
+      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+        checkFilterPredicate(
+          '_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          "2str2")
+      }
+
+      Seq("2S", "null", "2str22").foreach { prefix =>
+        checkFilterPredicate(
+          '_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          Seq.empty[Row])
+      }
+
+      checkFilterPredicate(
+        !'_1.startsWith("").asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]],
+        Seq().map(Row(_)))
+
+      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+        checkFilterPredicate(
+          !'_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          Seq("1str1", "3str3", "4str4").map(Row(_)))
+      }
+
+      Seq("2S", "null", "2str22").foreach { prefix =>
+        checkFilterPredicate(
+          !'_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
+      }
+
+      assertResult(None) {
+        parquetFilters.createFilter(
+          df.schema,
+          sources.StringStartsWith("_1", null))
+      }
+    }
+
+    import testImplicits._
+    // Test canDrop() has taken effect
+    testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
+    // Test inverseCanDrop() has taken effect
+    testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {

