I had a query. Say I have a single key with two live sessions (A and B) and a configured allowed lateness.
Do these invariants hold?

* The state is scoped to the key (created per key in the ProcessWindowFunction, with a TTL).
* The state will remain alive irrespective of whether the window is closed or not (a TTL timer does the collection).
* The execution on a key is sequential, as in if 2 events arrive for the 2 sessions they are processed sequentially (or in any order, but without the need for synchronization).
* The state mutated by an event in session A will be visible to session B if an event for session B arrives subsequently. There is no need to synchronize access to the state, as it is for the same key.

What I am not sure about is what happens when session A merges with session B. I would assume that it just defines a new start and end for the merged window, GCing the old ones (or at least one of them), and assigns that event to the new window. What happens to the custom state in the ProcessWindowFunction (there is a CountTrigger of 1) is then really just whatever is done in the process method above, as in this state is one degree removed from whatever Flink does internally with its merges, given that the state is scoped to the key.

On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Yep, makes sense.
>
> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org> wrote:
>
>> > Want to confirm that the keys are GCed (along with state) once the (windows close + lateness)?
>>
>> Window state is cleared (as well as the window itself), but global
>> state is not (unless you use TTL).
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> >
>> > Sometimes writing it down makes you think.
>> > I now realize that this is not the right approach, given that merging windows will have their own states... and how the merge happens is really at the key level...
>> >
>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> >>
>> >> I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1.
>> >>
>> >> (except that the number of keys can grow).
>> >>
>> >> Want to confirm that the keys are GCed (along with state) once the (windows close + lateness)?
>> >>
>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org> wrote:
>> >>>
>> >>> Hi Vishal,
>> >>>
>> >>> There is no leak in the code you provided (except that the number of
>> >>> keys can grow).
>> >>> But as you figured out the state is scoped to key, not to window+key.
>> >>>
>> >>> Could you explain what you are trying to achieve and why do you need to combine
>> >>> sliding windows with state scoped to window+key?
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> >>> >
>> >>> > Essentially, does this code leak state?
>> >>> >
>> >>> > private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
>> >>> >         extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
>> >>> >     private static final long serialVersionUID = 1L;
>> >>> >     private final static ValueStateDescriptor<String> sessionId =
>> >>> >             new ValueStateDescriptor<String>("session_uid", String.class);
>> >>> >
>> >>> >     @Override
>> >>> >     public void process(KEY key,
>> >>> >             ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>> >>> >             Iterable<KeyedSession<KEY, VALUE>> elements,
>> >>> >             Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
>> >>> >             throws Exception {
>> >>> >         // I need this scoped to key/window
>> >>> >         if (getRuntimeContext().getState(sessionId).value() == null) {
>> >>> >             UUID uuid = UUID.randomUUID();
>> >>> >             getRuntimeContext().getState(sessionId).update(uuid.toString());
>> >>> >         }
>> >>> >         String uuid = getRuntimeContext().getState(sessionId).value();
>> >>> >         out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>> >>> >     }
>> >>> > }
>> >>> >
>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> >>> >>
>> >>> >> Hello folks,
>> >>> >>
>> >>> >> The suggestion is to use windowState() for per-key, per-window state and to clear the state explicitly. Also, it seems that getRuntimeContext().getState() returns global state, which is shared among windows with the same key. I desire of course to have state scoped to a key per window and was wanting to use windowState()..
>> >>> >> The caveat is that my window is a Session Window, and when I try to use clear() I am thrown this exception (Session Windows are Merging Windows):
>> >>> >>
>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
>> >>> >>
>> >>> >> The questions are:
>> >>> >>
>> >>> >> * How do I have state per session window / per key and still be able to clear it?
>> >>> >> * Does getRuntimeContext().getState() give me the clear() semantics for free, along with state per window per key, and have I thus understood getRuntimeContext().getState() wrong?
>> >>> >>
>> >>> >> Regards.
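
On the first question in the original message (state per session window that is still cleaned up), one possible direction is to carry the session ID in the window's own aggregate instead of in keyed state, so that it is merged when sessions merge and cleared when the window is cleared. The sketch below is plain Java with no Flink dependency; it only mirrors the createAccumulator/add/merge shape of Flink's AggregateFunction. The names SessionIdSketch and SessionAcc, and the policy of keeping the ID of the session with the earliest event, are assumptions made for illustration and are not from this thread. It also assumes a trigger that fires without purging, since a purge would discard the accumulator (and the ID with it) after every fire.

```java
import java.util.UUID;

/** Plain-Java sketch (hypothetical names); mirrors the shape of an aggregate with a merge step. */
final class SessionIdSketch {

    /** Hypothetical accumulator: one instance per session window. */
    static final class SessionAcc {
        final String sessionId;     // ID stamped on every event of the session
        final long firstEventTime;  // earliest event timestamp seen; Long.MAX_VALUE until the first add

        SessionAcc(String sessionId, long firstEventTime) {
            this.sessionId = sessionId;
            this.firstEventTime = firstEventTime;
        }
    }

    /** A new session window starts with a fresh ID and no events. */
    static SessionAcc createAccumulator() {
        return new SessionAcc(UUID.randomUUID().toString(), Long.MAX_VALUE);
    }

    /** Adding an event keeps the ID and tracks the earliest timestamp. */
    static SessionAcc add(long eventTime, SessionAcc acc) {
        return new SessionAcc(acc.sessionId, Math.min(acc.firstEventTime, eventTime));
    }

    /** When two sessions merge, keep the ID of the one whose first event is earlier (assumed policy). */
    static SessionAcc merge(SessionAcc a, SessionAcc b) {
        SessionAcc older = a.firstEventTime <= b.firstEventTime ? a : b;
        return new SessionAcc(older.sessionId, Math.min(a.firstEventTime, b.firstEventTime));
    }

    private SessionIdSketch() {}
}
```

With this shape, events emitted after sessions A and B merge would all carry the ID of whichever session started first; events already emitted under the other ID are not rewritten, so downstream consumers would need to tolerate that.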