This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 39e8359495f [SPARK-40892][SQL][SS] Loosen the requirement of window_time rule - allow multiple window_time calls
39e8359495f is described below

commit 39e8359495fc61df02994e35a9adc62ee104bead
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Fri Oct 28 11:25:44 2022 +0900

    [SPARK-40892][SQL][SS] Loosen the requirement of window_time rule - allow multiple window_time calls
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to loosen the requirement of the window_time rule to allow
    multiple distinct window_time calls. After this change, users can call the
    window_time function with different windows in the same logical node
    (select, where, etc.).
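
    As a minimal sketch (DataFrame and column names are illustrative, mirroring
    the new test below), two distinct window_time calls can now be resolved in
    the same select:

        import org.apache.spark.sql.functions.{window, window_time}

        // Assumes df has timestamp columns "time" and "time2", and that
        // spark.implicits._ is in scope for the $ column syntax.
        val windowed = df
          .select(window($"time", "10 seconds").as("window1"), $"time2")
          .select($"window1", window($"time2", "10 seconds").as("window2"))

        // Previously this threw an AnalysisException; now each call resolves
        // to its own output column.
        windowed.select(window_time($"window1"), window_time($"window2"))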
    
    Given that we now allow multiple window_time calls in a projection, we can
    no longer use the reserved column name "window_time" for the output column.
    This PR instead uses the SQL representation of the WindowTime expression as
    the column name, so that each distinct function call can be distinguished.
    (This differs from time window/session window, whose reserved column names
    are arguably incorrect; we just cannot fix them now, since changing them
    would break backward compatibility...)
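
    For example (illustrative, matching the assertions in the new test below),
    the output column is named after the call's SQL form, and an explicit alias
    still takes precedence:

        val named = windowed
          .select(window_time($"window1"), window_time($"window2").as("wt2"))
        // named.schema.fieldNames: Array("window_time(window1)", "wt2")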
    
    ### Why are the changes needed?
    
    The rule for window_time followed the existing rules for time window /
    session window, which allow only a single function call in the same
    projection (strictly speaking, multiple calls with the same parameters are
    counted as a single call).
    
    For the time window/session window rules, the restriction makes sense,
    since allowing multiple calls would produce a Cartesian product of rows
    (although Spark could handle it). But given that window_time produces only
    one value per call, the restriction no longer makes sense.
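
    Concretely (values taken from the updated test below), each window_time
    call maps one window struct to exactly one timestamp, window.end minus
    1 microsecond, so multiple calls cannot multiply rows:

        // window1.end          == 2016-03-27 19:38:20
        // window_time(window1) == 2016-03-27 19:38:19.999999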
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, since it changes the resulting column name of a window_time function
    call; however, the function has not been released yet.
    
    ### How was this patch tested?
    
    New test case.
    
    Closes #38361 from HeartSaVioR/SPARK-40892.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/catalyst/analysis/ResolveTimeWindows.scala | 78 ++++++++++++----------
 .../sql-functions/sql-expression-schema.md         |  4 +-
 .../spark/sql/DataFrameTimeWindowingSuite.scala    | 48 ++++++++-----
 3 files changed, 75 insertions(+), 55 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index fd5da3ff13d..df6b1c400bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
       val windowTimeExpressions =
         p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
 
-      if (windowTimeExpressions.size == 1 &&
-        windowTimeExpressions.head.windowColumn.resolved &&
-        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+      val allWindowTimeExprsResolved = windowTimeExpressions.forall { w =>
+        w.windowColumn.resolved && w.checkInputDataTypes().isSuccess
+      }
 
-        val windowTime = windowTimeExpressions.head
+      if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
+        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>
+          val metadata = windowTime.windowColumn match {
+            case a: Attribute => a.metadata
+            case _ => Metadata.empty
+          }
 
-        val metadata = windowTime.windowColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
+          if (!metadata.contains(TimeWindow.marker) &&
+            !metadata.contains(SessionWindow.marker)) {
+            // FIXME: error framework?
+            throw new AnalysisException(
+              "The input is not a correct window column: $windowTime", plan = 
Some(p))
+          }
 
-        if (!metadata.contains(TimeWindow.marker) &&
-          !metadata.contains(SessionWindow.marker)) {
-          // FIXME: error framework?
-          throw new AnalysisException(
-            "The input is not a correct window column: $windowTime", plan = 
Some(p))
-        }
+          val newMetadata = new MetadataBuilder()
+            .withMetadata(metadata)
+            .remove(TimeWindow.marker)
+            .remove(SessionWindow.marker)
+            .build()
 
-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .remove(TimeWindow.marker)
-          .remove(SessionWindow.marker)
-          .build()
+          val colName = windowTime.sql
+
+          val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()
 
-        val attr = AttributeReference(
-          "window_time", windowTime.dataType, metadata = newMetadata)()
+          // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+          // it is, it is going to be bound to the different window even if we apply the same window
+          // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+          // correct window range.
+          val subtractExpr =
+          PreciseTimestampConversion(
+            Subtract(PreciseTimestampConversion(
+              GetStructField(windowTime.windowColumn, 1),
+              windowTime.dataType, LongType), Literal(1L)),
+            LongType,
+            windowTime.dataType)
 
-        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
-        // it is, it is going to be bound to the different window even if we apply the same window
-        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
-        // correct window range.
-        val subtractExpr =
-        PreciseTimestampConversion(
-          Subtract(PreciseTimestampConversion(
-            GetStructField(windowTime.windowColumn, 1),
-            windowTime.dataType, LongType), Literal(1L)),
-          LongType,
-          windowTime.dataType)
+          val newColumn = Alias(subtractExpr, colName)(
+            exprId = attr.exprId, explicitMetadata = Some(newMetadata))
 
-        val newColumn = Alias(subtractExpr, "window_time")(
-          exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+          windowTime -> (attr, newColumn)
+        }.toMap
 
         val replacedPlan = p transformExpressions {
-          case w: WindowTime => attr
+          case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
         }
 
-        replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil)
+        val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
+        replacedPlan.withNewChildren(
+          Project(newColumnsToAdd ++: child.output, child) :: Nil)
       } else {
         p // Return unchanged. Analyzer will throw exception later
       }
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 6f111b777a6..482c72679bb 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -345,7 +345,7 @@
 | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct<weekday(2009-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct<weekofyear(2008-02-20):int> |
 | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct<width_bucket(5.3, 0.2, 10.6, 5):bigint> |
-| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time:timestamp,cnt:bigint> |
+| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time(window):timestamp,cnt:bigint> |
 | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct<xxhash64(Spark, array(123), 2):bigint> |
 | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct<year(2016-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct<zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), namedlambdavariable())):array<struct<y:string,x:int>>> |
@@ -413,4 +413,4 @@
 | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> |
-| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
\ No newline at end of file
+| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index f775eb9ecfc..a878e0ffa51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -599,23 +599,37 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
     ).toDF("time")
 
-    val e = intercept[AnalysisException] {
-      df
-        .withColumn("time2", expr("time - INTERVAL 5 minutes"))
-        .select(
-          window($"time", "10 seconds").as("window1"),
-          window($"time2", "10 seconds").as("window2")
-        )
-        .select(
-          $"window1.end".cast("string"),
-          window_time($"window1").cast("string"),
-          $"window2.end".cast("string"),
-          window_time($"window2").cast("string")
-        )
-    }
-    assert(e.getMessage.contains(
-      "Multiple time/session window expressions would result in a cartesian 
product of rows, " +
-        "therefore they are currently not supported"))
+    val df2 = df
+      .withColumn("time2", expr("time - INTERVAL 15 minutes"))
+      .select(window($"time", "10 seconds").as("window1"), $"time2")
+      .select($"window1", window($"time2", "10 seconds").as("window2"))
+
+    checkAnswer(
+      df2.select(
+        $"window1.end".cast("string"),
+        window_time($"window1").cast("string"),
+        $"window2.end".cast("string"),
+        window_time($"window2").cast("string")),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999",
+            "2016-03-27 19:23:20", "2016-03-27 19:23:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999",
+            "2016-03-27 19:24:30", "2016-03-27 19:24:29.999999"))
+    )
+
+    // check column names
+    val df3 = df2
+      .select(
+        window_time($"window1").cast("string"),
+        window_time($"window2").cast("string"),
+        window_time($"window2").as("wt2_aliased").cast("string")
+      )
+
+    val schema = df3.schema
+
+    assert(schema.fields.exists(_.name == "window_time(window1)"))
+    assert(schema.fields.exists(_.name == "window_time(window2)"))
+    assert(schema.fields.exists(_.name == "wt2_aliased"))
   }
 
   test("window_time function on agg output") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
