Hi,
I am using the broadcast pattern for publishing rules and aggregating the
data (https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html).
My use case is very similar to that demo, and so is the code.
One thing I want to do is capture late events, if any, and send them to a
sink. But when there are events already sitting on the Kafka topic that
weren't consumed, and I start the app a couple of hours later, the output
timestamps look wrong:
timestamp: 2021-12-02T04:48:20.324+0000, watermark:
292269055-12-02T16:47:04.192+0000, timeService.watermark:
292269055-12-02T16:47:04.192+0000
I have the watermark strategy set on the Kafka source as:
WatermarkStrategy<CDRRecord> wmStrategy = WatermarkStrategy
    .<CDRRecord>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(OUT_OF_ORDERNESS)))
    .withTimestampAssigner((cdrRecord, timestamp) -> cdrRecord.getEventTime());

return env.addSource(recordSource.assignTimestampsAndWatermarks(wmStrategy))
    .name("records Source")
    .setParallelism(config.get(SOURCE_PARALLELISM));
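For reference, the late-event capture I have in mind is the standard side-output pattern on a windowed aggregation. This is only a sketch of the intent, not my actual code; the key selector, window size, aggregate function, and sink names are placeholders:

```java
// Sketch: routing late CDRRecords to a side output (placeholder names).
final OutputTag<CDRRecord> lateTag = new OutputTag<CDRRecord>("late-records") {};

SingleOutputStreamOperator<AggResult> aggregated = records
    .keyBy(CDRRecord::getKey)                              // hypothetical key selector
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))  // hypothetical window size
    .sideOutputLateData(lateTag)                           // late events go to the tag
    .aggregate(new MyAggregateFunction());                 // hypothetical aggregate

// Send whatever arrived after the watermark passed the window end to a sink.
aggregated.getSideOutput(lateTag)
    .addSink(lateEventsSink);                              // hypothetical sink
```

The problem is that with the watermark jumping to 292269055-12-02 (effectively end-of-time), every buffered event is immediately considered late.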
Please let me know if you need any more information.
Thanks,
Sweta