Hi Robin,
Could you share how you measured the CPU usage?
By summing the CPU usage of all TM cores, or by looking at the busy metric in
the Flink UI?
I think that's the first thing we need to align on.
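
(For reference, the per-TaskManager process CPU is also exposed as the
Status.JVM.CPU.Load metric; a minimal sketch of shipping it to Prometheus,
assuming the Prometheus reporter is available in your version and the port
range is free:)

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9250-9260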

> network (async) part of the checkpoint should in theory not cause
backpressure since resources would be used for the main stream during async
waits, but I might be wrong
I also believe it works that way when the job is running normally.
So I think there are other bottlenecks causing backpressure during checkpoints.
Maybe you could check other resource usage during checkpoints, such as IO
utilization (as Zakelly suggested), heap usage, and GC activity?
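
(If GC turns out to be a suspect, one way to check is enabling GC logs on the
TaskManagers; a rough sketch, assuming JDK 11+ and a writable log path:)

env.java.opts.taskmanager: "-Xlog:gc*:file=/tmp/tm-gc.log:time,uptime:filecount=5,filesize=20m"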

On Tue, Oct 25, 2022 at 2:01 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Robin,
>
> You said that during the checkpoint async phase the CPU is stable at
> 100%, which is pretty strange to me. Normally the CPU usage of the
> TaskManager process can exceed 100%, depending on what all the
> threads are doing. I'm wondering if there is any scheduling mechanism
> controlling the CPU usage of a process in your setup, such as
> CGroups in YARN or Kubernetes. In that case, the uploading
> thread may preempt CPU resources from the task processing thread.
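>
> (For illustration only: in a native Kubernetes or YARN deployment, such a cap
> usually comes from the TM resource allocation, e.g. settings like the
> following, which can translate into a CGroup quota on the whole process;
> values are just examples:)
>
> kubernetes.taskmanager.cpu: 6.0
> yarn.containers.vcores: 6
>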
> The second thing that might help is checking the IO utilization
> during the checkpoint. The uploading thread keeps reading from the
> local disk and writing to the remote storage, which may affect IO and
> state access latency, especially when the state size is large.
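>
> (Related, in case the upload itself turns out to be the contention point: the
> number of threads used to transfer RocksDB files during a checkpoint is
> configurable; a sketch, the value being just an example:)
>
> state.backend.rocksdb.checkpoint.transfer.thread.num: 2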
>
> Best,
> Zakelly
>
> On Tue, Oct 25, 2022 at 12:10 AM Robin Cassan via user
> <u...@flink.apache.org> wrote:
> >
> > Hello Yuan Mei! Thanks a lot for your answer :)
> >
> > About the CPU usage, it is pretty stable at 80% normally. Every 15
> minutes we trigger a checkpoint, and during this time it is stable at 100%.
> > I am starting to wonder if CPU is the real limiting factor, because when
> checking the Flink UI I see that most of the checkpoint duration is async.
> I do not know how the async phase affects backpressure, but it does look
> like the upload to S3 phase is causing the backpressure. The sync phase is
> quite short as well.
> > Looking at this article
> https://flink.apache.org/2022/05/23/latency-part2.html it seems we
> are already in the most efficient configuration (at-least-once,
> non-concurrent checkpointing, rocksdb on local NVMe SSDs...), so I don't see
> an obvious quick win apart from scaling up the full cluster.
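> >
> > (For reference, that setup corresponds roughly to the following
> > flink-conf.yaml entries; just a sketch, the local dir and S3 path are
> > placeholders:)
> >
> > execution.checkpointing.mode: AT_LEAST_ONCE
> > execution.checkpointing.max-concurrent-checkpoints: 1
> > state.backend: rocksdb
> > state.backend.rocksdb.localdir: /mnt/nvme/rocksdb
> > state.checkpoints.dir: s3://<bucket>/flink-checkpoints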
> >
> > Reducing the state size will be a big challenge but even then it would
> not guarantee consistent latency, same for less frequent checkpoints.
> > For now it looks like our only option to achieve real-time computation
> would be to not use Flink (or at least, not include these computations
> inside a job with a big state that is checkpointed). Thanks again for the
> insight, and if you happen to have any information on how we could prevent
> the async phase of checkpoints from adding backpressure on our stream I would
> be very interested!
> >
> > On Wed, Oct 19, 2022 at 10:57 AM, Yuan Mei <yuanmei.w...@gmail.com> wrote:
> >>
> >> Hey Robin,
> >>
> >> Thanks for sharing the detailed information. May I ask, when you say "CPU
> usage is around 80% when checkpoints aren't running, and capped at 100% when
> they are", do you see zigzag patterns of CPU usage, or does it stay capped at
> 100%?
> >>
> >> I think one possibility is that the sync phase of the checkpoint (the
> writebuffer flush) triggers a RocksDB compaction; we have seen this happen on
> Ververica services as well.
> >>
> >> For now, maybe you can try making checkpoints less frequent (increase the
> checkpoint interval) to reduce the frequency of compactions. Please let me
> know whether this helps.
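> >>
> >> (A sketch of what that could look like, the values being just examples:)
> >>
> >> execution.checkpointing.interval: 30 min
> >> execution.checkpointing.min-pause: 10 min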
> >>
> >> In the long term, I think we probably need to separate the compaction
> process from the internal DB and control/schedule it ourselves (compaction
> takes a good amount of CPU and reduces TPS).
> >>
> >> Best,
> >> Yuan
> >>
> >>
> >>
> >> On Thu, Oct 13, 2022 at 11:39 PM Robin Cassan via user <
> u...@flink.apache.org> wrote:
> >>>
> >>> Hello all, hope you're well :)
> >>> We are attempting to build a Flink job with minimal and stable latency
> (as much as possible) that consumes data from Kafka. Currently our main
> limitation happens when our job checkpoints the RocksDB state: backpressure
> is applied on the stream, causing latency. I am wondering if there are ways
> to configure Flink so that the checkpointing process affects the flow of
> data as little as possible?
> >>>
> >>> In our case, backpressure seems to arise from CPU consumption, because:
> >>> - CPU usage is around 80% when checkpoints aren't running, and capped
> at 100% when they are
> >>> - checkpoint alignment time is very low, and using unaligned checkpoints
> doesn't appear to help with backpressure
> >>> - network (async) part of the checkpoint should in theory not cause
> backpressure since resources would be used for the main stream during async
> waits, but I might be wrong
> >>>
> >>> What we would really like to achieve is isolating the compute resources
> used for checkpointing from the ones used for task slots. This would of
> course mean that we need to oversize our cluster to keep resources available
> for checkpointing even when it isn't running, and also that checkpoints would
> take longer than today, where they seem to use CPU cores attributed to task
> slots. We are ok with that to some degree, but we don't know how to achieve
> this isolation. Do you have any clue?
> >>>
> >>> Lastly, we currently have nodes with 8 cores but allocate 6 task
> slots, and we have set the following:
> >>>
> >>> state.backend.rocksdb.thread.num: 6
> >>> state.backend.rocksdb.writebuffer.count: 6
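> >>> # (as far as I understand the docs: thread.num is the number of RocksDB
> >>> # background flush/compaction threads, and writebuffer.count the maximum
> >>> # number of write buffers kept in memory per column family)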
> >>>
> >>>
> >>> Thanks all for your help!
>


-- 
Best,
Hangxiang.
