> Want to confirm that the keys are GCed (along with state) once the
> (windows close + lateness)?

Window state is cleared (as well as the window itself), but global state is
not (unless you use TTL [1]).
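To make the distinction concrete, here is a toy model in plain Java. It is
not Flink API and every class and method name in it is hypothetical. It only
mimics the behaviour described above: the per-window slot disappears when the
window expires, while the key-scoped value stays until a TTL (if any) has
elapsed since the last write. In Flink itself the TTL is configured on the
state descriptor, as described in the state TTL documentation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy model only: NOT Flink API. All names here are hypothetical.
final class StateScopeModel {

    // Namespace for per-window state: one slot per (key, window end).
    private static final class WindowSlot {
        private final String key;
        private final long windowEnd;

        WindowSlot(String key, long windowEnd) {
            this.key = key;
            this.windowEnd = windowEnd;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof WindowSlot)) {
                return false;
            }
            WindowSlot other = (WindowSlot) o;
            return windowEnd == other.windowEnd && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowEnd);
        }
    }

    // Key-scoped value plus the time of its last write, used for the TTL check.
    private static final class TimedValue {
        private final String value;
        private final long lastWriteMillis;

        TimedValue(String value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final Map<WindowSlot, String> windowState = new HashMap<>();
    private final Map<String, TimedValue> keyState = new HashMap<>();
    private final long ttlMillis; // values <= 0 mean "no TTL"

    StateScopeModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void putWindowState(String key, long windowEnd, String value) {
        windowState.put(new WindowSlot(key, windowEnd), value);
    }

    String getWindowState(String key, long windowEnd) {
        return windowState.get(new WindowSlot(key, windowEnd));
    }

    void putKeyState(String key, String value, long nowMillis) {
        keyState.put(key, new TimedValue(value, nowMillis));
    }

    // Models "window end + allowed lateness has passed": only the
    // window-scoped slot is dropped; key-scoped state is left untouched.
    void expireWindow(String key, long windowEnd) {
        windowState.remove(new WindowSlot(key, windowEnd));
    }

    // Returns the key-scoped value, or null once the TTL has elapsed since
    // the last write (the expired entry is removed lazily on read).
    String getKeyState(String key, long nowMillis) {
        TimedValue timed = keyState.get(key);
        if (timed == null) {
            return null;
        }
        if (ttlMillis > 0 && nowMillis - timed.lastWriteMillis >= ttlMillis) {
            keyState.remove(key);
            return null;
        }
        return timed.value;
    }

    // Number of key-scoped entries currently held.
    int keyStateSize() {
        return keyState.size();
    }
}
```

With new StateScopeModel(0), a key-scoped entry survives expireWindow(...)
indefinitely, which is the "number of keys can grow" case. With a positive
TTL it is dropped once that much time has passed since the last write.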
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman

On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> Sometimes writing it down makes you think. I now realize that this is not the
> right approach, given that merging windows will have their own states... and
> how the merge happens is really at the key level...
>
> On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>>
>> I intend to augment every event in a session with a unique ID. To keep the
>> session lean, there is a PurgingTrigger on this aggregate that fires on a
>> count of 1.
>>
>> (except that the number of keys can grow).
>>
>> Want to confirm that the keys are GCed (along with state) once the
>> (windows close + lateness)?
>>
>> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> Hi Vishal,
>>>
>>> There is no leak in the code you provided (except that the number of
>>> keys can grow).
>>> But as you figured out the state is scoped to key, not to window+key.
>>>
>>> Could you explain what you are trying to achieve and why do you need to
>>> combine sliding windows with state scoped to window+key?
>>>
>>> Regards,
>>> Roman
>>>
>>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>> <vishal.santo...@gmail.com> wrote:
>>> >
>>> > Essentially, does this code leak state?
>>> >
>>> > private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
>>> >         extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
>>> >     private static final long serialVersionUID = 1L;
>>> >     private final static ValueStateDescriptor<String> sessionId =
>>> >             new ValueStateDescriptor<String>("session_uid", String.class);
>>> >
>>> >     @Override
>>> >     public void process(KEY key,
>>> >             ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>>> >             Iterable<KeyedSession<KEY, VALUE>> elements,
>>> >             Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
>>> >         // I need this scoped to key/window
>>> >         if (getRuntimeContext().getState(sessionId).value() == null) {
>>> >             UUID uuid = UUID.randomUUID();
>>> >             getRuntimeContext().getState(sessionId).update(uuid.toString());
>>> >         }
>>> >         String uuid = getRuntimeContext().getState(sessionId).value();
>>> >         out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>>> >     }
>>> > }
>>> >
>>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi
>>> > <vishal.santo...@gmail.com> wrote:
>>> >>
>>> >> Hello folks,
>>> >> The suggestion is to use windowState() for a key
>>> >> per window state and clear the state explicitly. Also it seems that
>>> >> getRuntime().getState() will return a globalWindow() where state is
>>> >> shared among windows with the same key. I desire of course to have state
>>> >> scoped to a key per window and was wanting to use windowState()..
>>> >> The caveat is that my window is a Session Window and when I try to use
>>> >> clear() I am thrown this exception (Session Windows are Merging
>>> >> Windows):
>>> >>
>>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
>>> >>
>>> >> The questions are:
>>> >>
>>> >> * How do I have state per session window / per key and still be able to
>>> >>   clear it?
>>> >> * Does getRuntime().getState() give me the clear() semantics for free
>>> >>   along with state per window per key, and thus I have understood
>>> >>   getRuntime().getState() wrong?
>>> >>
>>> >> Regards.