Hello,
I am stuck on a weird problem and cannot wrap my head around it.
Here is my pipeline:
SingleOutputStreamOperator<Data> data =
    flattenedPlayerStatsData
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyReducer())
        .name("reduce");

DataStream<Data> lateData = data.getSideOutput(lateOutputTag);

// re-process late data
SingleOutputStreamOperator<Data> reducedLateData =
    lateData
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(600))
                .withTimestampAssigner(
                    (event, timestamp) -> event != null ? event.timestamp : timestamp))
        .startNewChain()
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .reduce(new MyReducer())
        .name("lateReduce");

// add sink
reducedLateData
    .keyBy(new KeySelector())
    .addSink(new MySink<>())
    .name("lateSink");
What I observe is that the *lateReduce* step receives the incoming records
but never emits the reduced records to the *lateSink* step.
It seems to accumulate the late records forever.
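
To check whether the watermark in *lateReduce* ever passes the end of a 300s window, I was thinking of wiring a small pass-through debug step in front of it, something like this (WatermarkLogger is just an illustrative name, not something already in my job):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Pass-through operator that logs each element's timestamp together
    // with the current watermark, so I can see whether the watermark
    // actually advances past the window end.
    public static class WatermarkLogger extends ProcessFunction<Data, Data> {
        @Override
        public void processElement(Data value, Context ctx, Collector<Data> out) {
            System.out.println(
                "ts=" + ctx.timestamp() + " wm=" + ctx.timerService().currentWatermark());
            out.collect(value);
        }
    }

    // e.g. inserted after the watermark assignment:
    // lateData.assignTimestampsAndWatermarks(...).process(new WatermarkLogger()).keyBy(...)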
Is there any issue with the *Timestamps/Watermarks* step?
I can see that this step also receives the records and emits the same
number of records to the *lateReduce* step.
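
One guess from my side: since late records only arrive occasionally, maybe the watermark in this step advances only when a new late record shows up, so the last open windows never close. Would something like withIdleness be relevant here? This is the kind of thing I mean (the one-minute idle timeout is just a guess):

    // Assumption on my side: declare the late stream idle after a while so
    // that it does not hold back the downstream watermark when no late
    // records are arriving.
    WatermarkStrategy<Data> lateStrategy =
        WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(600))
            .withIdleness(Duration.ofMinutes(1)) // guessed value
            .withTimestampAssigner((event, timestamp) -> event.timestamp);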
Please let me know what I may be doing wrong.
If I don't assign fresh timestamps and watermarks to the late stream
(*lateData*), then I notice that the *lateReduce* step simply drops the
late records that were previously marked late by the *reduce* step.
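
For completeness, the alternative I considered was to keep a single window and give it an allowed lateness instead of re-windowing the side output, roughly like this (the 600s lateness is a guess, chosen to match the bounded-out-of-orderness duration above):

    // Alternative sketch: one window with allowed lateness, so late records
    // arriving within 600s update the existing window result instead of
    // going through a second windowed pipeline.
    SingleOutputStreamOperator<Data> dataWithLateness =
        flattenedPlayerStatsData
            .keyBy(new KeySelector())
            .window(TumblingEventTimeWindows.of(Time.seconds(300)))
            .allowedLateness(Time.seconds(600))
            .sideOutputLateData(lateOutputTag)
            .reduce(new MyReducer())
            .name("reduce");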
Thanks
Sachin