Hello,

At work we use Flink to store timers and notify us when they are triggered. It has worked well across several versions over the years: Flink 1.5 -> Flink 1.9 -> Flink 1.15.2.
A few months ago we upgraded from Flink 1.9 to Flink 1.15.2. In the process we had to update all the Flink API code in our job to use the new APIs. Our job has a Kafka Source and a Kafka Sink. For the Source, we currently use `WatermarkStrategy.noWatermarks()`. The job has been running fine since the upgrade, but in the last few weeks we have had two outages.

Configuration:
- 2 JobManager nodes
- 5 TaskManager nodes (4 slots each)
- Parallelism: 16
- Source topic: 30 partitions
- Using `setStartingOffsets(OffsetsInitializer.latest())` while initializing the source

Outage #1
Our monitoring system alerted us that lag was building up on one partition (out of 30). We did not know of anything we could do to jump-start consumption on that partition other than forcing a reassignment. After we restarted the TaskManager service on the node to which the partition was assigned, the lag dropped soon after.

Outage #2
Something similar happened again, but this time lag was building up on 9 (out of 30) partitions. Once again we restarted the TaskManager services, this time on all the nodes, and the job started consuming again.

We asked a question on SO, https://stackoverflow.com/q/74272277/2165719, and were directed to ask on the mailing list as well.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

In another post, https://stackoverflow.com/a/70101290/2165719, there is a suggestion to use `WatermarkStrategy.withIdleness(...)`. Could this help us?

Any help or guidance here would be much appreciated.

Thanks,