[ https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496834#comment-17496834 ]
Piotr Nowojski commented on FLINK-26306:
----------------------------------------

{quote}
We can avoid hardcoding these numbers and put the logic into some wrapper around the pool (with a new method that accepts a list of Runnables or Handles).
Maybe batching can be a short-term solution; which can be evolved gradually (by replacing executor in wrapper by multiple queues; and then checking queue size in CheckpointRequestDecider).
WDYT?
{quote}
I still do not think this is a good solution. We don't know what other users of the ioExecutor pool are doing or how they are using it, so tweaking the number of batches to "size of the pool - 1" sounds like a bad idea. At the same time, I don't see a reason to rush this.

I agree that expressing the right condition under which {{CheckpointRequestDecider}} should be back-pressured is quite tricky.

> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
>                 Key: FLINK-26306
>                 URL: https://issues.apache.org/jira/browse/FLINK-26306
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused
> shared state for async deletion. It uses the common IO pool for this and adds a
> Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread pool
> to initialize the location for it (see
> CheckpointCoordinator.initializeCheckpoint).
> The thread pool is of fixed size
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
> by default it's the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletion, the next checkpoint is delayed
> waiting for an available IO thread.
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however,
> this shared state deletion could be spread across multiple subsequent
> checkpoints, not necessarily the next one.
> ----
> I believe the issue is a pre-existing one, but it particularly affects
> the changelog state backend, because 1) such spikes are likely there; 2)
> workloads are latency sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately
> after the materialization.
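
The queueing effect described in the issue can be reproduced outside Flink with a plain java.util.concurrent pool. The sketch below is not Flink code: the class name, the method name, and the latch-blocked Runnables (standing in for slow state-handle deletions) are all hypothetical. It only assumes what the description states, namely a fixed-size pool with a FIFO queue shared by deletion and checkpoint initialization.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; none of these names exist in Flink.
public class IoPoolQueueingSketch {

    /**
     * Returns how many deletion tasks sit in the FIFO queue ahead of the
     * checkpoint-initialization task at the moment it is submitted.
     */
    public static int deletionsQueuedAhead(int poolSize, int stateHandles) throws Exception {
        // Fixed-size pool with an unbounded FIFO queue (assumed shape of the IO pool).
        ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Stand-in for slow storage: deletions block until this latch opens.
        CountDownLatch storageResponds = new CountDownLatch(1);
        try {
            // One Runnable per state handle, as in the issue description.
            for (int i = 0; i < stateHandles; i++) {
                ioPool.execute(() -> {
                    try {
                        storageResponds.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // All pool threads are busy, so the remaining deletions wait in the queue.
            int ahead = ioPool.getQueue().size();

            // The checkpoint-initialization stand-in joins the back of the same queue.
            Future<String> location = ioPool.submit(() -> "checkpoint-location");

            // Let the deletions finish; only then can the initialization task run.
            storageResponds.countDown();
            location.get(10, TimeUnit.SECONDS);
            return ahead;
        } finally {
            storageResponds.countDown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // 2 threads, 10 state handles: 2 deletions run, 8 wait ahead of the checkpoint task.
        System.out.println(deletionsQueuedAhead(2, 10)); // prints 8
    }
}
```

With a pool of size 2 and 10 state handles, the initialization task is submitted behind 8 queued deletions, which is the delay the issue reports. Any batching or back-pressure scheme would have to bound that number without starving other users of the pool.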