Hi, Rogin.
If you have upgraded to 1.16, I think your problem will be
solved automatically because the restore mode has been supported from 1.15.
The NO_CLAIM mode is the default restore mode [1] which will help you to
break the lineage of snapshots (both checkpoints and savepoints).
When you use this mode, the first checkpoint will be full, but the
following checkpoints will be incremental if only you enable the
incremental checkpoint.
Of course, you could also try to use CLAIM mode in which flink will help
you to clean the old checkpoints as much as possible.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints

On Mon, Dec 19, 2022 at 5:28 PM Robin Cassan <robin.cas...@contentsquare.com>
wrote:

> Hey Hangxiang! Thanks a lot for your answer
>
> Indeed we are currently using Flink 1.13 and plan on moving to 1.16 soon,
> so it's great news that the non-incremental checkpoints were optimized,
> thanks for sharing! We decided to no use incremental checkpoints due to
> the fact that it was hard to expire files on S3 since newer checkpoints
> might need shared files from older ones (given our deployment process is
> based on restoring the old job's checkpoint and not taking a savepoint).
> That said, we will keep this idea in mind
>
> Do you however confirm that there is no way to isolate resources used for
> checkpoints and achieve consistent latency? Our only option is to try
> to reduce the checkpointing duration?
>
> Thanks again,
> Robin
>
> Le ven. 16 déc. 2022 à 13:48, Hangxiang Yu <master...@gmail.com> a écrit :
>
>> Hi, Robin.
>> From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I
>> guess your version of Flink was below 1.16 and you adapted the default
>> config of 'state.backend.incremental'.
>> In the version below 1.16, RocksDBStateBackend will use savepoint format
>> as its full snapshot[1]. So it will iterate all data in db and upload them
>> which will cost lots of CPU when checkpointing.
>>
>> So I'd like to suggest you to:
>> 1. set 'state.backend.incremental' as true to enable incremental
>> checkpoint[2] to avoid the iterator cost and reduce the upload size.
>> 2. upgrade to 1.16 to fix [1]. Of course, I think it's better to enable
>> incremental checkpoint.
>> 3. upgrade to 1.16 and use generic incremental checkpoint (changelog) [3]
>> if you find the CPU is still not stable.
>> The feature could help to make incremental checkpoint size small and
>> stable which could make the CPU more stable.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-28699
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/checkpointing/#state-backend-incremental
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#state-changelog-options
>>
>> On Fri, Dec 16, 2022 at 1:03 AM Robin Cassan via user <
>> user@flink.apache.org> wrote:
>>
>>> Hello all!
>>>
>>> We are trying to bring our flink job closer to real-time processing and
>>> currently our main issue is latency that happens during checkpoints. Our
>>> job uses RocksDB with periodic checkpoints, which are a few hundred GBs
>>> every 15 minutes. We are trying to reduce the checkpointing duration but
>>> our main concern is the fact that, during checkpoints, 70% of our CPU is
>>> used for checkpointing (*FullSnapshotAsyncWriter.writeKVStateData*)
>>>
>>> Ideally, we would like to allocate a fixed amount of our CPU resources
>>> to this task (let's say 10%), which would allow the regular processing of
>>> data to remain stable while checkpointing. This comes at the expense of
>>> having 10% idle CPU in-between checkpoints and having longer checkpoint
>>> durations, but we are OK with this tradeoff if it brings more predictable
>>> latency overall.
>>>
>>> However, I didn't find any setting to achieve this. It seems like these
>>> checkpointing tasks are scheduled in the *asyncOperationsThreadPool* that
>>> resides in *StreamTask.java* and this pool seems to be unbounded.
>>> Do you think that having an upper bound to this thread pool would
>>> achieve the outcome we expect? And if so, is there a way to add this bound?
>>>
>>> Thanks a lot!
>>>
>>> Robin
>>>
>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>

-- 
Best,
Hangxiang.

Reply via email to