[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623749#comment-16623749
 ] 

Jungtaek Lim edited comment on SPARK-10816 at 9/21/18 3:10 PM:
---------------------------------------------------------------

[~msukmanowsky]

Thanks for your interest in this! If you are interested in my proposal, you 
can pull my patch, build it, and play with the custom build. (If you have read 
my proposal, you may have noticed that it addresses the limitations you're 
seeing with map/flatMapGroupsWithState.)

Your scenario looks like a good fit for a simple gap window with event time 
and a watermark. I guess that with my patch it could be expressed as a SQL 
statement like:
{code:java}
SELECT userId,
       session.start AS startTimestampMs,
       session.end AS endTimestampMs,
       session.end - session.start AS durationMs
FROM tbl
GROUP BY session(event_time, 30 minutes)
{code}
or with the DSL shown in the links below.

(Here session.end is defined as the point where the gap ends. If you would 
like the timestamp of the last event in the session instead, max(event_time) 
would work.)
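
To make the difference between session.end and max(event_time) concrete, here is a minimal plain-Python sketch of gap-based sessionization. It is not the code from the patch: the names sessionize, Session, and GAP_MS are hypothetical, and it assumes that an event joins the current session only if it arrives strictly before the session's gap ends.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple

GAP_MS = 30 * 60 * 1000  # 30-minute gap, matching the query above


class Session(NamedTuple):
    user_id: str
    start: int       # timestamp of the first event in the session
    end: int         # last event timestamp + gap (where the gap ends)
    last_event: int  # max(event_time) within the session


def sessionize(events: Iterable[Tuple[str, int]], gap_ms: int = GAP_MS) -> List[Session]:
    """Group (user_id, event_time_ms) pairs into gap-based sessions per user."""
    by_user: Dict[str, List[int]] = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)

    sessions: List[Session] = []
    for user_id in sorted(by_user):
        start = last = None
        for ts in sorted(by_user[user_id]):
            if start is None:
                start = last = ts          # first event opens a session
            elif ts < last + gap_ms:
                last = ts                  # event falls inside the gap: extend
            else:
                # Gap expired before this event: close the session, open a new one.
                sessions.append(Session(user_id, start, last + gap_ms, last))
                start = last = ts
        sessions.append(Session(user_id, start, last + gap_ms, last))
    return sessions
```

For each session, end - start corresponds to durationMs in the query, while last_event - start would be the duration measured up to the last event actually seen.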

In append mode you only see sessions that have been evicted; in update mode 
you see every session that was updated in each batch.
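
The difference between the two output modes can be sketched with a small single-key simulation in plain Python. This is an illustration under stated assumptions, not Spark's implementation: merge_sessions and run are hypothetical names, the watermark is taken as the maximum event time seen minus a fixed delay, and the watermark is advanced and applied within the same batch for simplicity.

```python
from typing import List, Tuple

Session = Tuple[int, int]  # (start, end), where end = last event time + gap


def merge_sessions(times: List[int], gap: int) -> List[Session]:
    """Merge event times into gap-based sessions."""
    sessions: List[Session] = []
    for ts in sorted(times):
        if sessions and ts < sessions[-1][1]:
            sessions[-1] = (sessions[-1][0], ts + gap)  # extend current session
        else:
            sessions.append((ts, ts + gap))             # open a new session
    return sessions


def run(batches: List[List[int]], gap: int, delay: int):
    """Return one (append_output, update_output) pair per batch."""
    retained: List[int] = []  # event times still held in state
    watermark = 0
    max_seen = 0
    results = []
    for batch in batches:
        fresh = [ts for ts in batch if ts >= watermark]  # drop late events
        retained.extend(fresh)
        sessions = merge_sessions(retained, gap)
        # Update mode: every session touched by an event in this batch.
        updated = [s for s in sessions if any(s[0] <= ts < s[1] for ts in fresh)]
        # Advance the watermark (simplified: applied immediately).
        max_seen = max([max_seen] + fresh)
        watermark = max(watermark, max_seen - delay)
        # Append mode: only sessions whose gap ended at or before the watermark.
        evicted = [s for s in sessions if s[1] <= watermark]
        retained = [ts for ts in retained
                    if not any(s[0] <= ts < s[1] for s in evicted)]
        results.append((evicted, updated))
    return results
```

With run([[0, 4], [8], [40]], gap=10, delay=5), the update output reports the growing session after each of the first two batches, while the append output stays empty until the third batch pushes the watermark past the end of that session.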

I also added unit tests that convert the structured sessionization example to 
use the session window function. Please check them out and let me know if 
something doesn't work as you expect.

Append mode:

[https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L475-L573]

Update mode:

[https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L618-L715]

 


> EventTime based sessionization
> ------------------------------
>
>                 Key: SPARK-10816
>                 URL: https://issues.apache.org/jira/browse/SPARK-10816
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf
>
>



