Thank you for the confirmation. The simulations confirm this too.
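
For anyone finding this thread later, here is a minimal, Flink-free sketch of the behavior confirmed below. It is not the simulation mentioned above and it does not call any Flink API; the class KeyedStateModel and its methods are hypothetical names, and the TTL is modelled as refreshing on write only (an assumption). It shows that state looked up by key alone is shared by every session of that key until the TTL lapses:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical, Flink-free model of key-scoped state with a TTL. */
final class KeyedStateModel {

    /** A stored value plus the time of its last write. */
    private static final class Entry {
        final String value;
        final long lastWriteMs;

        Entry(String value, long lastWriteMs) {
            this.value = value;
            this.lastWriteMs = lastWriteMs;
        }
    }

    private final Map<String, Entry> stateByKey = new HashMap<>();
    private final long ttlMs;

    KeyedStateModel(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns the value for the key, or null if it is absent or expired at nowMs. */
    String value(String key, long nowMs) {
        Entry entry = stateByKey.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMs - entry.lastWriteMs >= ttlMs) {
            stateByKey.remove(key); // expired: drop it, as a TTL cleanup would
            return null;
        }
        return entry.value;
    }

    /** Writes the value for the key and restarts its TTL. */
    void update(String key, String value, long nowMs) {
        stateByKey.put(key, new Entry(value, nowMs));
    }

    /**
     * Mirrors the process() logic from the thread: reuse the ID stored for the
     * key, or create one. The window argument is deliberately unused, because
     * the lookup is by key only.
     */
    String sessionIdFor(String key, String window, long nowMs) {
        String id = value(key, nowMs);
        if (id == null) {
            id = UUID.randomUUID().toString();
            update(key, id, nowMs);
        }
        return id;
    }
}
```

With a TTL of 1000 ms, sessionIdFor("k", "A", 0) followed by sessionIdFor("k", "B", 500) should return the same ID, since the window name plays no part in the lookup, while a call at 1500 ms should produce a new ID because the earlier entry has expired.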

On Fri, Apr 9, 2021 at 11:14 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Vishal,
>
> Sorry for the late reply,
> Please find my answers below.
> By state, I assume you mean the state obtained via getRuntimeContext (access
> to window state is not allowed).
>
> > The state is scoped to the key (created per key in the
> > ProcessWindowFunction with a TTL)
> Yes.
>
> > The state will remain alive irrespective of whether the Window is closed
> > or not (a TTL timer does the collection)
> Right, but you need to configure TTL when accessing the state [1]
>
> > The execution on a key is sequential, as in if 2 events arrive for the
> > 2 Sessions they are processed sequentially (in any order, but without the
> > need for synchronization)
> Right.
>
> > The state mutated by an event in Session A will be visible to Session B
> > if an event for Session B arrives subsequently. There is no need to
> > synchronize access to the state, as it is for the same key.
> Right.
>
> Your understanding of merging of window contents is also correct.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Regards,
> Roman
>
>
> On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
> <vishal.santo...@gmail.com> wrote:
> >
> > I had a query. Say I have a single key with 2 live sessions (A and B)
> > with a configured lateness.
> >
> > Do these invariants hold?
> >
> > * The state is scoped to the key (created per key in the
> >   ProcessWindowFunction with a TTL)
> > * The state will remain alive irrespective of whether the Window is
> >   closed or not (a TTL timer does the collection)
> > * The execution on a key is sequential, as in if 2 events arrive for
> >   the 2 Sessions they are processed sequentially (in any order, but
> >   without the need for synchronization)
> > * The state mutated by an event in Session A will be visible to Session
> >   B if an event for Session B arrives subsequently. There is no need to
> >   synchronize access to the state, as it is for the same key.
> >
> > What I am not sure about is what happens when session A merges with
> > session B. I would assume that Flink just defines a new start and end for
> > the merged window, GCs the old windows (or at least one of them), and
> > assigns the event to the new window. What happens to the custom state in
> > the ProcessWindowFunction (there is a CountTrigger of 1) is really whatever
> > is done in the process method above. That is, this state is one degree
> > removed from whatever Flink does internally with its merges, given that the
> > state is scoped to the key.
> >
> > On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> >>
> >> Yep, makes sense.
> >>
> >> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>>
> >>> > Want to confirm that the keys are GCed (along with state) once
> >>> > (window close + lateness) has passed?
> >>> Window state is cleared (as well as the window itself), but global
> >>> state is not (unless you use TTL).
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
> >>> <vishal.santo...@gmail.com> wrote:
> >>> >
> >>> > Sometimes writing it down makes you think. I now realize that this
> >>> > is not the right approach, given that merging windows will have their
> >>> > own states, and how the merge happens is really at the key level.
> >>> >
> >>> >
> >>> >
> >>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> >>> >>
> >>> >> I intend to augment every event in a session with a unique ID. To
> >>> >> keep the session lean, there is a PurgingTrigger on this aggregate
> >>> >> that fires on a count of 1.
> >>> >>
> >>> >> >> (except that the number of keys can grow).
> >>> >>
> >>> >> Want to confirm that the keys are GCed (along with state) once
> >>> >> (window close + lateness) has passed?
> >>> >>
> >>> >>
> >>> >>
> >>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>> >>>
> >>> >>> Hi Vishal,
> >>> >>>
> >>> >>> There is no leak in the code you provided (except that the number
> >>> >>> of keys can grow).
> >>> >>> But as you figured out, the state is scoped to key, not to
> >>> >>> window+key.
> >>> >>>
> >>> >>> Could you explain what you are trying to achieve and why you
> >>> >>> need to combine sliding windows with state scoped to window+key?
> >>> >>>
> >>> >>> Regards,
> >>> >>> Roman
> >>> >>>
> >>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
> >>> >>> <vishal.santo...@gmail.com> wrote:
> >>> >>> >
> >>> >>> > Essentially, does this code leak state?
> >>> >>> >
> >>> >>> > private static class SessionIdProcessWindowFunction<
> >>> >>> >         KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
> >>> >>> >         extends ProcessWindowFunction<
> >>> >>> >                 KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
> >>> >>> >     private static final long serialVersionUID = 1L;
> >>> >>> >     private static final ValueStateDescriptor<String> sessionId =
> >>> >>> >             new ValueStateDescriptor<String>("session_uid", String.class);
> >>> >>> >
> >>> >>> >     @Override
> >>> >>> >     public void process(
> >>> >>> >             KEY key,
> >>> >>> >             ProcessWindowFunction<KeyedSession<KEY, VALUE>,
> >>> >>> >                     KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
> >>> >>> >             Iterable<KeyedSession<KEY, VALUE>> elements,
> >>> >>> >             Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
> >>> >>> >             throws Exception {
> >>> >>> >         // I need this scoped to key/window
> >>> >>> >         if (getRuntimeContext().getState(sessionId).value() == null) {
> >>> >>> >             UUID uuid = UUID.randomUUID();
> >>> >>> >             getRuntimeContext().getState(sessionId).update(uuid.toString());
> >>> >>> >         }
> >>> >>> >         String uuid = getRuntimeContext().getState(sessionId).value();
> >>> >>> >         out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
> >>> >>> >     }
> >>> >>> > }
> >>> >>> >
> >>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> >>> >>> >>
> >>> >>> >> Hello folks,
> >>> >>> >> The suggestion is to use windowState() for per-key, per-window
> >>> >>> >> state and to clear the state explicitly. Also, it seems that
> >>> >>> >> getRuntimeContext().getState() returns global state that is
> >>> >>> >> shared among windows with the same key. I would of course like
> >>> >>> >> state scoped to a key per window and wanted to use windowState().
> >>> >>> >> The caveat is that my window is a Session Window, and when I try
> >>> >>> >> to use clear() I get this exception (Session Windows are Merging
> >>> >>> >> Windows):
> >>> >>> >>
> >>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
> >>> >>> >>
> >>> >>> >>
> >>> >>> >> The questions are
> >>> >>> >>
> >>> >>> >> * How do I have state per session window per key and still be
> >>> >>> >>   able to clear it?
> >>> >>> >> * Does getRuntimeContext().getState() give me the clear()
> >>> >>> >>   semantics for free, along with state per window per key, and
> >>> >>> >>   have I thus understood getRuntimeContext().getState() wrong?
> >>> >>> >>
> >>> >>> >> Regards.
> >>> >>> >>
> >>> >>> >>
> >>> >>> >>
>
