[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623749#comment-16623749 ]
Jungtaek Lim edited comment on SPARK-10816 at 9/22/18 6:00 AM:
---------------------------------------------------------------

[~msukmanowsky] Thanks for your interest in this!

If you are interested in my proposal, you can pull my patch, build it, and play with the custom build. (It is still marked as WIP, but it works; state handling is just a bit suboptimal.) If you read my proposal, you may have noticed that it addresses the missing part you're seeing in map/flatMapGroupsWithState.

Your scenario looks like a fit for a simple gap window with event time and watermark. I guess that with my patch it would be expressed as a SQL statement like:

{code:java}
SELECT userId, session.start AS startTimestampMs, session.end AS endTimestampMs, session.end - session.start AS durationMs
FROM tbl
GROUP BY userId, session(event_time, 30 minutes){code}

or with the DSL shown in the links below. (Here session.end is defined as the end of the gap. If you would like the timestamp of the last event in the session, max(event_time) would work.)

In append mode you see only the sessions that have been evicted; in update mode you see all sessions updated in each batch.

I also added unit tests that convert the structured sessionization example to use the session window function. Please check them out and let me know if something doesn't work as you expect.

Append mode: [https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L475-L573]

Update mode: [https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L618-L715]

The query works for both batch and streaming without modifying the SQL statement or DSL. For batch, it doesn't leverage state.

was (Author: kabhwan):
[~msukmanowsky] Thanks for showing your interest on this!

If you are interested on my proposal you can even pull my patch and build (though it is marked as WIP even now it works... handling state is just a bit suboptimal), and play with custom build. (If you read my proposal you may be noticed that the proposal addresses the missing part you're seeing from map/flatMapGroupsWithState.)

Your scenario looks like fit to simple gap window, with event time & watermark. I guess with my patch it would be represented as SQL statement like:

{code:java}
SELECT userId, session.start AS startTimestampMs, session.end AS endTimestampMs, session.end - session.start AS durationMs
FROM tbl
GROUP BY session(event_time, 30 minutes){code}

or DSL in below links. (Here session.end is defined as gap ends. If you would like to have last event timestamp in session, max(event_time) would work.)

In append mode you can only see the sessions which are evicted, and in update mode you can see all updated sessions for every batch.

I also added UTs which converts structured sessionization example into session window function. Please check it out and let me know if something doesn't work as you expect.

Append mode: [https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L475-L573]

Update mode: [https://github.com/HeartSaVioR/spark/blob/fb19879ff2bbafdf7c844d1a8da9d30c07aefd76/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala#L618-L715]

The query will work for both batch and streaming without modification of SQL statement or DSL. For batch it doesn't leverage state.
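
To make the gap-window semantics in the comment concrete, here is a minimal batch sketch in plain Python. It is not the patch's implementation and does not use Spark; it only mirrors what the query computes per userId, assuming timestamps in epoch milliseconds. The names `sessionize`, `Session`, and `GAP_MS` are hypothetical, and treating an event that arrives exactly at the gap end as the start of a new session is my assumption, not something the comment states.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple

GAP_MS = 30 * 60 * 1000  # 30-minute inactivity gap, as in the query above


class Session(NamedTuple):
    user_id: str
    start_ms: int     # timestamp of the first event in the session
    end_ms: int       # last event timestamp + gap (the "gap end")
    duration_ms: int  # end_ms - start_ms


def sessionize(events: Iterable[Tuple[str, int]],
               gap_ms: int = GAP_MS) -> List[Session]:
    """Group (user_id, event_time_ms) pairs into gap-based sessions."""
    # Equivalent of GROUP BY userId: collect timestamps per user.
    by_user: Dict[str, List[int]] = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)

    sessions: List[Session] = []
    for user_id in sorted(by_user):
        timestamps = sorted(by_user[user_id])
        start = last = timestamps[0]
        for ts in timestamps[1:]:
            if ts < last + gap_ms:
                # Event falls inside the current gap: extend the session.
                last = ts
            else:
                # Gap elapsed (assumption: boundary event opens a new session).
                end = last + gap_ms
                sessions.append(Session(user_id, start, end, end - start))
                start = last = ts
        # Close the final open session for this user.
        end = last + gap_ms
        sessions.append(Session(user_id, start, end, end - start))
    return sessions
```

Because `end_ms` is the gap end, a session with a single event has a duration equal to the gap; replacing `last + gap_ms` with `last` would give the max(event_time) variant mentioned in the comment.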

> EventTime based sessionization
> ------------------------------
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Reynold Xin
> Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org