Re: Long checkpoint duration for Kafka source operators
For the poor soul that stumbles upon this in the future, just increase your JM resources. I thought for sure this must have been the TM experiencing some sort of backpressure. I tried everything from enabling universal compaction to unaligned checkpoints to profiling the TM. It wasn't until I enabled AWS debug logs that I noticed the JM will make a lot of DELETE requests to AWS after a successful checkpoint. If the checkpoint interval is short and the JM resources limited, then I believe the checkpoint barrier will be delayed causing long start delays. The JM is too busy making AWS requests to inject the barrier. After I increased the JM resources, the long start delays disappeared. On Thu, May 13, 2021 at 1:56 PM Hubert Chen wrote: > Hello, > > I have an application that reads from two Kafka sources, joins them, and > produces to a Kafka sink. The application is experiencing long end to end > checkpoint durations for the Kafka source operators. I'm hoping I could get > some direction in how to debug this further. > > Here is a UI screenshot of a checkpoint instance: > > [image: checkpoint.png] > > My goal is to bring the total checkpoint duration to sub-minute. > > Here are some observations I made: > >- Each source operator task has an E2E checkpoint duration of 1m 7s >- Each source operator task has sub 100ms sync, async, aligned >buffered, and start delay >- Each join operator task has a start delay of 1m 7s >- There is no backpressure in any operator > > These observations are leading me to believe that the source operator is > taking a long amount of time to checkpoint. I find this a bit strange as > the fushioned operator is fairly light. It deserializes the event, assigns > a watermark, and might perform two filters. In addition, it's odd that both > source operators have tasks with all the same E2E checkpoint duration. > > Is there some sort of locking that's occurring on the source operators > that can explain these long E2E durations? > > Best, > Hubert >
Long checkpoint duration for Kafka source operators
Hello, I have an application that reads from two Kafka sources, joins them, and produces to a Kafka sink. The application is experiencing long end to end checkpoint durations for the Kafka source operators. I'm hoping I could get some direction in how to debug this further. Here is a UI screenshot of a checkpoint instance: [image: checkpoint.png] My goal is to bring the total checkpoint duration to sub-minute. Here are some observations I made: - Each source operator task has an E2E checkpoint duration of 1m 7s - Each source operator task has sub 100ms sync, async, aligned buffered, and start delay - Each join operator task has a start delay of 1m 7s - There is no backpressure in any operator These observations are leading me to believe that the source operator is taking a long amount of time to checkpoint. I find this a bit strange as the fushioned operator is fairly light. It deserializes the event, assigns a watermark, and might perform two filters. In addition, it's odd that both source operators have tasks with all the same E2E checkpoint duration. Is there some sort of locking that's occurring on the source operators that can explain these long E2E durations? Best, Hubert
Failures due to inevitable high backpressure
Hello, My Flink application has entered into a bad state and I was wondering if I could get some advice on how to resolve the issue. The sequence of events that led to a bad state: 1. A failure occurs (e.g., TM timeout) within the cluster 2. The application successfully recovers from the last completed checkpoint 3. The application consumes events from Kafka as quickly as it can. This introduces high backpressure. 4. A checkpoint is triggered 5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka transaction timeout) and the application loops back to step #2. This creates a vicious cycle where no progress is made. I believe the underlying issue is the application experiencing high backpressure. This can cause the TM to not respond to heartbeats or cause long checkpoint durations due to delayed processing of the checkpoint. I'm confused on the best next steps to take. How do I ensure that heartbeats and checkpoints successfully complete when experiencing inevitable high packpressure? Best, Hubert