This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 39e8359495f [SPARK-40892][SQL][SS] Loosen the requirement of window_time rule - allow multiple window_time calls 39e8359495f is described below commit 39e8359495fc61df02994e35a9adc62ee104bead Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Fri Oct 28 11:25:44 2022 +0900 [SPARK-40892][SQL][SS] Loosen the requirement of window_time rule - allow multiple window_time calls ### What changes were proposed in this pull request? This PR proposes to loosen the requirement of the window_time rule to allow multiple distinct window_time calls. After this change, users can call the window_time function with different windows in the same logical node (select, where, etc.). Given that we allow multiple calls of window_time in a projection, we are no longer able to use the reserved column name "window_time". This PR picks up the SQL representation of WindowTime to distinguish each distinct function call. (This is different from time window/session window, but, arguably, they are incorrect. It is just that we can't fix them now, since the change would incur backward incompatibility...) ### Why are the changes needed? The rule for window_time followed the existing rules for time window / session window, which only allow a single function call in the same projection (strictly speaking, they treat multiple calls of the function with the same parameters as a single call). For the time window/session window rules, the restriction makes sense, since allowing this would produce a cartesian product of rows (although Spark can handle it). But given that window_time only produces one value, the restriction no longer makes sense. ### Does this PR introduce _any_ user-facing change? Yes, since it changes the resulting column name from the window_time function call, but the function is not released yet. ### How was this patch tested? New test case. Closes #38361 from HeartSaVioR/SPARK-40892. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/catalyst/analysis/ResolveTimeWindows.scala | 78 ++++++++++++---------- .../sql-functions/sql-expression-schema.md | 4 +- .../spark/sql/DataFrameTimeWindowingSuite.scala | 48 ++++++++----- 3 files changed, 75 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index fd5da3ff13d..df6b1c400bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] { val windowTimeExpressions = p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet - if (windowTimeExpressions.size == 1 && - windowTimeExpressions.head.windowColumn.resolved && - windowTimeExpressions.head.checkInputDataTypes().isSuccess) { + val allWindowTimeExprsResolved = windowTimeExpressions.forall { w => + w.windowColumn.resolved && w.checkInputDataTypes().isSuccess + } - val windowTime = windowTimeExpressions.head + if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) { + val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime => + val metadata = windowTime.windowColumn match { + case a: Attribute => a.metadata + case _ => Metadata.empty + } - val metadata = windowTime.windowColumn match { - case a: Attribute => a.metadata - case _ => Metadata.empty - } + if (!metadata.contains(TimeWindow.marker) && + !metadata.contains(SessionWindow.marker)) { + // FIXME: error framework? 
+ throw new AnalysisException( + "The input is not a correct window column: $windowTime", plan = Some(p)) + } - if (!metadata.contains(TimeWindow.marker) && - !metadata.contains(SessionWindow.marker)) { - // FIXME: error framework? - throw new AnalysisException( - "The input is not a correct window column: $windowTime", plan = Some(p)) - } + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .remove(TimeWindow.marker) + .remove(SessionWindow.marker) + .build() - val newMetadata = new MetadataBuilder() - .withMetadata(metadata) - .remove(TimeWindow.marker) - .remove(SessionWindow.marker) - .build() + val colName = windowTime.sql + + val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)() - val attr = AttributeReference( - "window_time", windowTime.dataType, metadata = newMetadata)() + // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as + // it is, it is going to be bound to the different window even if we apply the same window + // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the + // correct window range. + val subtractExpr = + PreciseTimestampConversion( + Subtract(PreciseTimestampConversion( + GetStructField(windowTime.windowColumn, 1), + windowTime.dataType, LongType), Literal(1L)), + LongType, + windowTime.dataType) - // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as - // it is, it is going to be bound to the different window even if we apply the same window - // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the - // correct window range. 
- val subtractExpr = - PreciseTimestampConversion( - Subtract(PreciseTimestampConversion( - GetStructField(windowTime.windowColumn, 1), - windowTime.dataType, LongType), Literal(1L)), - LongType, - windowTime.dataType) + val newColumn = Alias(subtractExpr, colName)( + exprId = attr.exprId, explicitMetadata = Some(newMetadata)) - val newColumn = Alias(subtractExpr, "window_time")( - exprId = attr.exprId, explicitMetadata = Some(newMetadata)) + windowTime -> (attr, newColumn) + }.toMap val replacedPlan = p transformExpressions { - case w: WindowTime => attr + case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1 } - replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil) + val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2) + replacedPlan.withNewChildren( + Project(newColumnsToAdd ++: child.output, child) :: Nil) } else { p // Return unchanged. Analyzer will throw exception later } diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 6f111b777a6..482c72679bb 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -345,7 +345,7 @@ | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct<weekday(2009-07-30):int> | | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct<weekofyear(2008-02-20):int> | | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct<width_bucket(5.3, 0.2, 10.6, 5):bigint> | -| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', 
'2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time:timestamp,cnt:bigint> | +| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time(window):timestamp,cnt:bigint> | | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct<xxhash64(Spark, array(123), 2):bigint> | | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct<year(2016-07-30):int> | | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct<zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), namedlambdavariable())):array<struct<y:string,x:int>>> | @@ -413,4 +413,4 @@ | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> | | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> | | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> | -| 
org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> | \ No newline at end of file +| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> | diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index f775eb9ecfc..a878e0ffa51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -599,23 +599,37 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession { ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25") ).toDF("time") - val e = intercept[AnalysisException] { - df - .withColumn("time2", expr("time - INTERVAL 5 minutes")) - .select( - window($"time", "10 seconds").as("window1"), - window($"time2", "10 seconds").as("window2") - ) - .select( - $"window1.end".cast("string"), - window_time($"window1").cast("string"), - $"window2.end".cast("string"), - window_time($"window2").cast("string") - ) - } - assert(e.getMessage.contains( - "Multiple time/session window expressions would result in a cartesian product of rows, " + - "therefore they are currently not supported")) + val df2 = df + .withColumn("time2", expr("time - INTERVAL 15 minutes")) + .select(window($"time", "10 seconds").as("window1"), $"time2") + .select($"window1", window($"time2", "10 seconds").as("window2")) + + checkAnswer( + df2.select( + $"window1.end".cast("string"), + window_time($"window1").cast("string"), + $"window2.end".cast("string"), + window_time($"window2").cast("string")), + Seq( + Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999", + "2016-03-27 19:23:20", 
"2016-03-27 19:23:19.999999"), + Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999", + "2016-03-27 19:24:30", "2016-03-27 19:24:29.999999")) + ) + + // check column names + val df3 = df2 + .select( + window_time($"window1").cast("string"), + window_time($"window2").cast("string"), + window_time($"window2").as("wt2_aliased").cast("string") + ) + + val schema = df3.schema + + assert(schema.fields.exists(_.name == "window_time(window1)")) + assert(schema.fields.exists(_.name == "window_time(window2)")) + assert(schema.fields.exists(_.name == "wt2_aliased")) } test("window_time function on agg output") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org