Thank you for the confirmation. The simulations confirm it too.

On Fri, Apr 9, 2021 at 11:14 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi Vishal,
>
> Sorry for the late reply.
> Please find my answers below.
> By state I assume the state obtained via getRuntimeContext (access to
> window state is not allowed).
>
> > The state is scoped to the key (created per key in the
> > ProcessWindowFunction with a TTL)
> Yes.
>
> > The state will remain alive irrespective of whether the window is closed
> > or not (a TTL timer does the collection)
> Right, but you need to configure TTL when accessing the state [1]
>
> > The execution on a key is sequential, as in if 2 events arrive for the
> > 2 sessions they happen sequentially (or in any order but without the need
> > of synchronization)
> Right.
>
> > The state mutated by an event in Session A will be visible to Session B
> > if an event incident on Session B was to happen subsequently. There is no
> > need of synchronizing access to the state as it is for the same key.
> Right.
>
> Your understanding of merging of window contents is also correct.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Regards,
> Roman
>
>
> On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
> <vishal.santo...@gmail.com> wrote:
> >
> > I had a query. Say I have a single key with 2 live sessions (A and B)
> > with a configured lateness.
> >
> > Do these invariants hold?
> >
> > * The state is scoped to the key (created per key in the
> >   ProcessWindowFunction with a TTL)
> > * The state will remain alive irrespective of whether the window is
> >   closed or not (a TTL timer does the collection)
> > * The execution on a key is sequential, as in if 2 events arrive for
> >   the 2 sessions they happen sequentially (or in any order but without the
> >   need of synchronization)
> > * The state mutated by an event in Session A will be visible to Session
> >   B if an event incident on Session B was to happen subsequently. There is
> >   no need of synchronizing access to the state as it is for the same key.
> >
> > What I am not sure about is what happens when session A merges with
> > session B. I would assume that it just defines a new start and end for the
> > merged window, GCing the old ones (or at least one of them) and assigning
> > that event to the new window. What one does with the custom state in
> > ProcessWindowFunction (there is a CountTrigger of 1) is really what is
> > done in the process method above, as in this state is 1 degree removed
> > from whatever Flink does internally with its merges, given that the state
> > is scoped to the key.
> >
> >
> > On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> >>
> >> Yep, makes sense.
> >>
> >> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org> wrote:
> >>>
> >>> > Want to confirm that the keys are GCed (along with state) once the (windows close + lateness)?
> >>> Window state is cleared (as well as the window itself), but global
> >>> state is not (unless you use TTL).
> >>>
> >>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
> >>> <vishal.santo...@gmail.com> wrote:
> >>> >
> >>> > Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own states... and how the merge happens is really at the key level.
> >>> >
> >>> >
> >>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> >>> >>
> >>> >> I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1.
> >>> >>
> >>> >> >> (except that the number of keys can grow).
> >>> >>
> >>> >> Want to confirm that the keys are GCed (along with state) once the (windows close + lateness)?
> >>> >>
> >>> >>
> >>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org> wrote:
> >>> >>>
> >>> >>> Hi Vishal,
> >>> >>>
> >>> >>> There is no leak in the code you provided (except that the number of
> >>> >>> keys can grow).
> >>> >>> But as you figured out, the state is scoped to key, not to window+key.
> >>> >>>
> >>> >>> Could you explain what you are trying to achieve and why you need to combine
> >>> >>> sliding windows with state scoped to window+key?
> >>> >>>
> >>> >>> Regards,
> >>> >>> Roman
> >>> >>>
> >>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
> >>> >>> <vishal.santo...@gmail.com> wrote:
> >>> >>> >
> >>> >>> > Essentially, does this code leak state?
> >>> >>> >
> >>> >>> > private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
> >>> >>> >         extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
> >>> >>> >     private static final long serialVersionUID = 1L;
> >>> >>> >     private final static ValueStateDescriptor<String> sessionId =
> >>> >>> >             new ValueStateDescriptor<String>("session_uid", String.class);
> >>> >>> >
> >>> >>> >     @Override
> >>> >>> >     public void process(KEY key,
> >>> >>> >             ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
> >>> >>> >             Iterable<KeyedSession<KEY, VALUE>> elements,
> >>> >>> >             Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
> >>> >>> >             throws Exception {
> >>> >>> >         // I need this scoped to key/window
> >>> >>> >         if (getRuntimeContext().getState(sessionId).value() == null) {
> >>> >>> >             UUID uuid = UUID.randomUUID();
> >>> >>> >             getRuntimeContext().getState(sessionId).update(uuid.toString());
> >>> >>> >         }
> >>> >>> >         String uuid = getRuntimeContext().getState(sessionId).value();
> >>> >>> >         out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
> >>> >>> >     }
> >>> >>> > }
> >>> >>> >
> >>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> >>> >>> >>
> >>> >>> >> Hello folks,
> >>> >>> >> The suggestion is to use windowState() for a key per window state and clear the state explicitly. Also it seems that getRuntimeContext().getState() will return a globalWindow() where state is shared among windows with the same key. I desire of course to have state scoped to a key per window and was wanting to use windowState(). The caveat is that my window is a Session Window, and when I try to use clear() I am thrown this exception (Session Windows are Merging Windows):
> >>> >>> >>
> >>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
> >>> >>> >>
> >>> >>> >>
> >>> >>> >> The questions are
> >>> >>> >>
> >>> >>> >> * How do I have state per session window / per key and still be able to clear it?
> >>> >>> >> * Does getRuntimeContext().getState() give me the clear() semantics for free along with state per window per key, and thus I have understood getRuntimeContext().getState() wrong?
> >>> >>> >>
> >>> >>> >> Regards.
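
For readers skimming the thread, the behaviour confirmed here can be sketched in plain Java: state obtained via getRuntimeContext() is scoped to the key, so two sessions (A and B) of the same key see the same value until a TTL expires it. This is only a simulation under stated assumptions, not Flink code. The class KeyedStateSketch, the method sessionIdFor, the HashMap store and the explicit `now` clock are all hypothetical names made up for illustration, and the expiry rule merely approximates a TTL that is refreshed on write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Plain-Java stand-in for key-scoped state; hypothetical, NOT the Flink API. */
final class KeyedStateSketch {

    /** One stored value plus the (simulated) time it was last written. */
    private static final class Entry {
        final String value;
        final long writtenAt;

        Entry(String value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    // State is stored per key only; the window is not part of the lookup.
    private final Map<String, Entry> stateByKey = new HashMap<>();
    private final long ttlMillis;

    KeyedStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Mirrors the shape of process() in the thread: create a UUID if the key
     * has no live state, otherwise reuse it. The window argument is ignored
     * on purpose, which is exactly why sessions A and B share the value.
     */
    String sessionIdFor(String key, String window, long now) {
        Entry entry = stateByKey.get(key);
        boolean expired = entry != null && now - entry.writtenAt >= ttlMillis;
        if (entry == null || expired) {
            entry = new Entry(UUID.randomUUID().toString(), now);
            stateByKey.put(key, entry);
        }
        return entry.value;
    }

    public static void main(String[] args) {
        KeyedStateSketch state = new KeyedStateSketch(1_000L);
        String a = state.sessionIdFor("user-1", "A", 0L);
        String b = state.sessionIdFor("user-1", "B", 10L);
        String afterTtl = state.sessionIdFor("user-1", "B", 5_000L);
        System.out.println("A and B share the ID: " + a.equals(b));
        System.out.println("ID survives the TTL: " + a.equals(afterTtl));
    }
}
```

In a real job the store and the expiry are handled by Flink itself; the TTL would be configured on the state descriptor as described in the documentation linked as [1] in the thread.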