Re: Long checkpoint duration for Kafka source operators

2021-05-20 Thread Hubert Chen
For the poor soul who stumbles upon this in the future: just increase your
JM resources.

I thought for sure this must have been the TM experiencing some sort of
backpressure. I tried everything from enabling universal compaction to
unaligned checkpoints to profiling the TM. It wasn't until I enabled AWS
debug logs that I noticed that the JM makes a lot of DELETE requests to AWS
after each successful checkpoint. If the checkpoint interval is short and the
JM's resources are limited, then I believe the checkpoint barrier will be
delayed, causing long start delays: the JM is too busy making AWS requests to
inject the barrier. After I increased the JM resources, the long start delays
disappeared.
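
In case it helps anyone reproducing this, here's a rough sketch of the job-side
knobs involved. The values are illustrative, not the ones I actually ran, and
the JM memory itself is a cluster-level setting rather than something you set
in code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) {
        // JM memory is raised on the cluster side, e.g. in flink-conf.yaml:
        //   jobmanager.memory.process.size: 4g   (illustrative value)

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // A longer interval gives the JM time to finish its post-checkpoint
        // cleanup (the AWS DELETE requests) before the next barrier is injected.
        env.enableCheckpointing(120_000); // 2 minutes, illustrative

        // Guarantee some breathing room between checkpoints even if one runs long.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000);
    }
}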

On Thu, May 13, 2021 at 1:56 PM Hubert Chen wrote:

> Hello,
>
> I have an application that reads from two Kafka sources, joins them, and
> produces to a Kafka sink. The application is experiencing long end-to-end
> checkpoint durations for the Kafka source operators. I'm hoping to get some
> direction on how to debug this further.
>
> Here is a UI screenshot of a checkpoint instance:
>
> [image: checkpoint.png]
>
> My goal is to bring the total checkpoint duration to sub-minute.
>
> Here are some observations I made:
>
>- Each source operator task has an E2E checkpoint duration of 1m 7s
>- Each source operator task has sub 100ms sync, async, aligned
>buffered, and start delay
>- Each join operator task has a start delay of 1m 7s
>- There is no backpressure in any operator
>
> These observations lead me to believe that the source operator is taking a
> long time to checkpoint. I find this a bit strange, as the fused operator is
> fairly light: it deserializes the event, assigns a watermark, and might
> perform two filters. In addition, it's odd that the tasks of both source
> operators all have the same E2E checkpoint duration.
>
> Is there some sort of locking that's occurring on the source operators
> that can explain these long E2E durations?
>
> Best,
> Hubert
>


Long checkpoint duration for Kafka source operators

2021-05-13 Thread Hubert Chen
Hello,

I have an application that reads from two Kafka sources, joins them, and
produces to a Kafka sink. The application is experiencing long end-to-end
checkpoint durations for the Kafka source operators. I'm hoping to get some
direction on how to debug this further.

Here is a UI screenshot of a checkpoint instance:

[image: checkpoint.png]

My goal is to bring the total checkpoint duration to sub-minute.

Here are some observations I made:

   - Each source operator task has an E2E checkpoint duration of 1m 7s
   - Each source operator task has sub 100ms sync, async, aligned buffered,
   and start delay
   - Each join operator task has a start delay of 1m 7s
   - There is no backpressure in any operator

These observations lead me to believe that the source operator is taking a
long time to checkpoint. I find this a bit strange, as the fused operator is
fairly light: it deserializes the event, assigns a watermark, and might
perform two filters. In addition, it's odd that the tasks of both source
operators all have the same E2E checkpoint duration.
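
For reference, the shape of the job is roughly as follows. The topic names, key
selectors, predicates, and watermark bound are placeholders rather than the real
ones:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JobShapeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "job-shape-sketch");        // placeholder

        DataStream<String> left = source(env, "topic-a", props);
        DataStream<String> right = source(env, "topic-b", props);

        // The join tasks are where the 1m 7s start delay shows up.
        left.join(right)
            .where(e -> e)   // placeholder key selector
            .equalTo(e -> e) // placeholder key selector
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .apply((l, r) -> l + "|" + r)
            // ...followed by a Kafka sink in the real job; printed here instead
            .print();

        env.execute("job-shape-sketch");
    }

    // The fused source chain in question: deserialize, assign a watermark,
    // then apply two cheap filters.
    private static DataStream<String> source(
            StreamExecutionEnvironment env, String topic, Properties props) {
        return env
            .addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props))
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            .withTimestampAssigner((event, ts) -> ts))
            .filter(e -> !e.isEmpty())          // placeholder predicate
            .filter(e -> e.length() < 10_000);  // placeholder predicate
    }
}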

Is there some sort of locking that's occurring on the source operators that
can explain these long E2E durations?

Best,
Hubert


Failures due to inevitable high backpressure

2020-08-26 Thread Hubert Chen
Hello,

My Flink application has entered a bad state, and I was wondering if I could
get some advice on how to resolve the issue.

The sequence of events that led to a bad state:

1. A failure occurs (e.g., TM timeout) within the cluster
2. The application successfully recovers from the last completed checkpoint
3. The application consumes events from Kafka as quickly as it can in order
to catch up. This introduces high backpressure.
4. A checkpoint is triggered
5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka
transaction timeout) and the application loops back to step #2. This
creates a vicious cycle where no progress is made.

I believe the underlying issue is that the application is experiencing high
backpressure. This can cause the TM to stop responding to heartbeats, or lead
to long checkpoint durations because processing of the checkpoint barriers is
delayed.

I'm unsure what the best next steps are. How do I ensure that heartbeats and
checkpoints complete successfully when the application is experiencing this
inevitable high backpressure?
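
For reference, these are the settings I understand to be in play here; the
values below are illustrative placeholders, not something I'm running or
recommending:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackpressureTimeoutsSketch {
    public static void main(String[] args) {
        // TM heartbeats are a cluster-level setting, e.g. in flink-conf.yaml:
        //   heartbeat.timeout: 120000   # ms

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        CheckpointConfig cp = env.getCheckpointConfig();
        cp.setCheckpointTimeout(600_000);          // allow slow checkpoints while catching up
        cp.setMinPauseBetweenCheckpoints(30_000);  // give the job room to make progress
        cp.setTolerableCheckpointFailureNumber(3); // don't restart on a single failed checkpoint

        // Kafka producer side: the transaction timeout has to outlive the longest
        // expected checkpoint (subject to the broker's transaction.max.timeout.ms).
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "900000");
    }
}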

Best,
Hubert