I had a query Say I have a single key with 2 live sessions ( A and B )
with a configured lateness .

Do these invariants hold?

* The state is scoped to the key (created per key in the
ProcessWindowFunction with a ttl )
* The state will remain alive irrespective of whether the Window is closed
or not (a TTL timer does the collection )
*  The execution on a key is sequential , as in if 2 events arrive for the
2 Sessions they happen sequentially ( or in any order but without the need
of synchronization )
* The state mutated by an event in Session A, will be visible to Session B
if an event incident on Session B was to happen subsequently.  There is no
need of synchronizing access to the state as it for the same key.

What I am not sure about is what happens when session A merge with session
B. I would assume that it just is defining new start and end of the merged
window, Gcing the old ones ( or at least one of them ) and assigning that
even to that new window. What one does with the custom state in
ProcessWindowFunction ( there is a CountTrigger of 1 ) ,  really what is
done in the process method above,  As in this state is 1 degree removed
from what ever flink does internally with it's merges given that the state
is scoped to the key.







On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Yep, makes sense.
>
> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org>
> wrote:
>
>> > Want to confirm that the keys are GCed ( along with state ) once the
>> (windows close + lateness ) ?
>> Window state is cleared (as well as the window itself), but global
>> state is not (unless you use TTL).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>> <vishal.santo...@gmail.com> wrote:
>> >
>> > Sometimes writing it down makes you think. I now realize that this is
>> not the right approach, given that merging windows will have their own
>> states..and how the merge happens is really at the key level....
>> >
>> >
>> >
>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>
>> >> I intend to augment every event in a session  with a unique ID.  To
>> keep the session lean, there is a PurgingTrigger on this aggregate that
>> fires on a count of 1.
>> >>
>> >> >> (except that the number of keys can grow).
>> >>
>> >> Want to confirm that the keys are GCed ( along with state ) once the
>> (windows close + lateness ) ?
>> >>
>> >>
>> >>
>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org>
>> wrote:
>> >>>
>> >>> Hi Vishal,
>> >>>
>> >>> There is no leak in the code you provided (except that the number of
>> >>> keys can grow).
>> >>> But as you figured out the state is scoped to key, not to window+key.
>> >>>
>> >>> Could you explain what you are trying to achieve and why do you need
>> to combine
>> >>> sliding windows with state scoped to window+key?
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>> >>> <vishal.santo...@gmail.com> wrote:
>> >>> >
>> >>> > Essentially, Does this code leak state
>> >>> >
>> >>> > private static class SessionIdProcessWindowFunction<KEY extends
>> java.io.Serializable, VALUE extends java.io.Serializable>
>> >>> > extends
>> >>> > ProcessWindowFunction<KeyedSession<KEY, VALUE>,
>> KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
>> >>> > private static final long serialVersionUID = 1L;
>> >>> > private final static ValueStateDescriptor<String> sessionId = new
>> ValueStateDescriptor<String>("session_uid",
>> >>> > String.class);
>> >>> >
>> >>> > @Override
>> >>> > public void process(KEY key,
>> >>> > ProcessWindowFunction<KeyedSession<KEY, VALUE>,
>> KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>> >>> > Iterable<KeyedSession<KEY, VALUE>> elements,
>> Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
>> >>> > throws Exception {
>> >>> > // I need this scoped to key/window
>> >>> > if (getRuntimeContext().getState(sessionId).value() == null) {
>> >>> > UUID uuid = UUID.randomUUID();
>> >>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
>> >>> > }
>> >>> > String uuid = getRuntimeContext().getState(sessionId).value();
>> >>> > out.collect(new
>> KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>> >>> > }
>> >>> > }
>> >>> >
>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>> >>
>> >>> >> Hello folks,
>> >>> >>                   The suggestion is to use windowState() for a key
>> key per window state and clear the state explicitly.  Also it seems that
>> getRuntime().getState() will return a globalWindow() where state is shared
>> among windows with the same key. I desire of course to have state scoped to
>> a key per window and was wanting to use windowState().. The caveat is that
>> my window is a Session Window and when I try to use clear()  I am thrown
>> this exception  ( Session Windows are Merging Windows )
>> >>> >>
>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window
>> state is not allowed when using merging windows.
>> >>> >>
>> >>> >>
>> >>> >> The questions are
>> >>> >>
>> >>> >> * How do I have state per session window/ per key and still be
>> able to clear it ?
>> >>> >> * Does getRuntime().getState() give me the clear() semantics for
>> free along with state per window per key and thus I  have understood
>> getRuntime().getState() wrong ?
>> >>> >>
>> >>> >> Regards.
>> >>> >>
>> >>> >>
>> >>> >>
>>
>

Reply via email to