Hi All,
           I am trying to access elements stored in the state of the
window. As window, itself is a stateful operator I think I should be able
to get records in the process function after the is triggered. Can someone
tell me why in the following code is the state of the window null?

Below is a mocked piece of code we are using. Am I doing something wrong
here?

env.enableCheckpointing(20000L);
env.setStateBackend(new FsStateBackend("path"));
DataStream<S> stream1 =env.addSource(new FlinkKafkaConsumer);
DataStream<S> stream2 =env.addSource(new FlinkKafkaConsumer);

stream1.union(stream2)
.keyBy()
.timeWindow(Time.milliseconds((30L))
.allowedLateness(Time.minutes(1))
.process(new ProcessWindowFunction<T>()
{
public void process(T t, ProcessWindowFunction<T>.Context ctx, Iterable<R>
itr, Collector<R>collector)
KeyedStateStore globalState = ctx.globalState();
ValueState<Tuple6<Long, String, String, String, String, String>> valueState
= ctx.globalState().getState(new ValueStateDescriptor<>("valueState",
TypeInformation.of(new TypeHint<T>() {})));

System.out.println(valueState.value());
collector.collect(T)
})



Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163

Reply via email to