Hello,

At work we are using Flink to store timers and notify us when they are
triggered. It has worked well across several versions over the years:
Flink 1.5 -> Flink 1.9 -> Flink 1.15.2.

A few months ago we upgraded from Flink 1.9 to Flink 1.15.2. In the
process we had to upgrade all the Flink API code in our job to use the
new APIs.

Our job code has a Kafka Source and a Kafka Sink. For our Source, we
are currently using `WatermarkStrategy.noWatermarks()`. The job has been
running fine ever since we upgraded, but in the last few weeks we have
faced two outages.

Configuration:

2 JobManager nodes
5 TaskManager nodes (4 slots each)
Parallelism: 16
Source topic: 30 partitions
Using `setStartingOffsets(OffsetsInitializer.latest())` while
initializing the source.
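
For context on how these numbers fit together, here is a rough sketch
in plain Java (not Flink code) of how 30 partitions spread over 16
source subtasks. It assumes a simple modulo assignment of partition to
subtask; `PartitionSpread` and `partitionsPerSubtask` are hypothetical
names made up for illustration, and the actual assignment inside the
Kafka source may start from a different subtask index. The per-subtask
counts are the same either way.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: counts how many partitions each
// source subtask would read under a plain modulo assignment (an assumption).
class PartitionSpread {

    // Returns an array where element i is the number of partitions
    // that land on subtask i.
    static int[] partitionsPerSubtask(int partitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            counts[p % parallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 30 partitions, parallelism 16.
        int[] counts = partitionsPerSubtask(30, 16);
        // prints [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1]
        System.out.println(Arrays.toString(counts));
    }
}
```

Under that assumption, 14 subtasks read 2 partitions each and 2
subtasks read 1. If each slot runs one source subtask, a TaskManager
with 4 slots would own at most 8 partitions.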

Outage #1

Our monitoring system alerted us that lag was building up on one
partition (out of 30). We did not know of any way to jumpstart
consumption on that partition other than forcing a reassignment. After
we restarted the TaskManager service on the node to which the
partition was assigned, the lag dropped soon after.

Outage #2

Something similar happened again, but this time lag was building up
on 9 (out of 30) partitions. This time we restarted the TaskManager
services on all the nodes, and the job started consuming again.

We asked a question on SO,
https://stackoverflow.com/q/74272277/2165719 and were directed to ask
on the mailing list as well.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

In another post, https://stackoverflow.com/a/70101290/2165719 there is
a suggestion to use `WatermarkStrategy.withIdleness(...)`. Could this
help us?

Any help/guidance here would be much appreciated.

Thanks,
