This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit eabc3cef8a1bcb601116caff2a82eeac2616f631 Author: Rui Fan <[email protected]> AuthorDate: Thu Jan 15 17:41:54 2026 +0100 [FLINK-38541][checkpoint] Introducing config option: execution.checkpointing.unaligned.during-recovery.enabled --- .../generated/common_checkpointing_section.html | 6 +++ .../flink/configuration/CheckpointingOptions.java | 47 ++++++++++++++++++++++ .../configuration/CheckpointingOptionsTest.java | 46 +++++++++++++++++++++ 3 files changed, 99 insertions(+) diff --git a/docs/layouts/shortcodes/generated/common_checkpointing_section.html b/docs/layouts/shortcodes/generated/common_checkpointing_section.html index 9db118174a3..f942529b79c 100644 --- a/docs/layouts/shortcodes/generated/common_checkpointing_section.html +++ b/docs/layouts/shortcodes/generated/common_checkpointing_section.html @@ -56,5 +56,11 @@ <td>Integer</td> <td>The maximum number of completed checkpoints to retain.</td> </tr> + <tr> + <td><h5>execution.checkpointing.unaligned.recover-output-on-downstream.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether recovering output buffers of upstream task on downstream task directly when job restores from the unaligned checkpoint.</td> + </tr> </tbody> </table> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index ce205077474..d2ccca8c1bc 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -659,6 +659,7 @@ public class CheckpointingOptions { + "Each subtask will create a new channel state file when this is configured to 1."); @Experimental + @Documentation.Section(Documentation.Sections.COMMON_CHECKPOINTING) public static final ConfigOption<Boolean> UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM = ConfigOptions.key( "execution.checkpointing.unaligned.recover-output-on-downstream.enabled") @@ -668,6 +669,32 @@ public class CheckpointingOptions { "Whether recovering output buffers of upstream task on downstream task directly " + "when job restores from the unaligned checkpoint."); + @Experimental + @Documentation.ExcludeFromDocumentation( + "This option is not yet ready for public use, will be documented in a follow-up commit") + public static final ConfigOption<Boolean> UNALIGNED_DURING_RECOVERY_ENABLED = + ConfigOptions.key("execution.checkpointing.unaligned.during-recovery.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether to enable checkpointing during recovery from an unaligned checkpoint. " + + "When enabled, the job can take checkpoints while still recovering channel state " + + "(inflight data) from a previous unaligned checkpoint. This avoids the need to " + + "wait for full recovery before the first checkpoint can be triggered, which " + + "reduces the window of vulnerability to failures during recovery.") + .linebreak() + .linebreak() + .text( + "This option requires %s to be enabled. " + + "It does not require unaligned checkpoints to be currently enabled, because " + + "a job may restore from an unaligned checkpoint while having unaligned " + + "checkpoints disabled for the new execution.", + TextElement.code( + UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM.key())) + .build()); + /** * Determines whether checkpointing is enabled based on the configuration. * @@ -763,4 +790,24 @@ public class CheckpointingOptions { } return config.get(ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS); } + + /** + * Determines whether unaligned checkpoint support during recovery is enabled. + * + * <p>This feature requires {@link #UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM} to be enabled. Note + * that it does not require unaligned checkpoints to be currently enabled, because a job may + * restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new + * execution. + * + * @param config the configuration to check + * @return {@code true} if unaligned checkpoint during recovery is enabled, {@code false} + * otherwise + */ + @Internal + public static boolean isUnalignedDuringRecoveryEnabled(Configuration config) { + if (!config.get(UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM)) { + return false; + } + return config.get(UNALIGNED_DURING_RECOVERY_ENABLED); + } } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java index 4104dfb762b..747f789af35 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java @@ -328,4 +328,50 @@ class CheckpointingOptionsTest { .as("Interruptible timers should be disabled when runtime mode is BATCH") .isFalse(); } + + @Test + void testIsUnalignedDuringRecoveryEnabled() { + // Test when both options are disabled (default) - should return false + Configuration defaultConfig = new Configuration(); + assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(defaultConfig)) + .as("During-recovery should be disabled by default") + .isFalse(); + + // Test when during-recovery is enabled but recover-output-on-downstream is disabled + Configuration onlyDuringRecoveryConfig = new Configuration(); + onlyDuringRecoveryConfig.set(CheckpointingOptions.UNALIGNED_DURING_RECOVERY_ENABLED, true); + assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(onlyDuringRecoveryConfig)) + .as( + "During-recovery should be disabled when recover-output-on-downstream is not enabled") + .isFalse(); + + // Test when recover-output-on-downstream is enabled but during-recovery is disabled + Configuration onlyRecoverOnDownstreamConfig = new Configuration(); + onlyRecoverOnDownstreamConfig.set( + CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, true); + assertThat( + CheckpointingOptions.isUnalignedDuringRecoveryEnabled( + onlyRecoverOnDownstreamConfig)) + .as("During-recovery should be disabled when during-recovery option is not enabled") + .isFalse(); + + // Test when both options are enabled - should return true + Configuration bothEnabledConfig = new Configuration(); + bothEnabledConfig.set(CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, true); + bothEnabledConfig.set(CheckpointingOptions.UNALIGNED_DURING_RECOVERY_ENABLED, true); + assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(bothEnabledConfig)) + .as( + "During-recovery should be enabled when both recover-output-on-downstream and during-recovery are enabled") + .isTrue(); + + // Test when recover-output-on-downstream is explicitly false and during-recovery is true + Configuration explicitlyDisabledConfig = new Configuration(); + explicitlyDisabledConfig.set( + CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, false); + explicitlyDisabledConfig.set(CheckpointingOptions.UNALIGNED_DURING_RECOVERY_ENABLED, true); + assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(explicitlyDisabledConfig)) + .as( + "During-recovery should be disabled when recover-output-on-downstream is explicitly false") + .isFalse(); + } }
