[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623749#comment-16623749 ]
Jungtaek Lim edited comment on SPARK-10816 at 9/21/18 3:13 PM:
---------------------------------------------------------------

[~msukmanowsky] Thanks for your interest in this! If you are interested in my proposal, you can pull my patch, build it, and play with the custom build. (The patch is still marked as WIP, but it works; state handling is just a bit suboptimal.)

(If you read my proposal, you may notice that it addresses the limitations you are seeing with map/flatMapGroupsWithState.)

Your scenario looks like a good fit for a simple gap window with event time and a watermark. With my patch, I think it could be expressed as a SQL statement like:

{code:sql}
SELECT userId,
       session.start AS startTimestampMs,
       session.end AS endTimestampMs,
       session.end - session.start AS durationMs
FROM tbl
GROUP BY userId, session(event_time, 30 minutes)
{code}

or with the DSL shown in the links below. (Here session.end is defined as the time at which the gap ends. If you want the timestamp of the last event in the session, max(event_time) would work.)

In append mode you only see the sessions that have been evicted; in update mode you see all sessions updated in each batch.

I also added unit tests that convert the structured sessionization example to use the session window function. Please check them out and let me know if something doesn't work as you expect.

Append mode: [https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L475-L573]

Update mode: [https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L618-L715]

> EventTime based sessionization
> ------------------------------
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Reynold Xin
> Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf
>
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
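
For readers who want to see the gap-window semantics from the comment above in isolation, here is a minimal sketch in plain Python. It is not Spark code and not the patch's implementation: it works on a finite batch of events, ignores watermarks and output modes, and every name in it (sessionize, close_session, GAP_MS, the dictionary keys) is hypothetical. It also assumes that an event arriving exactly when the gap ends starts a new session, which the comment does not specify. The sketch only illustrates the difference between session.end (last event time plus the gap) and the last event timestamp (what max(event_time) would give).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

GAP_MS = 30 * 60 * 1000  # 30-minute gap, matching the SQL example


def close_session(user_id: str, start: int, last: int, gap_ms: int) -> dict:
    """Build one session record; 'end' is the time at which the gap ends."""
    end = last + gap_ms
    return {
        "userId": user_id,
        "start": start,
        "end": end,             # analogous to session.end in the comment
        "lastEvent": last,      # analogous to max(event_time)
        "durationMs": end - start,
    }


def sessionize(events: Iterable[Tuple[str, int]], gap_ms: int = GAP_MS) -> List[dict]:
    """Group (user_id, event_time_ms) pairs into gap-based sessions per user."""
    by_user: Dict[str, List[int]] = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)

    sessions: List[dict] = []
    for user_id in sorted(by_user):
        timestamps = sorted(by_user[user_id])  # order by event time
        start = last = timestamps[0]
        for ts in timestamps[1:]:
            # Assumption: an event at or after last + gap opens a new session.
            if ts >= last + gap_ms:
                sessions.append(close_session(user_id, start, last, gap_ms))
                start = ts
            last = ts
        sessions.append(close_session(user_id, start, last, gap_ms))
    return sessions


if __name__ == "__main__":
    sample = [("u1", 0), ("u1", 600_000), ("u1", 4_000_000), ("u2", 100)]
    for session in sessionize(sample):
        print(session)
```

In a streaming query the same grouping has to be maintained incrementally in state, which is where the append and update output modes mentioned in the comment come in; this batch sketch sidesteps that entirely.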