[ 
https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496834#comment-17496834
 ] 

Piotr Nowojski edited comment on FLINK-26306 at 2/23/22, 3:16 PM:
------------------------------------------------------------------

{quote}
We can avoid hardcoding these numbers and put the logic into some wrapper 
around the pool (with a new method that accepts a list of Runnables or Handles).

Maybe batching can be a short-term solution, which can be evolved gradually (by 
replacing the executor in the wrapper with multiple queues, and then checking 
queue size in CheckpointRequestDecider). WDYT?
{quote}
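
(For reference, a rough sketch of the kind of batching wrapper the quote seems to 
describe; the class and method names below are hypothetical, not existing Flink API.)

{code:java}
// Hypothetical sketch only, not Flink code. It illustrates the quoted idea:
// a spike of per-handle Runnables is grouped into a bounded number of batches,
// so it cannot occupy every thread of the shared IO pool at once.
import java.util.List;
import java.util.concurrent.Executor;

class BatchingIoExecutorWrapper {          // hypothetical name
    private final Executor ioExecutor;     // the shared JM IO pool
    private final int maxBatches;          // e.g. "size of the pool - 1" in the quoted idea

    BatchingIoExecutorWrapper(Executor ioExecutor, int maxBatches) {
        this.ioExecutor = ioExecutor;
        this.maxBatches = maxBatches;
    }

    /** Submits many small tasks as at most {@code maxBatches} pool tasks. */
    void executeAll(List<Runnable> tasks) {
        int batchSize = Math.max(1, (tasks.size() + maxBatches - 1) / maxBatches);
        for (int from = 0; from < tasks.size(); from += batchSize) {
            List<Runnable> batch =
                tasks.subList(from, Math.min(from + batchSize, tasks.size()));
            ioExecutor.execute(() -> batch.forEach(Runnable::run));
        }
    }
}
{code}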

I still do not think this is a good solution. You would need to assume what 
other users of the ioExecutor pool are doing and how they are using it, so 
tweaking the number of batches to "size of the pool - 1" sounds like a bad 
idea. At the same time, I don't see a reason to rush this.

I agree that expressing the right condition for which 
{{CheckpointRequestDecider}} should be back-pressured is quite tricky.


> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
>                 Key: FLINK-26306
>                 URL: https://issues.apache.org/jira/browse/FLINK-26306
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused 
> shared state for async deletion. It uses the common IO pool for this and adds 
> a Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread pool 
> to initialize the location for it (see CheckpointCoordinator.initializeCheckpoint).
> The thread pool has a fixed size 
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
>  by default it's the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletion, the next checkpoint is delayed 
> waiting for an available IO thread.
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however, 
> this shared state deletion could be spread across multiple subsequent 
> checkpoints, not necessarily the next one.
> ---- 
> I believe the issue is a pre-existing one, but it particularly affects the 
> changelog state backend, because 1) such spikes are likely there; 2) 
> workloads are latency-sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately 
> after the materialization.
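
(A minimal, standalone illustration of the contention described above, using a plain 
JDK fixed-size pool in place of the JobManager IO pool; the task counts and sleeps are 
arbitrary, and none of this is Flink code.)

{code:java}
// A burst of per-handle delete tasks submitted to a fixed-size FIFO pool delays
// a later "initialize checkpoint location" task until a thread frees up.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IoPoolContentionDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService ioPool = Executors.newFixedThreadPool(4); // ~ jobmanager.io-pool.size

        // Spike of shared-state deletions, one Runnable per state handle.
        for (int i = 0; i < 1_000; i++) {
            ioPool.execute(() -> sleep(50)); // stands in for deleting one handle
        }

        // The next checkpoint trigger arrives afterwards and queues behind all deletions.
        long queuedAt = System.nanoTime();
        ioPool.execute(() ->
            System.out.printf("checkpoint location init delayed by %d ms%n",
                (System.nanoTime() - queuedAt) / 1_000_000));

        ioPool.shutdown();
        ioPool.awaitTermination(5, TimeUnit.MINUTES);
    }

    private static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
{code}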



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
