This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d28068434c9  [SPARK-41732][SQL][SS] Apply tree-pattern based pruning for the rule SessionWindowing
d28068434c9 is described below

commit d28068434c96348815afb6fe4883744113af5cde
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Dec 28 12:58:06 2022 +0900

    [SPARK-41732][SQL][SS] Apply tree-pattern based pruning for the rule SessionWindowing

    ### What changes were proposed in this pull request?

    This PR proposes to apply tree-pattern based pruning to the rule SessionWindowing, so that the rule is only evaluated on plans containing a SessionWindow node.

    ### Why are the changes needed?

    Without proper pruning, the rule SessionWindowing is unnecessarily evaluated multiple times.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #39245 from HeartSaVioR/SPARK-41732.

    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala  | 5 +++--
 .../org/apache/spark/sql/catalyst/expressions/SessionWindow.scala    | 2 ++
 .../scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala | 1 +
 .../scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala     | 1 +
 4 files changed, 7 insertions(+), 2 deletions(-)
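For background on the mechanism: tree-pattern based pruning has every tree node advertise the patterns present in its subtree, so a transform can skip entire subtrees on which a rule cannot possibly fire. The minimal sketch below illustrates the idea with toy types; Pattern, PlanNode, transformUpWithPruning, and TreePatternPruningSketch are illustrative stand-ins for Catalyst's TreePattern, TreeNode, and resolveOperatorsUpWithPruning, not Spark APIs.

// Illustrative only: toy stand-ins, not Catalyst code.
object TreePatternPruningSketch {

  object Pattern extends Enumeration {
    val SESSION_WINDOW, TIME_WINDOW = Value
  }

  // Each node advertises its own patterns and caches the union over its
  // subtree, so "could this subtree match?" is a cheap set lookup.
  case class PlanNode(
      name: String,
      ownPatterns: Set[Pattern.Value],
      children: Seq[PlanNode] = Nil) {

    lazy val subtreePatterns: Set[Pattern.Value] =
      children.foldLeft(ownPatterns)(_ ++ _.subtreePatterns)

    def containsPattern(p: Pattern.Value): Boolean = subtreePatterns.contains(p)

    // Bottom-up rewrite that skips whole subtrees failing `cond`, so the
    // rule body is never evaluated where it cannot apply.
    def transformUpWithPruning(cond: PlanNode => Boolean)(
        rule: PartialFunction[PlanNode, PlanNode]): PlanNode = {
      if (!cond(this)) this
      else {
        val rewritten = copy(children = children.map(_.transformUpWithPruning(cond)(rule)))
        rule.applyOrElse(rewritten, (n: PlanNode) => n)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = PlanNode("Project", Set.empty, Seq(
      PlanNode("Aggregate", Set.empty, Seq(
        PlanNode("SessionWindow", Set(Pattern.SESSION_WINDOW))))))

    // Mirrors resolveOperatorsUpWithPruning(_.containsPattern(SESSION_WINDOW)):
    // a plan with no SessionWindow anywhere is returned untouched, without
    // ever invoking the rule.
    val resolved = plan.transformUpWithPruning(_.containsPattern(Pattern.SESSION_WINDOW)) {
      case n if n.name == "SessionWindow" => n.copy(name = "ResolvedSessionWindow")
    }
    println(resolved)
  }
}

The second argument in the actual patch, ruleId, serves a related purpose: Catalyst records when a rule ran on a subtree without changing it and skips that rule there afterwards, which is why the patch also registers SessionWindowing in RuleIdCollection.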
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index df6b1c400bb..be837d72c5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_WINDOW
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -187,7 +187,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
    * This also adds a marker to the session column so that downstream can easily find the column
    * on session window.
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(SESSION_WINDOW), ruleId) {
     case p: LogicalPlan if p.children.size == 1 =>
       val child = p.children.head
       val sessionExpressions =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
index 02273b0c461..021f119e0a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TreePattern}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -91,6 +92,7 @@ case class SessionWindow(timeColumn: Expression, gapDuration: Expression) extend
   override def dataType: DataType = new StructType()
     .add(StructField("start", children.head.dataType))
     .add(StructField("end", children.head.dataType))
+  final override val nodePatterns: Seq[TreePattern] = Seq(SESSION_WINDOW)
 
   // This expression is replaced in the analyzer.
   override lazy val resolved = false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 41aa68f0ec6..e824a0b533d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -94,6 +94,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+      "org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
       "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
       "org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
       "org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule" ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index fbe885bda06..e2e7fca27e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -82,6 +82,7 @@ object TreePattern extends Enumeration {
   val SCALAR_SUBQUERY: Value = Value
   val SCALAR_SUBQUERY_REFERENCE: Value = Value
   val SCALA_UDF: Value = Value
+  val SESSION_WINDOW: Value = Value
   val SORT: Value = Value
   val SUBQUERY_ALIAS: Value = Value
   val SUM: Value = Value

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org