[ 
https://issues.apache.org/jira/browse/SPARK-46450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798381#comment-17798381
 ] 

Jungtaek Lim commented on SPARK-46450:
--------------------------------------

It's a missing one and maybe we will have to document - session window is only 
working properly with batch/streaming aggregation. If you use it as normal 
function and not ingesting the value to aggregation, merging sessions is never 
triggered.

> session_window doesn't identify sessions with provided gap when used as a 
> window function
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-46450
>                 URL: https://issues.apache.org/jira/browse/SPARK-46450
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.1, 3.5.0
>            Reporter: Juan Pumarino
>            Priority: Minor
>
> {{PARTITION BY session_window}} doesn't produce the expected results. Here's 
> an example:
> {code:sql}
> SELECT 
>   id,
>   ts,
>   collect_list(id) OVER (PARTITION BY session_window(ts, '1 hour')) as 
> window_ids
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> {code}
> Actual result:
> {code:java}
> +---+----------------+----------+
> |id |ts              |window_ids|
> +---+----------------+----------+
> |1  |2023-12-11 01:10|[1]       |
> |2  |2023-12-11 01:15|[2]       |
> |3  |2023-12-11 01:40|[3]       |
> |4  |2023-12-11 02:05|[4]       |
> |5  |2023-12-11 03:15|[5]       |
> |6  |2023-12-11 03:20|[6]       |
> |7  |2023-12-11 04:10|[7]       |
> |8  |2023-12-11 05:05|[8]       |
> +---+----------------+----------+
> {code}
> Expected result, assigning rows to two sessions with 1-hour gap:
> {code:java}
> +---+----------------+------------+
> |id |ts              |window_ids  |
> +---+----------------+------------+
> |1  |2023-12-11 01:10|[1, 2, 3, 4]|
> |2  |2023-12-11 01:15|[1, 2, 3, 4]|
> |3  |2023-12-11 01:40|[1, 2, 3, 4]|
> |4  |2023-12-11 02:05|[1, 2, 3, 4]|
> |5  |2023-12-11 03:15|[5, 6, 7, 8]|
> |6  |2023-12-11 03:20|[5, 6, 7, 8]|
> |7  |2023-12-11 04:10|[5, 6, 7, 8]|
> |8  |2023-12-11 05:05|[5, 6, 7, 8]|
> +---+----------------+------------+
> {code}
> I compared its behavior with the results as a grouping function and with how 
> {{window()}} behaves in both cases, which seems to confirm that the result is 
> inconsistent. Here are the other examples:
> *{{group by window()}}*
> {code:sql}
> SELECT 
>   collect_list(id) AS ids,
>   collect_list(ts) AS tss,
>   window
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> GROUP by window(ts, '1 hour')
> {code}
> Correctly assigns rows to 1-hour windows:
> {code:java}
> +---------+------------------------------------------------------+------------------------------------------+
> |ids      |tss                                                   |window      
>                               |
> +---------+------------------------------------------------------+------------------------------------------+
> |[1, 2, 3]|[2023-12-11 01:10, 2023-12-11 01:15, 2023-12-11 01:40]|{2023-12-11 
> 01:00:00, 2023-12-11 02:00:00}|
> |[4]      |[2023-12-11 02:05]                                    |{2023-12-11 
> 02:00:00, 2023-12-11 03:00:00}|
> |[5, 6]   |[2023-12-11 03:15, 2023-12-11 03:20]                  |{2023-12-11 
> 03:00:00, 2023-12-11 04:00:00}|
> |[7]      |[2023-12-11 04:10]                                    |{2023-12-11 
> 04:00:00, 2023-12-11 05:00:00}|
> |[8]      |[2023-12-11 05:05]                                    |{2023-12-11 
> 05:00:00, 2023-12-11 06:00:00}|
> +---------+------------------------------------------------------+------------------------------------------+
> {code}
>  
> *{{group by session_window()}}*
> {code:sql}
> SELECT 
>   collect_list(id) AS ids,
>   collect_list(ts) AS tss,
>   session_window
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> GROUP by session_window(ts, '1 hour')
> {code}
> Correctly assigns rows to two sessions with 1-hour gap:
> {code:java}
> +------------+------------------------------------------------------------------------+------------------------------------------+
> |ids         |tss                                                             
>         |session_window                            |
> +------------+------------------------------------------------------------------------+------------------------------------------+
> |[1, 2, 3, 4]|[2023-12-11 01:10, 2023-12-11 01:15, 2023-12-11 01:40, 
> 2023-12-11 02:05]|{2023-12-11 01:10:00, 2023-12-11 03:05:00}|
> |[5, 6, 7, 8]|[2023-12-11 03:15, 2023-12-11 03:20, 2023-12-11 04:10, 
> 2023-12-11 05:05]|{2023-12-11 03:15:00, 2023-12-11 06:05:00}|
> +------------+------------------------------------------------------------------------+------------------------------------------+
> {code}
>  
> *{{partition by window()}}*
> {code:sql}
> SELECT 
>   id,
>   ts,
>   collect_list(id) OVER (PARTITION BY window(ts, '1 hour')) as window_ids
> FROM VALUES
>   (1, "2023-12-11 01:10"),
>   (2, "2023-12-11 01:15"),
>   (3, "2023-12-11 01:40"),
>   (4, "2023-12-11 02:05"),
>   (5, "2023-12-11 03:15"),
>   (6, "2023-12-11 03:20"),
>   (7, "2023-12-11 04:10"),
>   (8, "2023-12-11 05:05")
>   AS tab(id, ts)
> {code}
> Correctly assigns rows to 1-hour windows:
> {code:java}
> +---+----------------+----------+
> |id |ts              |window_ids|
> +---+----------------+----------+
> |1  |2023-12-11 01:10|[1, 2, 3] |
> |2  |2023-12-11 01:15|[1, 2, 3] |
> |3  |2023-12-11 01:40|[1, 2, 3] |
> |4  |2023-12-11 02:05|[4]       |
> |5  |2023-12-11 03:15|[5, 6]    |
> |6  |2023-12-11 03:20|[5, 6]    |
> |7  |2023-12-11 04:10|[7]       |
> |8  |2023-12-11 05:05|[8]       |
> +---+----------------+----------+
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to