Hi Paul,

I am not sure whether the task thread is involved in any work during state 
snapshotting for FsStateBackend, but I have had another experience that might 
also explain your problem.
From your description below, the last task is blocked in 
`SingleInputGate.getNextBufferOrEvent`, which means the middle task has no 
outputs, i.e. the middle operator is not processing records.
Backpressure is high between the source and middle tasks, which blocks the 
source task in `requestBufferBuilder`.

Based on the above two points, I guess the middle task is waiting for barriers 
from some source tasks. For the input channels that have already received 
barriers, the middle task would not process the following data buffers but only 
cache them, so it would backpressure the corresponding sources via credit-based 
flow control. For the input channels without barriers, if there are also no 
data buffers, the middle task would not produce any outputs. So one hint is to 
trace why some source tasks emit their barriers with a delay.
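To illustrate the point, here is a minimal plain-Java sketch (not Flink's actual implementation — the class and method names are mine) of why a channel that has already delivered its checkpoint barrier stops contributing output under exactly-once alignment: its subsequent buffers are only cached until every channel's barrier has arrived.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of exactly-once barrier alignment: once a channel has
// delivered its barrier, further buffers from it are cached rather than
// processed, until the barriers from all other channels arrive.
public class BarrierAlignmentSketch {
    static final String BARRIER = "BARRIER";

    static String simulate() {
        Queue<String> channelA = new ArrayDeque<>();
        Queue<String> channelB = new ArrayDeque<>();
        channelA.add("a1"); channelA.add(BARRIER); channelA.add("a2");
        channelB.add("b1"); // channel B's barrier is delayed and never arrives

        Queue<String> cachedA = new ArrayDeque<>();
        boolean aAligned = false;
        int processed = 0;

        // Drain whatever is currently available on both channels.
        while (!channelA.isEmpty() || !channelB.isEmpty()) {
            if (!channelA.isEmpty()) {
                String rec = channelA.poll();
                if (rec.equals(BARRIER)) aAligned = true;
                else if (aAligned) cachedA.add(rec); // blocked: cache only
                else processed++;
            }
            if (!channelB.isEmpty()) {
                String rec = channelB.poll();
                if (!rec.equals(BARRIER)) processed++;
            }
        }
        // Channel B never delivered its barrier, so alignment never completes:
        // "a2" stays cached and the task produces no further output from A.
        return "processed=" + processed + " cached=" + cachedA.size();
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

The cached buffers also keep exclusive credits occupied, which is how the backpressure propagates to the corresponding source under credit-based flow control.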

To double-check the above analysis, you can change the checkpoint mode from 
`exactly-once` to `at-least-once`. If the CPU usage and task TPS no longer drop 
for a period as they did before, I think we can confirm the above analysis. :)
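For reference, a minimal sketch of switching the mode (the checkpoint interval of 60 s here is an arbitrary assumption, not taken from your job):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointModeExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // AT_LEAST_ONCE skips barrier alignment: channels that already
        // received a barrier keep being processed instead of being cached
        // until the remaining barriers arrive.
        env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE);
    }
}
```

If the stalls disappear in this mode, that points to alignment (and thus a delayed barrier from some source) rather than the async snapshot itself.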

Best,
Zhijiang
------------------------------------------------------------------
From:Paul Lam <paullin3...@gmail.com>
Send Time: 2019-02-28 (Thursday) 15:17
To:user <user@flink.apache.org>
Subject:Flink performance drops when async checkpoint is slow

Hi,

I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some 
transformations and aggregations, and writes to two Kafka topics respectively. 
Meanwhile, there is a custom source that pulls configurations for the 
transformations periodically. The generic job graph is as below.



The job uses FsStateBackend and checkpoints to HDFS, but HDFS's load is 
unstable, and sometimes the HDFS client reports slow reads and slow 
waitForAckedSeqno during checkpoints. When that happens, the Flink job's 
consume rate drops significantly, some taskmanagers' CPU usage drops from about 
140% to 1%, and all the task threads on those taskmanagers are blocked. This 
situation lasts from seconds to a minute. We started a parallel job with 
everything the same except checkpointing disabled, and it runs very steadily.
But since checkpointing is asynchronous, I think it should not affect the task 
threads.

Here is some additional information that we observed:

- When the performance drops, jstack shows that the Kafka source and the task 
right after it are blocked at requesting a memory buffer (with back pressure 
close to 1), and the last task is blocked at `SingleInputGate.getNextBufferOrEvent`.
- The dashboard shows that the data buffered during alignment is less than 
10 MB, even when back pressure is high.

We’ve been struggling with this problem for weeks, and any help is appreciated. 
Thanks a lot!

Best,
Paul Lam

Attachment: 屏幕快照 2019-02-25 11.24.54.png