Essentially, does this code leak state?

private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
        extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private static final ValueStateDescriptor<String> sessionId =
            new ValueStateDescriptor<String>("session_uid", String.class);

    @Override
    public void process(
            KEY key,
            ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
            Iterable<KeyedSession<KEY, VALUE>> elements,
            Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
        // I need this scoped to key/window
        if (getRuntimeContext().getState(sessionId).value() == null) {
            UUID uuid = UUID.randomUUID();
            getRuntimeContext().getState(sessionId).update(uuid.toString());
        }
        String uuid = getRuntimeContext().getState(sessionId).value();
        out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
    }
}
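
One stateless alternative I can think of, as a rough sketch only: derive the ID from the key and the window bounds instead of storing a random UUID. Nothing is kept in state, so there is nothing to clear. SessionIds and forWindow are hypothetical names, and I am assuming the key has a stable string form. Inside process() it would be called with key.toString(), context.window().getStart() and context.window().getEnd().

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper, not part of Flink: builds a deterministic session ID
// from the key and the session window's bounds, so no keyed state is needed.
final class SessionIds {
    private SessionIds() {}

    static String forWindow(String key, long windowStart, long windowEnd) {
        // Numeric bounds come first so a ':' inside the key cannot make two
        // different (key, window) pairs produce the same name.
        String name = windowStart + ":" + windowEnd + ":" + key;
        // Name-based UUID: the same input bytes always give the same UUID.
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).toString();
    }
}
```

The caveat is that the ID follows the window bounds: if the window fires before the session has finished merging, a later firing with a different end timestamp would get a different ID.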

On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Hello folks,
>                   The suggestion is to use windowState() for per-key,
> per-window state and to clear the state explicitly. It also seems that
> getRuntimeContext().getState() returns global state (as with globalState()),
> which is shared among windows with the same key. I of course want state
> scoped to a key per window, and wanted to use windowState(). The caveat is
> that my window is a Session Window, and when I try to use clear() I get
> this exception (Session Windows are Merging Windows):
>
> Caused by: java.lang.UnsupportedOperationException: Per-window state is
> not allowed when using merging windows.
>
>
> The questions are
>
> * How do I keep state per *session* window, per key, and still be able to
> clear it?
> * Does getRuntimeContext().getState() give me the clear() semantics for
> free, along with state per window per key, meaning I have misunderstood
> getRuntimeContext().getState()?
>
> Regards.
>
>
>
>
