[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658599#comment-16658599 ]
Jungtaek Lim edited comment on SPARK-10816 at 10/22/18 5:54 AM:
----------------------------------------------------------------

Just going back to review the original comment from [~zsxwing]. It takes a state-oriented view, and several approaches are still available for how to leverage the state format (for example, one big physical exec vs. smaller physical execs that reuse existing physical execs). For now, let's concentrate on the pros and cons of each state format.

[1] clearly works and is very simple to implement, but it is also clear that it requires loading all sessions for a given key into memory and shifting array elements left and right. It may also need a search to determine whether a session is updated or new. In addition, it always overwrites the whole value for the key, so every session for that key is written to the delta whenever anything about the key changes. (See the first sketch below.)

[2] would let us overwrite only the updated sessions, at the cost of some extra seeking and writing compared to [1] and [3], if we could assume the start timestamp of a session never changes. Unfortunately that assumption does not hold: a late event can also modify the start timestamp of a session, so we need to handle that corner case, which means we still have to traverse the sessions for the key, which would be hugely inefficient. If the key space is not sorted, it requires a full traversal of the key space. (See the second sketch below.)

[3] is the option that lets us play with the trade-off. It works in basically the same way as [1], since it simulates an indexable array. It adds some overhead for manipulating two states, but it does not require loading all sessions for a key into memory. Moreover, it is tweaked to reduce the delta on removal of elements, which has a possible downside, broken ordering, that we need to weigh as a trade-off. (One idea for addressing both element order and the huge delta from shifting is to place tombstones instead of removing elements, and periodically remove the tombstones and shift, perhaps while taking a snapshot, if possible.) I guess it is unlikely that a given key holds many valid sessions at any specific time, but that is just my intuition, so [3] may be the preferred way to go. (See the third sketch below.)

My patch leverages [3], but because elements go out of order after removal, the memory issue still exists just as in [1]; I'll try to address that without introducing too much overhead.

Please let me know whether the analysis above would be good to include in the SPIP or a detailed design doc.

Btw, we may also want to discuss where to add the complexity: one big physical exec vs. smaller physical execs that leverage existing physical execs. Both my patch and Baidu's patch take the latter approach, but I can change mine if the Spark community prefers the former, to keep the logic cohesive in a single exec instead of injecting new execs into multiple places.
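First sketch: a minimal illustration of option [1] as far as it is described above, i.e. every session for a grouping key kept in a single state value, sorted by start time. All names here (SessionKey, SessionWindow, ListPerKeyStateSketch) are hypothetical placeholders rather than Spark APIs, and a plain in-memory map stands in for the state store.

{code:scala}
import scala.collection.mutable.ArrayBuffer

// Hypothetical placeholders, not Spark classes.
case class SessionKey(groupingKey: String)
case class SessionWindow(start: Long, end: Long)

class ListPerKeyStateSketch {
  // the state value is every session for the key, so any change rewrites the whole entry
  private val store = scala.collection.mutable.Map.empty[SessionKey, ArrayBuffer[SessionWindow]]

  def upsert(key: SessionKey, session: SessionWindow): Unit = {
    // load ALL sessions for the key into memory
    val sessions = store.getOrElseUpdate(key, ArrayBuffer.empty[SessionWindow])
    // linear search to decide whether this session is an update or a new one
    val existing = sessions.indexWhere(s => s.start < session.end && session.start < s.end)
    if (existing >= 0) {
      sessions(existing) = session
    } else {
      // keep the array sorted by start time: inserting shifts later elements to the right
      val pos = sessions.indexWhere(_.start > session.start)
      if (pos < 0) sessions += session else sessions.insert(pos, session)
    }
    // the whole list goes to the delta even though only one session changed
    store(key) = sessions
  }
}
{code}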
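Second sketch: option [2] as I read the description above, i.e. keying the state by (grouping key, session start timestamp) so an updated session overwrites only its own entry. It shows why a late event that moves the session start breaks the assumption. Again, all names (SessionId, SessionValue, StartKeyedStateSketch) are hypothetical and a mutable map stands in for the state store.

{code:scala}
// Hypothetical placeholders, not Spark classes.
case class SessionId(groupingKey: String, sessionStart: Long)
case class SessionValue(start: Long, end: Long)

object StartKeyedStateSketch extends App {
  val store = scala.collection.mutable.Map.empty[SessionId, SessionValue]

  // While the start timestamp is stable, updating a session overwrites a single entry:
  store(SessionId("user-1", 1000L)) = SessionValue(1000L, 5000L)

  // A late event at t = 800 merges into the same session and moves its start,
  // so the state key itself changes: the stale entry has to be found and removed.
  // Without a sorted key space, that degenerates into scanning every key.
  val stale = store.keys.filter(_.groupingKey == "user-1").toList // full traversal
  stale.foreach(store.remove)
  store(SessionId("user-1", 800L)) = SessionValue(800L, 5000L)
}
{code}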
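Third sketch: option [3] as described above, simulating an indexable array with two states, one holding the per-key session count and one holding (key, index) -> session, so only the touched indices are rewritten. The removal tweak moves the last element into the freed slot, which keeps the delta small at the cost of ordering. Names are again hypothetical placeholders.

{code:scala}
// Hypothetical placeholders, not Spark classes.
case class Session(start: Long, end: Long)

class TwoStateIndexedSessionsSketch {
  // state 1: grouping key -> number of sessions currently stored for that key
  private val countStore = scala.collection.mutable.Map.empty[String, Int]
  // state 2: (grouping key, index) -> a single session, read and written individually
  private val sessionStore = scala.collection.mutable.Map.empty[(String, Int), Session]

  def append(key: String, session: Session): Unit = {
    val n = countStore.getOrElse(key, 0)
    sessionStore((key, n)) = session       // only this index hits the delta
    countStore(key) = n + 1
  }

  def update(key: String, index: Int, session: Session): Unit =
    sessionStore((key, index)) = session   // sibling sessions stay untouched

  def remove(key: String, index: Int): Unit = {
    val last = countStore(key) - 1
    if (index != last) {
      // move the last element into the freed slot instead of shifting everything left:
      // only two entries change, but the sort order of the simulated array is broken
      sessionStore((key, index)) = sessionStore((key, last))
    }
    sessionStore.remove((key, last))
    countStore(key) = last
  }
}
{code}

The tombstone variant mentioned above would instead mark the removed slot as deleted so that order is preserved, and compact the holes periodically, e.g. while taking a snapshot.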
> EventTime based sessionization
> ------------------------------
>
>                 Key: SPARK-10816
>                 URL: https://issues.apache.org/jira/browse/SPARK-10816
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf, Session Window Support For Structure Streaming.pdf