[ https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496736#comment-17496736 ]

Piotr Nowojski edited comment on FLINK-26306 at 2/23/22, 12:55 PM:
-------------------------------------------------------------------

Thanks for the explanation, I get it now.

> 1. Batch deletions and leave one thread idle (e.g. group 1K handles into 10 
> big batches handled by 11 IO threads)

Is this the right level at which to provide back-pressure? Would it even work 
if you hardcoded, in the {{CheckpointCoordinator}}, assumptions about the pool 
size and the number of threads in use? We don't know how else this thread pool 
is being used.
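
For concreteness, a rough sketch of what such batching could look like (this is 
not existing Flink code; {{scheduleBatchedDiscard}} is a made-up name, and note 
that it has to bake in an assumption about the pool size, which is exactly the 
concern above):

{code:java}
// Hypothetical sketch only, not SharedStateRegistryImpl code.
// Groups the handles into at most (poolSize - 1) batches so one IO thread stays free.
static void scheduleBatchedDiscard(
        List<StreamStateHandle> handles, Executor ioExecutor, int poolSize) {
    int batches = Math.max(1, poolSize - 1); // hardcoded assumption about the shared pool
    int batchSize = Math.max(1, (handles.size() + batches - 1) / batches);
    for (int i = 0; i < handles.size(); i += batchSize) {
        List<StreamStateHandle> batch = new ArrayList<>(
                handles.subList(i, Math.min(i + batchSize, handles.size())));
        ioExecutor.execute(() -> {
            for (StreamStateHandle handle : batch) {
                try {
                    handle.discardState();
                } catch (Exception e) {
                    // best effort: log and continue with the remaining handles
                }
            }
        });
    }
}
{code}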

Apart from that, don't we already have a back-pressure mechanism at a higher 
level: {{CheckpointRequestDecider#numberOfCleaningCheckpointsSupplier}} from 
FLINK-17073? That looks like the simple fair IO thread pool I described above, 
just without any priorities, and adjusting/relaxing the 
{{numberOfCleaningCheckpointsSupplier.getAsInt() > 
maxConcurrentCheckpointAttempts}} check to something like 
{{numberOfCleaningCheckpointsSupplier.getAsInt() > 
maxConcurrentCheckpointAttempts + CONSTANT}} would do the trick, wouldn't it?
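
In code terms, the relaxation I mean would be roughly the following (a sketch 
from memory; the exact field and method names in {{CheckpointRequestDecider}} 
may differ, and {{EXTRA_CLEANUP_HEADROOM}} stands for the {{CONSTANT}} above):

{code:java}
// Sketch only; not a patch against the actual CheckpointRequestDecider.
private static final int EXTRA_CLEANUP_HEADROOM = 10; // the "CONSTANT" above, value to be tuned

private boolean tooManyPendingCleanups() {
    // current behaviour: delay new triggers as soon as cleanups exceed maxConcurrentCheckpointAttempts;
    // proposed: tolerate some extra in-flight cleanups before back-pressuring the trigger
    return numberOfCleaningCheckpointsSupplier.getAsInt()
            > maxConcurrentCheckpointAttempts + EXTRA_CLEANUP_HEADROOM;
}
{code}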


> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
>                 Key: FLINK-26306
>                 URL: https://issues.apache.org/jira/browse/FLINK-26306
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused 
> shared state for async deletion. It uses the common IO pool for this and adds 
> one Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread pool 
> to initialize the location for it (see CheckpointCoordinator.initializeCheckpoint).
> The thread pool has a fixed size 
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size], 
> by default the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletions, the next checkpoint is delayed 
> waiting for an available IO thread (a standalone sketch of this contention 
> follows after the description).
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however, 
> this shared state deletion could be spread across multiple subsequent 
> checkpoints, not necessarily the next one.
> ---- 
> I believe the issue is a pre-existing one, but it particularly affects the 
> changelog state backend, because 1) such spikes are likely there; 2) its 
> workloads are latency-sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately 
> after the materialization.
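
For illustration, a standalone sketch (plain JDK code, not Flink internals) of 
the contention described above: a fixed-size FIFO pool flooded with one 
Runnable per deleted handle makes a later checkpoint-initialization task wait 
behind all of them.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IoPoolContentionDemo {
    public static void main(String[] args) throws Exception {
        // roughly the jobmanager.io-pool.size default: number of CPU cores
        int poolSize = Runtime.getRuntime().availableProcessors();
        ExecutorService ioExecutor = Executors.newFixedThreadPool(poolSize); // FIFO task queue

        // Spike of "shared state deletions": one task per handle, each taking ~10 ms.
        for (int i = 0; i < 10_000; i++) {
            ioExecutor.execute(() -> sleep(10));
        }

        // The next checkpoint's location initialization has to wait behind all of them.
        long submitted = System.nanoTime();
        ioExecutor.execute(() ->
                System.out.println("checkpoint init delayed by "
                        + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submitted) + " ms"));

        ioExecutor.shutdown();
        ioExecutor.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}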



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
