This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a445536 [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0 a445536 is described below commit a44553648f75e9243b8a7dc27185ae6901f35f94 Author: nyingping <smileyingp...@163.com> AuthorDate: Wed Mar 30 12:00:40 2022 +0900 [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0 ### What changes were proposed in this pull request? With a static gapDuration on the session window, there is no need to filter events when the gapDuration is greater than 0. ### Why are the changes needed? Saves computation resources and improves performance. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new UT and a benchmark. A simple benchmark on [[9dae8a5](https://github.com/nyingping/spark/commit/9dae8a555e82a59d2160bbb4518704cec81b219e)]. Thanks again [HeartSaVioR d532b6f](https://github.com/HeartSaVioR/spark/commit/d532b6f6bcdd80cdaac520b21587ebb69ff2df8f). --------------------------------------- case 1 --------------------------------------- ``` spark.range(numOfRow) .selectExpr("CAST(id AS timestamp) AS time") .select(session_window(col("time"), "10 seconds")) .count() ``` Result: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_291-b10 on Windows 10 10.0 AMD64 Family 23 Model 96 Stepping 1, AuthenticAMD session windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ old logic 1688 1730 61 5.9 168.8 1.0X new logic 21 26 5 487.3 2.1 82.3X ``` Closes #35680 from nyingping/SPARK-38349. 
Lead-authored-by: nyingping <smileyingp...@163.com> Co-authored-by: Nie yingping <smileyingp...@163.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 16 +++++-- .../spark/sql/DataFrameSessionWindowingSuite.scala | 56 +++++++++++++++++++++- 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6d95067..f69f17d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -57,7 +57,7 @@ import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types._ import org.apache.spark.sql.types.DayTimeIntervalType.DAY import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.util.Utils /** @@ -4058,10 +4058,20 @@ object SessionWindowing extends Rule[LogicalPlan] { case s: SessionWindow => sessionAttr } + val filterByTimeRange = session.gapDuration match { + case Literal(interval: CalendarInterval, CalendarIntervalType) => + interval == null || interval.months + interval.days + interval.microseconds <= 0 + case _ => true + } + // As same as tumbling window, we add a filter to filter out nulls. // And we also filter out events with negative or zero or invalid gap duration. 
- val filterExpr = IsNotNull(session.timeColumn) && - (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START)) + val filterExpr = if (filterByTimeRange) { + IsNotNull(session.timeColumn) && + (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START)) + } else { + IsNotNull(session.timeColumn) + } replacedPlan.withNewChildren( Filter(filterExpr, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala index a5414f3..4c2d0f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala @@ -22,8 +22,8 @@ import java.time.LocalDateTime import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter, LogicalPlan, Project} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest with SharedSparkSession validateWindowColumnInSchema(schema2, "session") } } + + test("SPARK-38349: No need to filter events when gapDuration greater than 0") { + // negative gap duration + check("-5 seconds", true, "Need to filter events when gap duration less than 0") + + // positive gap duration + check("5 seconds", false, "No need to filter events when gap duration greater than 0") + + // invalid gap duration + check("x seconds", true, "Need to filter events when gap duration invalid") + + // dynamic gap duration + 
check(when(col("time").equalTo("1"), "5 seconds") + .when(col("time").equalTo("2"), "10 seconds") + .otherwise("10 seconds"), true, "Need to filter events when gap duration dynamically") + + def check( + gapDuration: Any, + expectTimeRange: Boolean, + assertHintMsg: String): Unit = { + val data = Seq( + ("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id") + val df = if (gapDuration.isInstanceOf[String]) { + data.groupBy(session_window($"time", gapDuration.asInstanceOf[String])) + } else { + data.groupBy(session_window($"time", gapDuration.asInstanceOf[Column])) + } + val aggregate = df.agg(count("*").as("counts")) + .select($"session_window.start".cast("string"), $"session_window.end".cast("string"), + $"counts") + + checkFilterCondition(aggregate.queryExecution.logical, expectTimeRange, assertHintMsg) + } + + def checkFilterCondition( + logicalPlan: LogicalPlan, + expectTimeRange: Boolean, + assertHintMsg: String): Unit = { + val filter = logicalPlan.find { plan => + plan.isInstanceOf[Filter] && plan.children.head.isInstanceOf[Project] + } + assert(filter.isDefined) + val exist = filter.get.expressions.flatMap { expr => + expr.collect { case gt: GreaterThan => gt } + } + if (expectTimeRange) { + assert(exist.nonEmpty, assertHintMsg) + } else { + assert(exist.isEmpty, assertHintMsg) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org