[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623749#comment-16623749
 ] 

Jungtaek Lim edited comment on SPARK-10816 at 9/22/18 6:00 AM:
---------------------------------------------------------------

[~msukmanowsky]

Thanks for your interest in this! If you are interested in my proposal, 
you can pull my patch and build it (it is still marked as WIP, but it already 
works; state handling is just a bit suboptimal) and play with the custom build. 
(If you read my proposal, you may have noticed that it addresses the 
missing part you're seeing in map/flatMapGroupsWithState.)

Your scenario looks like a fit for a simple gap window with event time and a 
watermark. I guess with my patch it would be expressed as a SQL statement like:
{code:sql}
SELECT userId,
       session.start AS startTimestampMs,
       session.end AS endTimestampMs,
       session.end - session.start AS durationMs
FROM tbl
GROUP BY userId, session(event_time, 30 minutes)
{code}
or as the DSL shown in the links below.

(Here session.end is defined as the point where the gap ends. If you would 
like the timestamp of the last event in the session instead, max(event_time) 
would work.)
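
As a rough illustration of the difference between session.end and 
max(event_time), here is a minimal sketch in plain Python. It is not Spark 
code and not the implementation in the patch; the function name sessionize, 
the input shape, and the rule that an event arriving exactly one gap after the 
previous event starts a new session are all assumptions made for this example.

```python
from collections import defaultdict

GAP_MS = 30 * 60 * 1000  # 30-minute gap, matching the SQL statement


def sessionize(events, gap_ms=GAP_MS):
    """Group (user_id, event_time_ms) pairs into gap-based sessions.

    Hypothetical helper for illustration only. `end` is the point where
    the gap ends (last event time + gap); `last_event` corresponds to
    max(event_time) within the session.
    """
    by_user = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)

    sessions = []
    for user_id in sorted(by_user):
        times = sorted(by_user[user_id])
        start = last = times[0]
        for ts in times[1:]:
            if ts - last >= gap_ms:
                # Gap exceeded (assumed boundary rule): close the session.
                sessions.append(_row(user_id, start, last, gap_ms))
                start = ts
            last = ts
        sessions.append(_row(user_id, start, last, gap_ms))
    return sessions


def _row(user_id, start, last, gap_ms):
    end = last + gap_ms
    return {
        "userId": user_id,
        "start": start,
        "end": end,
        "last_event": last,
        "durationMs": end - start,
    }


if __name__ == "__main__":
    sample = [("u1", 0), ("u1", 600_000), ("u1", 4_000_000), ("u2", 1_000)]
    for row in sessionize(sample):
        print(row)
```

With this definition durationMs always includes one full gap after the last 
event, which is why max(event_time) is the value to use when the last event 
timestamp is what you need.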

In append mode you only see sessions that have been evicted, and in update 
mode you see every session that was updated in each batch.
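
The difference between the two modes can be sketched with a toy simulation 
for a single key. This is plain Python, not Spark and not the patch's state 
handling. The names sessions_of and run are hypothetical, the watermark is 
passed in explicitly for each batch instead of being derived from event times, 
and late events are assumed not to occur.

```python
def sessions_of(times, gap_ms):
    """Merge event times into (start, end) sessions, end = last event + gap."""
    sessions = []
    for ts in sorted(times):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session: extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap_ms)
        else:
            sessions.append((ts, ts + gap_ms))
    return sessions


def run(batches, gap_ms, mode):
    """Return the rows emitted per batch for mode 'append' or 'update'.

    `batches` is a list of (watermark_ms, [event_time_ms, ...]) pairs.
    """
    state = []        # event times belonging to sessions not yet evicted
    previous = set()  # sessions as they looked after the previous batch
    outputs = []
    for watermark, new_times in batches:
        state.extend(new_times)
        current = sessions_of(state, gap_ms)
        # A session is closed once the watermark has passed its end.
        closed = [s for s in current if s[1] <= watermark]
        if mode == "update":
            # Emit every session that is new or changed in this batch.
            outputs.append([s for s in current if s not in previous])
        else:
            # Append mode: emit only sessions that are closed and evicted.
            outputs.append(closed)
        previous = set(current)
        # Evict the events of closed sessions from state.
        state = [t for t in state
                 if not any(start <= t < end for start, end in closed)]
    return outputs


if __name__ == "__main__":
    batches = [(0, [0, 5]), (10, [12]), (30, [40])]
    print("update:", run(batches, gap_ms=10, mode="update"))
    print("append:", run(batches, gap_ms=10, mode="append"))
```

In this toy run, update mode reports the growing session in every batch, 
while append mode stays silent until the watermark passes the session's end.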

I also added UTs that convert the structured sessionization example to use 
the session window function. Please check them out and let me know if 
something doesn't work as you expect.

Append mode:

[https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L475-L573]

Update mode:

[https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L618-L715]

The query works for both batch and streaming without modifying the SQL 
statement or DSL. For batch, it doesn't use state. 



> EventTime based sessionization
> ------------------------------
>
>                 Key: SPARK-10816
>                 URL: https://issues.apache.org/jira/browse/SPARK-10816
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf
>
>



