Hey guys,

I have a KeyedProcessFunction that gathers statistics on the events that
flow through it and emits them periodically (every few seconds) to a side
output. However, at the end of the stream the last set of statistics is
not emitted.
I read on the mailing list that pending processing-time timers don't get
triggered when Flink cleans up a stream, but that event-time timers do get
triggered because a watermark with Long.MAX_VALUE is sent through the
stream.
Hence, I thought that I could register a "backup" event-time timer for
Long.MAX_VALUE - 1 to make sure that my process function gets notified when
the stream ends, so that it can emit the in-flight statistics.
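
To make the idea concrete, here is a minimal plain-Java model of the
mechanism as I understand it. It is not Flink code: TimerQueue,
registerEventTimeTimer and advanceWatermark are hypothetical stand-ins, and
the only assumption is that a timer fires once the watermark reaches its
timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class EndOfStreamTimerModel {

    /** Hypothetical stand-in for an event-time timer service; not Flink's API. */
    static final class TimerQueue {
        private final PriorityQueue<Long> timers = new PriorityQueue<>();
        private final List<Long> fired = new ArrayList<>();

        /** Registers a timer; duplicate timestamps are ignored. */
        void registerEventTimeTimer(long timestamp) {
            if (!timers.contains(timestamp)) {
                timers.add(timestamp);
            }
        }

        /** Fires every pending timer whose timestamp is <= the watermark. */
        void advanceWatermark(long watermark) {
            while (!timers.isEmpty() && timers.peek() <= watermark) {
                fired.add(timers.poll());
            }
        }

        List<Long> fired() {
            return fired;
        }
    }

    /** Registers the "backup" timer, then replays a normal and a final watermark. */
    static List<Long> run() {
        TimerQueue queue = new TimerQueue();
        queue.registerEventTimeTimer(Long.MAX_VALUE - 1); // the backup timer
        queue.advanceWatermark(1_000L);                   // ordinary watermark: nothing fires
        queue.advanceWatermark(Long.MAX_VALUE);           // assumed end-of-stream watermark
        return queue.fired();
    }

    public static void main(String[] args) {
        System.out.println("Fired timers: " + run());
    }
}
```

In this model the backup timer stays pending through ordinary watermarks
and fires only when the final Long.MAX_VALUE watermark arrives, which is the
behaviour I was hoping to get from Flink.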

However, now my simple test case (with a fromCollection source of 4
elements) keeps iterating over the same 4 elements in an infinite loop.

I don't know how to make sense of this and would appreciate your help.
Is there a better way to set a timer that gets triggered at the end of
stream?
And for my education: why does registering an event-time timer cause an
infinite loop over the source elements?

Thanks a lot and have a wonderful weekend,
Matthias
