[ 
https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496634#comment-17496634
 ] 

Roman Khachatryan edited comment on FLINK-26306 at 2/23/22, 10:12 AM:
----------------------------------------------------------------------

> > 2. Spread changelog materialization across multiple checkpoints; i.e. 
> > materialize different tasks at different times
> Can you Roman Khachatryan elaborate on why that would help? Is it because 
> materialised parts of the changelog checkpoints are causing those deletion 
> spikes? If so, why is that the case? Why is this only because of the 
> "materialised parts"?

Let me clarify what happens currently (see the sketch after the list):
1. JM triggers a checkpoint
2. TMs send non-materialized changes
3. The above is repeated until the materialization happens (with a 1s checkpoint 
interval and a 10m materialization interval, that's 600 checkpoints times the 
number of tasks)
4. Materialization finishes more or less simultaneously on all tasks
5. Checkpoint N is triggered - TMs don't send the "old" non-materialized state 
(only the materialized state + the changelog after it)
6. Checkpoint N is completed, checkpoint (N - 1) is subsumed; all "old" 
non-materialized state is scheduled for async deletion
7. Checkpoint (N + 1) is triggered; but it is waiting for an IO thread to 
initialize the location
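
To make steps 6-7 concrete, here is a minimal, self-contained sketch of the 
contention (plain JDK only, not the actual Flink classes; the pool size, handle 
count and timings are made up): a fixed-size FIFO IO pool is flooded with 
per-handle deletion Runnables, so the location initialization of the next 
checkpoint has to wait behind them.

{code:java}
// Minimal sketch of steps 6-7 (plain JDK, not Flink code): a fixed-size FIFO
// IO pool is flooded with per-handle deletion Runnables, and the next
// checkpoint's location initialization is queued behind all of them.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IoPoolContentionSketch {

    public static void main(String[] args) throws Exception {
        // Stand-in for the JM IO pool: fixed size, unbounded FIFO queue.
        ExecutorService ioPool = Executors.newFixedThreadPool(4);

        // Step 6: the subsumed checkpoint schedules one Runnable per unused
        // shared state handle (here: 10,000 handles, ~1 ms per deletion).
        for (int i = 0; i < 10_000; i++) {
            ioPool.execute(IoPoolContentionSketch::simulateDelete);
        }

        // Step 7: checkpoint N+1 needs the same pool to initialize its
        // location, but its task sits behind the deletion backlog.
        long start = System.nanoTime();
        Future<?> initLocation =
                ioPool.submit(IoPoolContentionSketch::simulateInitializeLocation);
        initLocation.get();
        System.out.printf("location init delayed by %d ms%n",
                (System.nanoTime() - start) / 1_000_000);

        ioPool.shutdown();
    }

    private static void simulateDelete() {
        sleep(1);
    }

    private static void simulateInitializeLocation() {
        sleep(1);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}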

It *is* desirable to preserve this back-pressure from deletion to new 
checkpoints. But if possible, deletions should be spread more evenly. 
So I was thinking that spreading the materializations of different tasks evenly 
over time should reduce the wait time (although it would not eliminate it 
completely).
The other way is to adjust how the IO threads schedule their work (which I 
think is the better way).
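
For illustration only, spreading could be as simple as offsetting each task's 
first materialization by a per-task fraction of the interval. This is a 
hypothetical sketch, not the actual materialization scheduling code; the 
class, method and parameter names are made up:

{code:java}
// Hypothetical sketch (names made up): stagger the first materialization per
// task so that materializations - and the deletion spikes that follow them -
// don't all line up at the same checkpoints.
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class StaggeredMaterializationSketch {

    static ScheduledFuture<?> scheduleMaterialization(
            ScheduledExecutorService scheduler,
            Runnable materialize,
            long intervalMs,
            int taskIndex,
            int numTasks) {
        // Each task starts at a different phase within the interval, so at any
        // point in time only ~1/numTasks of the tasks finish materialization.
        long initialDelayMs = (intervalMs * taskIndex) / numTasks;
        return scheduler.scheduleAtFixedRate(
                materialize, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    }
}
{code}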

> Maybe we should think about some more fair thread pool for async jobs? For 
> example every async IO job could get assigned an id/key, and each id/key 
> would have its own queue of tasks to perform. Based on that we could 
> implement all kinds of fancy priority schemes, but we could start with 
> something as simple as just going in a round-robin fashion through all 
> individual per id/key queues when polling for a new task to execute. This 
> could be generic and flexible enough to be re-used in other use cases (I was 
> thinking about something like that for the TMs IO executor in the past).

I think priorities alone won't work here, because we'd need to assign different 
priorities depending on the "queue length" to preserve back-pressure.
If we always prioritized new checkpoints over deletions, we'd likely end up 
with OOMs (which was the case before the CheckpointCleaner was introduced).
Having separate queues would work, I think - but combined with a check of the 
deletion queue's length.
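
As a rough sketch of what I mean (hypothetical, not an existing Flink class): 
one FIFO queue per key, polled in round-robin order by a single worker, with 
the deletion queue bounded so that scheduling more deletions blocks instead of 
growing an unbounded backlog:

{code:java}
// Hypothetical sketch (not an existing Flink class): a "fair" IO executor with
// one FIFO queue per key (e.g. "checkpoint-init", "state-deletion"), polled in
// round-robin order by a single worker. The deletion queue is bounded, so
// scheduling deletions blocks once the backlog is too large (back-pressure
// instead of the pre-CheckpointCleaner OOM scenario).
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class KeyedRoundRobinExecutor {

    private final Map<String, BlockingQueue<Runnable>> queues = new LinkedHashMap<>();
    private final Thread worker;
    private volatile boolean running = true;

    /** capacities: queue capacity per key; use a large value for "unbounded". */
    public KeyedRoundRobinExecutor(Map<String, Integer> capacities) {
        capacities.forEach((key, cap) -> queues.put(key, new LinkedBlockingQueue<>(cap)));
        worker = new Thread(this::runLoop, "keyed-io-worker");
        worker.start();
    }

    /** Blocks while the queue for this key is full - this is the back-pressure. */
    public void execute(String key, Runnable task) throws InterruptedException {
        queues.get(key).put(task);
    }

    private void runLoop() {
        while (running) {
            boolean ranSomething = false;
            // Round-robin: take at most one task from each key's queue per pass.
            for (BlockingQueue<Runnable> queue : queues.values()) {
                Runnable task = queue.poll();
                if (task != null) {
                    task.run();
                    ranSomething = true;
                }
            }
            if (!ranSomething) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void shutdown() {
        running = false;
        worker.interrupt();
    }
}
{code}

Callers would then do something like executor.execute("state-deletion", 
deleteRunnable); the bounded "state-deletion" queue is what preserves 
back-pressure on checkpoint completion, while a "checkpoint-init" task waits 
behind at most one deletion per round-robin pass instead of the whole backlog.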

> Re batching. Isn't this more of an independent potential optimisation that we 
> could consider independently of the main issue? It depends on how long a 
> single IO operation takes. If it's more than a couple of ms, I would prefer 
> to leave them separate.

I think batching can be viewed either as an optimization, if the problem is 
solved by other means, or as an actual way to solve it.
The benefits of the batching solution are its simplicity and lower invasiveness.
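
For reference, the batching idea boils down to scheduling one Runnable per 
subsumed checkpoint (or per batch of N handles) instead of one Runnable per 
handle. A hypothetical sketch, with Discardable standing in for the real state 
handle type:

{code:java}
// Hypothetical sketch: submit a single Runnable that discards a whole batch of
// handles, so a subsumed checkpoint occupies one queue slot instead of one
// slot per state handle. "Discardable" stands in for the real handle type.
import java.util.List;
import java.util.concurrent.Executor;

public class BatchedStateDeletion {

    /** Stand-in for a state handle that can discard its backing files. */
    interface Discardable {
        void discard() throws Exception;
    }

    static void scheduleBatchedDelete(Executor ioExecutor, List<Discardable> unusedHandles) {
        ioExecutor.execute(() -> {
            for (Discardable handle : unusedHandles) {
                try {
                    handle.discard();
                } catch (Exception e) {
                    // Best-effort cleanup: log and continue with the remaining handles.
                    System.err.println("Failed to discard state handle: " + e);
                }
            }
        });
    }
}
{code}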


> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
>                 Key: FLINK-26306
>                 URL: https://issues.apache.org/jira/browse/FLINK-26306
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused 
> shared state for async deletion. It uses the common IO pool for this and adds 
> a Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread pool 
> to initialize the location for it (see 
> CheckpointCoordinator.initializeCheckpoint).
> The thread pool is of a fixed size 
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size], 
> by default the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletion, the next checkpoint is delayed 
> waiting for an available IO thread.
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however, 
> this shared state deletion could be spread across multiple subsequent 
> checkpoints, not necessarily the next one.
> ---- 
> I believe the issue is a pre-existing one; but it particularly affects the 
> changelog state backend, because 1) such spikes are likely there; 2) the 
> workloads are latency-sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately 
> after the materialization.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
