[ https://issues.apache.org/jira/browse/SPARK-46450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798381#comment-17798381 ]
Jungtaek Lim commented on SPARK-46450:
--------------------------------------

This is a missing piece, and we may have to document it: session_window only works properly with batch/streaming aggregation. If you use it as a normal function and do not feed its value into an aggregation, session merging is never triggered.

> session_window doesn't identify sessions with provided gap when used as a window function
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-46450
>                 URL: https://issues.apache.org/jira/browse/SPARK-46450
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.1, 3.5.0
>            Reporter: Juan Pumarino
>            Priority: Minor
>
> {{PARTITION BY session_window}} doesn't produce the expected results. Here's an example:
> {code:sql}
> SELECT
>   id,
>   ts,
>   collect_list(id) OVER (PARTITION BY session_window(ts, '1 hour')) as window_ids
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> {code}
> Actual result:
> {code:java}
> +---+----------------+----------+
> |id |ts              |window_ids|
> +---+----------------+----------+
> |1  |2023-12-11 01:10|[1]       |
> |2  |2023-12-11 01:15|[2]       |
> |3  |2023-12-11 01:40|[3]       |
> |4  |2023-12-11 02:05|[4]       |
> |5  |2023-12-11 03:15|[5]       |
> |6  |2023-12-11 03:20|[6]       |
> |7  |2023-12-11 04:10|[7]       |
> |8  |2023-12-11 05:05|[8]       |
> +---+----------------+----------+
> {code}
> Expected result, assigning rows to two sessions with a 1-hour gap:
> {code:java}
> +---+----------------+------------+
> |id |ts              |window_ids  |
> +---+----------------+------------+
> |1  |2023-12-11 01:10|[1, 2, 3, 4]|
> |2  |2023-12-11 01:15|[1, 2, 3, 4]|
> |3  |2023-12-11 01:40|[1, 2, 3, 4]|
> |4  |2023-12-11 02:05|[1, 2, 3, 4]|
> |5  |2023-12-11 03:15|[5, 6, 7, 8]|
> |6  |2023-12-11 03:20|[5, 6, 7, 8]|
> |7  |2023-12-11 04:10|[5, 6, 7, 8]|
> |8  |2023-12-11 05:05|[5, 6, 7, 8]|
> +---+----------------+------------+
> {code}
> I compared its behavior with the results as a grouping function and with how {{window()}} behaves in both cases, which seems to confirm that the result is inconsistent. Here are the other examples:
> *{{group by window()}}*
> {code:sql}
> SELECT
>   collect_list(id) AS ids,
>   collect_list(ts) AS tss,
>   window
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> GROUP BY window(ts, '1 hour')
> {code}
> Correctly assigns rows to 1-hour windows:
> {code:java}
> +---------+------------------------------------------------------+------------------------------------------+
> |ids      |tss                                                   |window                                    |
> +---------+------------------------------------------------------+------------------------------------------+
> |[1, 2, 3]|[2023-12-11 01:10, 2023-12-11 01:15, 2023-12-11 01:40]|{2023-12-11 01:00:00, 2023-12-11 02:00:00}|
> |[4]      |[2023-12-11 02:05]                                    |{2023-12-11 02:00:00, 2023-12-11 03:00:00}|
> |[5, 6]   |[2023-12-11 03:15, 2023-12-11 03:20]                  |{2023-12-11 03:00:00, 2023-12-11 04:00:00}|
> |[7]      |[2023-12-11 04:10]                                    |{2023-12-11 04:00:00, 2023-12-11 05:00:00}|
> |[8]      |[2023-12-11 05:05]                                    |{2023-12-11 05:00:00, 2023-12-11 06:00:00}|
> +---------+------------------------------------------------------+------------------------------------------+
> {code}
>
> *{{group by session_window()}}*
> {code:sql}
> SELECT
>   collect_list(id) AS ids,
>   collect_list(ts) AS tss,
>   session_window
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> GROUP BY session_window(ts, '1 hour')
> {code}
> Correctly assigns rows to two sessions with a 1-hour gap:
> {code:java}
> +------------+------------------------------------------------------------------------+------------------------------------------+
> |ids         |tss                                                                     |session_window                            |
> +------------+------------------------------------------------------------------------+------------------------------------------+
> |[1, 2, 3, 4]|[2023-12-11 01:10, 2023-12-11 01:15, 2023-12-11 01:40, 2023-12-11 02:05]|{2023-12-11 01:10:00, 2023-12-11 03:05:00}|
> |[5, 6, 7, 8]|[2023-12-11 03:15, 2023-12-11 03:20, 2023-12-11 04:10, 2023-12-11 05:05]|{2023-12-11 03:15:00, 2023-12-11 06:05:00}|
> +------------+------------------------------------------------------------------------+------------------------------------------+
> {code}
>
> *{{partition by window()}}*
> {code:sql}
> SELECT
>   id,
>   ts,
>   collect_list(id) OVER (PARTITION BY window(ts, '1 hour')) as window_ids
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> {code}
> Correctly assigns rows to 1-hour windows:
> {code:java}
> +---+----------------+----------+
> |id |ts              |window_ids|
> +---+----------------+----------+
> |1  |2023-12-11 01:10|[1, 2, 3] |
> |2  |2023-12-11 01:15|[1, 2, 3] |
> |3  |2023-12-11 01:40|[1, 2, 3] |
> |4  |2023-12-11 02:05|[4]       |
> |5  |2023-12-11 03:15|[5, 6]    |
> |6  |2023-12-11 03:20|[5, 6]    |
> |7  |2023-12-11 04:10|[7]       |
> |8  |2023-12-11 05:05|[8]       |
> +---+----------------+----------+
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
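The sketch below is plain Python, not Spark code. It models the semantics discussed in this issue using the issue's sample data, to show why the unmerged session windows produce single-row partitions. It assumes three things, consistent with the comment and the observed output:

* Without an aggregation, each row keeps its own unmerged `[ts, ts + gap)` window.
* Merging overlapping windows is the step that yields sessions.
* `window()` assigns each row an epoch-aligned fixed bucket.

`per_row_windows`, `merged_sessions`, `tumbling_window` and `partition_ids` are hypothetical helpers for illustration only. The merge condition exactly at the gap boundary is also an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Sample data copied from the issue: (id, ts).
ROWS = [
    (1, "2023-12-11 01:10"),
    (2, "2023-12-11 01:15"),
    (3, "2023-12-11 01:40"),
    (4, "2023-12-11 02:05"),
    (5, "2023-12-11 03:15"),
    (6, "2023-12-11 03:20"),
    (7, "2023-12-11 04:10"),
    (8, "2023-12-11 05:05"),
]
FMT = "%Y-%m-%d %H:%M"


def per_row_windows(rows, gap):
    """Hypothetical: each row keeps its own [ts, ts + gap) window (no merge step)."""
    return {i: (ts, ts + gap) for i, ts in rows}


def merged_sessions(rows, gap):
    """Hypothetical: merge overlapping [ts, ts + gap) windows into sessions.

    Returns a list of (ids, start, end). Assumption: a row joins the open
    session only if ts < current end; Spark's behavior at exactly ts == end
    is not modeled here.
    """
    sessions = []
    for i, ts in sorted(rows, key=lambda r: r[1]):
        if sessions and ts < sessions[-1][2]:
            ids, start, end = sessions[-1]
            sessions[-1] = (ids + [i], start, max(end, ts + gap))
        else:
            sessions.append(([i], ts, ts + gap))
    return sessions


def tumbling_window(ts, size):
    """Hypothetical: epoch-aligned fixed window containing ts (naive ts treated as UTC)."""
    epoch = datetime(1970, 1, 1)
    start = epoch + ((ts - epoch) // size) * size
    return (start, start + size)


def partition_ids(key_of):
    """Mimic collect_list(id) OVER (PARTITION BY key): ids sharing each row's key."""
    groups = defaultdict(list)
    for i, key in key_of.items():
        groups[key].append(i)
    return {i: groups[key] for i, key in key_of.items()}


if __name__ == "__main__":
    rows = [(i, datetime.strptime(s, FMT)) for i, s in ROWS]
    hour = timedelta(hours=1)

    # Unmerged session windows: timestamps are distinct, so every partition has one row.
    print(partition_ids(per_row_windows(rows, hour)))

    # Merged sessions: two sessions, bounds as in the GROUP BY session_window output.
    sessions = merged_sessions(rows, hour)
    for ids, start, end in sessions:
        print(ids, start, end)
    session_of = {i: (start, end) for ids, start, end in sessions for i in ids}
    print(partition_ids(session_of))

    # Tumbling 1-hour windows: each row's bucket depends only on its own ts.
    print(partition_ids({i: tumbling_window(ts, hour) for i, ts in rows}))
```

Under these assumptions, a tumbling bucket depends only on the row's own timestamp, so `PARTITION BY window(...)` can be evaluated row by row. A session's bounds, by contrast, depend on neighboring rows and only emerge from the merge step.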