This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1b7c636445b  [SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet
1b7c636445b is described below

commit 1b7c636445b4dd8766149d454ed909eccd9db118
Author: wangguangxin.cn <wangguangxin...@bytedance.com>
AuthorDate: Tue Apr 26 21:12:30 2022 -0700

    [SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet

    ### What changes were proposed in this pull request?
    Push down StringEndsWith/Contains to Parquet so that we can leverage Parquet Dictionary Filtering.

    ### Why are the changes needed?
    Improve performance. FilterPushDownBenchmark:
    ```
    ================================================================================================
    Pushdown benchmark for StringEndsWith
    ================================================================================================

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringEndsWith filter: (value like '%10'):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7666            7771         117          2.1         487.4       1.0X
    Parquet Vectorized (Pushdown)                          540             554          18         29.1          34.3      14.2X
    Native ORC Vectorized                                 8206            8417         203          1.9         521.7       0.9X
    Native ORC Vectorized (Pushdown)                      8120            8674         422          1.9         516.2       0.9X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringEndsWith filter: (value like '%1000'):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7007            7122         224          2.2         445.5       1.0X
    Parquet Vectorized (Pushdown)                          423             485          92         37.2          26.9      16.6X
    Native ORC Vectorized                                 7368            7629         373          2.1         468.5       1.0X
    Native ORC Vectorized (Pushdown)                      7998            8349         270          2.0         508.5       0.9X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringEndsWith filter: (value like '%786432'):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    -----------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7012            7210         238          2.2         445.8       1.0X
    Parquet Vectorized (Pushdown)                          419             431          14         37.6          26.6      16.7X
    Native ORC Vectorized                                 7513            7995         447          2.1         477.6       0.9X
    Native ORC Vectorized (Pushdown)                      8310            8811         448          1.9         528.3       0.8X

    ================================================================================================
    Pushdown benchmark for StringContains
    ================================================================================================

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringContains filter: (value like '%10%'):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    --------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7588            8125         328          2.1         482.4       1.0X
    Parquet Vectorized (Pushdown)                         1029            1068          25         15.3          65.4       7.4X
    Native ORC Vectorized                                 7803            7859          92          2.0         496.1       1.0X
    Native ORC Vectorized (Pushdown)                      8944            9443         459          1.8         568.6       0.8X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringContains filter: (value like '%1000%'):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ----------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7476            8343         710          2.1         475.3       1.0X
    Parquet Vectorized (Pushdown)                          424             427           2         37.1          27.0      17.6X
    Native ORC Vectorized                                 7503            8261         818          2.1         477.0       1.0X
    Native ORC Vectorized (Pushdown)                      8124            8609         548          1.9         516.5       0.9X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringContains filter: (value like '%786432%'):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7070            7274         199          2.2         449.5       1.0X
    Parquet Vectorized (Pushdown)                          441             478          32         35.6          28.1      16.0X
    Native ORC Vectorized                                 7564            7937         323          2.1         480.9       0.9X
    Native ORC Vectorized (Pushdown)                      8623            8921         228          1.8         548.2       0.8X
    ```

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    added UT

    Closes #36328 from WangGuangxin/pushdown_startwith_using_dict.

    Authored-by: wangguangxin.cn <wangguangxin...@bytedance.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 ++-
 .../datasources/parquet/ParquetFileFormat.scala    |   4 +-
 .../datasources/parquet/ParquetFilters.scala       |  34 +++++-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |   4 +-
 .../v2/parquet/ParquetScanBuilder.scala            |   4 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |   3 +-
 .../benchmark/FilterPushdownBenchmark.scala        |  32 +++++
 .../datasources/parquet/ParquetFilterSuite.scala   | 129 ++++++++++++++++-----
 8 files changed, 186 insertions(+), 37 deletions(-)
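For context, a hedged sketch of the user-visible effect (not part of the patch; the path and column name below are illustrative): a LIKE pattern with a single leading '%' is rewritten into a StringEndsWith source filter, which this change can now push into the Parquet reader. With the new flag enabled, the pushed predicate should appear in the scan's PushedFilters:

```
// Illustrative only: write a small string-typed dataset, then inspect the plan.
val df = spark.range(1000).selectExpr("cast(id AS string) AS value")
df.write.mode("overwrite").parquet("/tmp/endswith_demo") // arbitrary path
spark.read.parquet("/tmp/endswith_demo")
  .filter("value like '%10'")
  .explain()
// The Parquet scan node is expected to report something like:
//   PushedFilters: [IsNotNull(value), StringEndsWith(value,10)]
```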
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6d3f283fa73..49cd23851ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -959,6 +959,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.stringPredicate")
+      .doc("If true, enables Parquet filter push-down optimization for string predicate such " +
+        "as startsWith/endsWith/contains function. This configuration only has an effect when " +
+        s"'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.")
+      .version("3.4.0")
+      .internal()
+      .fallbackConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+
   val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
     buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
       .doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
@@ -4050,8 +4059,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)
 
-  def parquetFilterPushDownStringStartWith: Boolean =
-    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+  def parquetFilterPushDownStringPredicate: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED)
 
   def parquetFilterPushDownInFilterThreshold: Int =
     getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
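A note on the new entry above: because it is declared with fallbackConf, it inherits the value of the legacy PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED flag when unset, so jobs that had disabled startsWith pushdown keep that behavior. A minimal sketch of toggling it at runtime (the key string comes from the buildConf call above; note the conf is marked internal):

```
// Sketch: disable string-predicate pushdown for the current session, for
// example to rule this code path out while debugging a query.
spark.conf.set("spark.sql.parquet.filterPushdown.stringPredicate", "false")
```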
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 44dc145d36e..de0759979d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -251,7 +251,7 @@ class ParquetFileFormat
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
@@ -279,7 +279,7 @@ class ParquetFileFormat
         pushDownDate,
         pushDownTimestamp,
         pushDownDecimal,
-        pushDownStringStartWith,
+        pushDownStringPredicate,
         pushDownInFilterThreshold,
         isCaseSensitive,
         datetimeRebaseSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 75060cfca24..210c40351b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -48,7 +48,7 @@ class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-    pushDownStartWith: Boolean,
+    pushDownStringPredicate: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean,
     datetimeRebaseSpec: RebaseSpec) {
@@ -747,7 +747,7 @@ class ParquetFilters(
       }
 
     case sources.StringStartsWith(name, prefix)
-        if pushDownStartWith && canMakeFilterOn(name, prefix) =>
+        if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
       Option(prefix).map { v =>
         FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
           new UserDefinedPredicate[Binary] with Serializable {
@@ -778,6 +778,36 @@ class ParquetFilters(
         )
       }
 
+    case sources.StringEndsWith(name, suffix)
+        if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
+      Option(suffix).map { v =>
+        FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+          new UserDefinedPredicate[Binary] with Serializable {
+            private val suffixStr = UTF8String.fromString(v)
+            override def canDrop(statistics: Statistics[Binary]): Boolean = false
+            override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+            override def keep(value: Binary): Boolean = {
+              value != null && UTF8String.fromBytes(value.getBytes).endsWith(suffixStr)
+            }
+          }
+        )
+      }
+
+    case sources.StringContains(name, value)
+        if pushDownStringPredicate && canMakeFilterOn(name, value) =>
+      Option(value).map { v =>
+        FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+          new UserDefinedPredicate[Binary] with Serializable {
+            private val subStr = UTF8String.fromString(v)
+            override def canDrop(statistics: Statistics[Binary]): Boolean = false
+            override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+            override def keep(value: Binary): Boolean = {
+              value != null && UTF8String.fromBytes(value.getBytes).contains(subStr)
+            }
+          }
+        )
+      }
+
     case _ => None
   }
 }
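Why canDrop and inverseCanDrop return false in the two new cases: row-group min/max statistics can bound a prefix match (which the StringStartsWith predicate above exploits), but they say nothing about suffixes or substrings, so statistics-based pruning is impossible for EndsWith/Contains. The benchmark wins come from Parquet's dictionary filtering instead, which evaluates keep() against each distinct dictionary entry and skips a row group when none survives. A simplified model of that idea (plain Scala; a sketch under assumptions, not the parquet-mr API):

```
// Sketch: models the effect of parquet-mr's dictionary filtering, not its
// actual code. A fully dictionary-encoded row group can be skipped when
// keep() rejects every distinct value in its dictionary.
def canSkipRowGroup(dictionary: Set[String], suffix: String): Boolean =
  !dictionary.exists(_.endsWith(suffix))

canSkipRowGroup(Set("100", "200", "300"), "10") // true: nothing ends in "10"
canSkipRowGroup(Set("100", "210", "300"), "10") // false: "210" matches, must read
```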
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 9a25dd88ff4..c9572e474c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -79,7 +79,7 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDate = sqlConf.parquetFilterPushDownDate
   private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = options.int96RebaseModeInRead
@@ -221,7 +221,7 @@ case class ParquetPartitionReaderFactory(
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       datetimeRebaseSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 2093f4a16ef..2e3b9b20b5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -52,7 +52,7 @@ case class ParquetScanBuilder(
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
@@ -62,7 +62,7 @@ case class ParquetScanBuilder(
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       // The rebase mode doesn't matter here because the filters are used to determine
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4d384d3286b..fe8467c8d82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3034,7 +3034,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
 
     Seq("orc", "parquet").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
         withTempPath { dir =>
           spark.range(10).map(i => (i, i.toString)).toDF("id", "s")
             .write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 2bd03b6cb75..dd2852eea78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -242,6 +242,38 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       }
     }
 
+    runBenchmark("Pushdown benchmark for StringEndsWith") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10'",
+            "value like '%1000'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}'"
+          ).foreach { whereExpr =>
+            val title = s"StringEndsWith filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
+    runBenchmark("Pushdown benchmark for StringContains") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10%'",
+            "value like '%1000%'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}%'"
+          ).foreach { whereExpr =>
+            val title = s"StringContains filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
     runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
       withTempPath { dir =>
         Seq(
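The benchmark expresses the predicates as LIKE patterns because Spark rewrites single-wildcard LIKEs into StartsWith/EndsWith/Contains expressions (the LikeSimplification optimizer rule), which then translate into the corresponding data source filters. Roughly, as a simplified sketch of that correspondence (not Spark's actual translation, which also handles escape characters and mixed wildcards):

```
import org.apache.spark.sql.sources

// Simplified correspondence between single-'%' LIKE patterns and source filters.
def likeToFilter(attr: String, pattern: String): Option[sources.Filter] =
  pattern match {
    case p if p.length > 1 && p.startsWith("%") && p.endsWith("%") =>
      Some(sources.StringContains(attr, p.substring(1, p.length - 1)))
    case p if p.endsWith("%") =>
      Some(sources.StringStartsWith(attr, p.dropRight(1)))
    case p if p.startsWith("%") =>
      Some(sources.StringEndsWith(attr, p.drop(1)))
    case _ => None
  }

likeToFilter("value", "%10")  // Some(StringEndsWith(value,10))
likeToFilter("value", "%10%") // Some(StringContains(value,10))
```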
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d8eab40c38f..be081dadb2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -81,7 +81,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       datetimeRebaseSpec: RebaseSpec = RebaseSpec(LegacyBehaviorPolicy.CORRECTED)
   ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
-      conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
+      conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringPredicate,
       conf.parquetFilterPushDownInFilterThreshold,
       caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
       datetimeRebaseSpec)
@@ -207,20 +207,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  // This function tests that exactly go through the `canDrop` and `inverseCanDrop`.
-  private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
+  // This function tests that exactly go through the `keep`, `canDrop` and `inverseCanDrop`.
+  private def testStringPredicate(dataFrame: DataFrame, filter: String,
+      shouldFilterOut: Boolean, enableDictionary: Boolean = true): Unit = {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      dataFrame.write.option("parquet.block.size", 512).parquet(path)
+      dataFrame.write
+        .option("parquet.block.size", 512)
+        .option(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary)
+        .parquet(path)
       Seq(true, false).foreach { pushDown =>
         withSQLConf(
-          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> pushDown.toString) {
+          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> pushDown.toString) {
           val accu = new NumRowGroupsAcc
           sparkContext.register(accu)
 
           val df = spark.read.parquet(path).filter(filter)
           df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
-          if (pushDown) {
+          if (pushDown && shouldFilterOut) {
             assert(accu.value == 0)
           } else {
             assert(accu.value > 0)
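The enableDictionary parameter added above matters because, as noted earlier, EndsWith/Contains prune only through dictionaries. A hedged sketch of producing a file without dictionary pages to observe this (assumes spark.implicits._ is in scope; ParquetOutputFormat is parquet-mr's, which the suite already imports; the path is illustrative):

```
import org.apache.parquet.hadoop.ParquetOutputFormat

// With dictionary encoding off, keep()-only predicates (EndsWith/Contains)
// cannot skip any row group; StartsWith can still prune via min/max statistics.
spark.range(1024).map(_.toString).toDF("value")
  .write
  .option("parquet.block.size", 512) // tiny row groups, as in the helper above
  .option(ParquetOutputFormat.ENABLE_DICTIONARY, false)
  .parquet("/tmp/no_dict_demo")
```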
@@ -970,7 +974,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // Following tests are used to check one arm of AND/OR can't be pushed down,
+    // so we disable string predicate pushdown here
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     assertResult(Some(and(
       lt(intColumn("a"), 10: Integer),
       gt(doubleColumn("c"), 1.5: java.lang.Double)))
@@ -1114,7 +1123,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // Following tests are used to check one arm of AND/OR can't be pushed down,
+    // so we disable string predicate pushdown here
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     // Testing
     // case sources.Or(lhs, rhs) =>
     //   ...
@@ -1169,7 +1183,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // Following tests are used to check one arm of AND/OR can't be pushed down,
+    // so we disable string predicate pushdown here
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     assertResult(Seq(sources.And(sources.LessThan("a", 10), sources.GreaterThan("c", 1.5D)))) {
       parquetFilters.convertibleFilters(
         Seq(sources.And(
@@ -1423,65 +1442,123 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  test("filter pushdown - StringStartsWith") {
+  private def checkStringFilterPushdown(
+      stringPredicate: String => Expression,
+      sourceFilter: (String, String) => sources.Filter): Unit = {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("").asInstanceOf[Predicate],
+        stringPredicate("").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           "2str2")
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           Seq.empty[Row])
       }
 
       checkFilterPredicate(
-        !$"_1".startsWith("").asInstanceOf[Predicate],
+        !stringPredicate("").asInstanceOf[Predicate],
         classOf[Operators.Not],
         Seq().map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "3str3", "4str4").map(Row(_)))
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
       }
 
       val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
       assertResult(None) {
-        createParquetFilters(schema).createFilter(sources.StringStartsWith("_1", null))
+        createParquetFilters(schema).createFilter(sourceFilter("_1", null))
       }
     }
 
     // SPARK-28371: make sure filter is null-safe.
     withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("blah").asInstanceOf[Predicate],
+        stringPredicate("blah").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq.empty[Row])
     }
+  }
+
+  test("filter pushdown - StringStartsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".startsWith(str),
+      (attr, value) => sources.StringStartsWith(attr, value))
+  }
+
+  test("filter pushdown - StringEndsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".endsWith(str),
+      (attr, value) => sources.StringEndsWith(attr, value))
+  }
+
+  test("filter pushdown - StringContains") {
+    checkStringFilterPushdown(
+      str => $"_1".contains(str),
+      (attr, value) => sources.StringContains(attr, value))
+  }
+
+  test("filter pushdown - StringPredicate") {
     import testImplicits._
-    // Test canDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
-    // Test inverseCanDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
+    // keep() should take effect on StartsWith/EndsWith/Contains
+    Seq(
+      "value like 'a%'", // StartsWith
+      "value like '%a'", // EndsWith
+      "value like '%a%'" // Contains
+    ).foreach { filter =>
+      testStringPredicate(
+        // dictionary will be generated since there are duplicated values
+        spark.range(1000).map(t => (t % 10).toString).toDF(),
+        filter,
+        true)
+    }
+
+    // canDrop() should take effect on StartsWith,
+    // and has no effect on EndsWith/Contains
+    Seq(
+      ("value like 'a%'", true), // StartsWith
+      ("value like '%a'", false), // EndsWith
+      ("value like '%a%'", false) // Contains
+    ).foreach { case (filter, shouldFilterOut) =>
+      testStringPredicate(
+        spark.range(1024).map(_.toString).toDF(),
+        filter,
+        shouldFilterOut,
+        enableDictionary = false)
+    }
+
+    // inverseCanDrop() should take effect on StartsWith,
+    // and has no effect on EndsWith/Contains
+    Seq(
+      ("value not like '10%'", true), // StartsWith
+      ("value not like '%10'", false), // EndsWith
+      ("value not like '%10%'", false) // Contains
+    ).foreach { case (filter, shouldFilterOut) =>
+      testStringPredicate(
+        spark.range(1024).map(c => "100").toDF(),
+        filter,
+        shouldFilterOut,
+        enableDictionary = false)
+    }
   }
 
   test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {