[ 
https://issues.apache.org/jira/browse/FLINK-39267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liu updated FLINK-39267:
------------------------
    Description: 
h2. Motivation

Currently, checkpoint configuration parameters such as 
{{execution.checkpointing.timeout}} are fixed at job submission time and cannot 
be changed without restarting the job. This creates operational pain in several 
real-world scenarios:

*Scenario 1: Resolving consecutive checkpoint timeout failures*

When a job experiences consecutive checkpoint timeout failures (e.g., due to 
state growth or temporary I/O slowdowns), users often urgently need a 
successful checkpoint for:
 * Committing offsets in exactly-once sinks (e.g., Kafka transactions)
 * Performing job rescaling (which requires a recent checkpoint/savepoint)
 * Preventing job failure when 
{{execution.checkpointing.tolerable-failed-checkpoints}} is exceeded

The current workaround (stop-with-savepoint -> modify config -> restart) 
introduces significant downtime and is impractical when the checkpoint itself 
is failing.

*Scenario 2: Adapting to changing workload characteristics*

Long-running streaming jobs may experience varying checkpoint durations across 
different time periods (e.g., peak vs. off-peak hours, backlog processing vs. 
normal processing). A static timeout value forces users to choose between:
 * A large timeout that delays failure detection when checkpoints are truly 
stuck
 * A small timeout that causes unnecessary failures during legitimate slow 
checkpoints

*Scenario 3: Avoiding wasted checkpoint work (Future Phase)*

When a checkpoint has completed 80%+ of task acknowledgements but is about to 
expire, all the snapshot I/O work is wasted. Dynamically extending the timeout 
could save significant cluster resources. This is proposed as a future 
enhancement (see "Phased Approach" below).
h2. Phased Approach

This improvement is designed to be implemented incrementally:

*Phase 1 (This Issue): Dynamic checkpoint timeout*
 * Change {{CheckpointCoordinator.checkpointTimeout}} from {{final}} to 
{{{}volatile{}}}.
 * Add a setter method to update the timeout at runtime.
 * Expose a new REST API endpoint for runtime configuration updates.
 * The new timeout takes effect for the next triggered checkpoint. Already 
in-flight checkpoints (whose {{CheckpointCanceller}} has been scheduled) are 
not affected.
 * This follows the same design pattern as {{setIsProcessingBacklog()}} 
(FLIP-309), which dynamically switches checkpoint intervals at runtime without 
affecting in-flight checkpoints.

*Phase 2 (Follow-up): Additional checkpoint parameters*
 * Extend the REST API to support dynamically updating other checkpoint 
configuration parameters, such as {{checkpointInterval}} and 
{{{}minPauseBetweenCheckpoints{}}}, following the same pattern.

*Phase 3 (Future): Timeout extension for in-flight checkpoints*
 * Reschedule the {{CheckpointCanceller}} for pending checkpoints when timeout 
is updated, so that the new timeout also applies to currently running 
checkpoints.
 * This requires modifying {{PendingCheckpoint}} to support resetting its 
canceller handle (currently {{setCancellerHandle()}} throws 
{{IllegalStateException}} if called twice).
 * Edge cases (new timeout already elapsed, concurrent modifications) need 
careful design.
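As a rough illustration of what Phase 3 might look like, the sketch below models a timeout task whose deadline can be reset, using a plain {{ScheduledExecutorService}}. The class {{ResettableCanceller}} and its methods are hypothetical, not existing Flink code; it cancels the previously scheduled task and reschedules relative to the checkpoint's start, firing immediately if the new timeout has already elapsed (one of the edge cases noted above):

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a canceller whose deadline can be reset, unlike the
// current PendingCheckpoint#setCancellerHandle(), which may only be set once.
public class ResettableCanceller {
    private final ScheduledExecutorService timer;
    private final Runnable onTimeout;
    private final long startNanos = System.nanoTime(); // checkpoint trigger time
    private ScheduledFuture<?> handle;

    public ResettableCanceller(ScheduledExecutorService timer, Runnable onTimeout, long timeoutMs) {
        this.timer = timer;
        this.onTimeout = onTimeout;
        reset(timeoutMs);
    }

    /** Re-schedules the timeout relative to checkpoint start; fires at once if already elapsed. */
    public synchronized void reset(long newTimeoutMs) {
        if (handle != null) {
            handle.cancel(false); // edge case: the old task may already be running
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        long remainingMs = Math.max(0L, newTimeoutMs - elapsedMs);
        handle = timer.schedule(onTimeout, remainingMs, TimeUnit.MILLISECONDS);
    }
}
```

The {{synchronized}} guard is the simplest answer to the concurrent-modification edge case; the real implementation would have to integrate with the coordinator's lock instead.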

h2. Public API Changes

New REST API endpoint:
{code:java|title=Endpoint Definition}
PATCH /jobs/:jobid/checkpointing/configuration

Request body (Phase 1):
{
  "checkpointTimeout": 600000
}

Response: 200 OK
{code}
The endpoint path {{/checkpointing/configuration}} is intentionally designed to 
be extensible — additional parameters (interval, minPause, etc.) can be added 
to the request body in Phase 2 without changing the API contract.
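For illustration, a Phase 2 request might carry additional fields in the same body. The field names below simply mirror the parameters mentioned above and remain hypothetical until Phase 2 is designed:
{code:title=Hypothetical Phase 2 Request}
PATCH /jobs/:jobid/checkpointing/configuration

{
  "checkpointTimeout": 600000,
  "checkpointInterval": 30000,
  "minPauseBetweenCheckpoints": 5000
}
{code}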

This is consistent with the existing {{PUT /jobs/:jobid/resource-requirements}} 
pattern for runtime job configuration updates.
h2. Design Details (Phase 1)
 * The {{checkpointTimeout}} field in {{CheckpointCoordinator}} is read once 
per checkpoint in {{createPendingCheckpoint()}} when scheduling the 
{{{}CheckpointCanceller{}}}. Making it {{volatile}} ensures visibility across 
threads with minimal performance impact.
 * No changes needed to {{{}PendingCheckpoint{}}}, 
{{{}CheckpointCoordinatorConfiguration{}}}, or Task-side code.
 * The REST endpoint routes through {{RestfulGateway}} -> {{JobMaster}} -> 
{{{}CheckpointCoordinator{}}}.
 * Validation: the new timeout must be a positive value.

Core change in CheckpointCoordinator:
{code:java}
// Change field from:
private final long checkpointTimeout;
// To:
private volatile long checkpointTimeout;

// New setter method:
public void setCheckpointTimeout(long newTimeout) {
    Preconditions.checkArgument(
            newTimeout > 0,
            "Checkpoint timeout must be positive, but was %s",
            newTimeout);
    this.checkpointTimeout = newTimeout;
    LOG.info("Checkpoint timeout for job {} updated to {} ms.", job, newTimeout);
}
{code}
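The pattern can be exercised in isolation. The following self-contained class is a simplified stand-in for {{CheckpointCoordinator}} (names are illustrative, not Flink's) showing the volatile field, the validated setter, and the once-per-checkpoint read:

```java
// Simplified stand-in for the coordinator; not actual Flink code.
public class DynamicTimeoutDemo {
    private volatile long checkpointTimeout;

    public DynamicTimeoutDemo(long initialTimeoutMs) {
        setCheckpointTimeout(initialTimeoutMs);
    }

    public void setCheckpointTimeout(long newTimeout) {
        if (newTimeout <= 0) { // same validation the REST handler would perform
            throw new IllegalArgumentException(
                    "Checkpoint timeout must be positive, but was " + newTimeout);
        }
        this.checkpointTimeout = newTimeout;
    }

    /** Mirrors createPendingCheckpoint(): the timeout is read once per triggered checkpoint. */
    public long timeoutForNextCheckpoint() {
        // Single volatile read; checkpoints already in flight keep the value they read earlier.
        return checkpointTimeout;
    }
}
```

Because each checkpoint reads the field exactly once when its canceller is scheduled, an update between two triggers affects only the later checkpoint, which is the Phase 1 semantics described above.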
h2. Compatibility, Deprecation, and Migration Plan

Fully backward compatible. If the REST API is not called, behavior is identical 
to the current implementation.

No configuration deprecation.

No changes to existing REST APIs.
h2. Related Issues / Prior Art

FLIP-309 ({{{}setIsProcessingBacklog{}}}) — dynamic checkpoint interval 
switching at runtime, same pattern

FLIP-160 / {{JobResourcesRequirementsUpdateHeaders}} — REST API for runtime job 
config updates


> Support dynamically updating checkpoint configuration at runtime via REST API
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-39267
>                 URL: https://issues.apache.org/jira/browse/FLINK-39267
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Liu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
