This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eabc3cef8a1bcb601116caff2a82eeac2616f631
Author: Rui Fan <[email protected]>
AuthorDate: Thu Jan 15 17:41:54 2026 +0100

    [FLINK-38541][checkpoint] Introducing config option: execution.checkpointing.unaligned.during-recovery.enabled
---
 .../generated/common_checkpointing_section.html    |  6 +++
 .../flink/configuration/CheckpointingOptions.java  | 47 ++++++++++++++++++++++
 .../configuration/CheckpointingOptionsTest.java    | 46 +++++++++++++++++++++
 3 files changed, 99 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/common_checkpointing_section.html b/docs/layouts/shortcodes/generated/common_checkpointing_section.html
index 9db118174a3..f942529b79c 100644
--- a/docs/layouts/shortcodes/generated/common_checkpointing_section.html
+++ b/docs/layouts/shortcodes/generated/common_checkpointing_section.html
@@ -56,5 +56,11 @@
             <td>Integer</td>
             <td>The maximum number of completed checkpoints to retain.</td>
         </tr>
+        <tr>
+            <td><h5>execution.checkpointing.unaligned.recover-output-on-downstream.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether recovering output buffers of upstream task on downstream task directly when job restores from the unaligned checkpoint.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index ce205077474..d2ccca8c1bc 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -659,6 +659,7 @@ public class CheckpointingOptions {
                                     + "Each subtask will create a new channel state file when this is configured to 1.");
 
     @Experimental
+    @Documentation.Section(Documentation.Sections.COMMON_CHECKPOINTING)
     public static final ConfigOption<Boolean> UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM =
             ConfigOptions.key(
                             "execution.checkpointing.unaligned.recover-output-on-downstream.enabled")
@@ -668,6 +669,32 @@ public class CheckpointingOptions {
                             "Whether recovering output buffers of upstream task on downstream task directly "
                                     + "when job restores from the unaligned checkpoint.");
 
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This option is not yet ready for public use, will be documented in a follow-up commit")
+    public static final ConfigOption<Boolean> UNALIGNED_DURING_RECOVERY_ENABLED =
+            ConfigOptions.key("execution.checkpointing.unaligned.during-recovery.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Whether to enable checkpointing during recovery from an unaligned checkpoint. "
+                                                    + "When enabled, the job can take checkpoints while still recovering channel state "
+                                                    + "(inflight data) from a previous unaligned checkpoint. This avoids the need to "
+                                                    + "wait for full recovery before the first checkpoint can be triggered, which "
+                                                    + "reduces the window of vulnerability to failures during recovery.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "This option requires %s to be enabled. "
+                                                    + "It does not require unaligned checkpoints to be currently enabled, because "
+                                                    + "a job may restore from an unaligned checkpoint while having unaligned "
+                                                    + "checkpoints disabled for the new execution.",
+                                            TextElement.code(
+                                                    UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM.key()))
+                                    .build());
+
     /**
      * Determines whether checkpointing is enabled based on the configuration.
      *
@@ -763,4 +790,24 @@ public class CheckpointingOptions {
         }
         return config.get(ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
     }
+
+    /**
+     * Determines whether unaligned checkpoint support during recovery is enabled.
+     *
+     * <p>This feature requires {@link #UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM} to be enabled. Note
+     * that it does not require unaligned checkpoints to be currently enabled, because a job may
+     * restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new
+     * execution.
+     *
+     * @param config the configuration to check
+     * @return {@code true} if unaligned checkpoint during recovery is enabled, {@code false}
+     *     otherwise
+     */
+    @Internal
+    public static boolean isUnalignedDuringRecoveryEnabled(Configuration config) {
+        if (!config.get(UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM)) {
+            return false;
+        }
+        return config.get(UNALIGNED_DURING_RECOVERY_ENABLED);
+    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java
index 4104dfb762b..747f789af35 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/CheckpointingOptionsTest.java
@@ -328,4 +328,50 @@ class CheckpointingOptionsTest {
                 .as("Interruptible timers should be disabled when runtime mode is BATCH")
                 .isFalse();
     }
+
+    @Test
+    void testIsUnalignedDuringRecoveryEnabled() {
+        // Test when both options are disabled (default) - should return false
+        Configuration defaultConfig = new Configuration();
+        assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(defaultConfig))
+                .as("During-recovery should be disabled by default")
+                .isFalse();
+
+        // Test when during-recovery is enabled but recover-output-on-downstream is disabled
+        Configuration onlyDuringRecoveryConfig = new Configuration();
+        onlyDuringRecoveryConfig.set(CheckpointingOptions.UNALIGNED_DURING_RECOVERY_ENABLED, true);
+        assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(onlyDuringRecoveryConfig))
+                .as(
+                        "During-recovery should be disabled when recover-output-on-downstream is not enabled")
+                .isFalse();
+
+        // Test when recover-output-on-downstream is enabled but during-recovery is disabled
+        Configuration onlyRecoverOnDownstreamConfig = new Configuration();
+        onlyRecoverOnDownstreamConfig.set(
+                CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, true);
+        assertThat(
+                        CheckpointingOptions.isUnalignedDuringRecoveryEnabled(
+                                onlyRecoverOnDownstreamConfig))
+                .as("During-recovery should be disabled when during-recovery option is not enabled")
+                .isFalse();
+
+        // Test when both options are enabled - should return true
+        Configuration bothEnabledConfig = new Configuration();
+        bothEnabledConfig.set(CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, true);
+        bothEnabledConfig.set(CheckpointingOptions.UNALIGNED_DURING_RECOVERY_ENABLED, true);
+        assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(bothEnabledConfig))
+                .as(
+                        "During-recovery should be enabled when both recover-output-on-downstream and during-recovery are enabled")
+                .isTrue();
+
+        // Test when recover-output-on-downstream is explicitly false and during-recovery is true
+        Configuration explicitlyDisabledConfig = new Configuration();
+        explicitlyDisabledConfig.set(
+                CheckpointingOptions.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM, false);
+        explicitlyDisabledConfig.set(CheckpointingOptions.UNALIGNED_DURING_RECOVERY_ENABLED, true);
+        assertThat(CheckpointingOptions.isUnalignedDuringRecoveryEnabled(explicitlyDisabledConfig))
+                .as(
+                        "During-recovery should be disabled when recover-output-on-downstream is explicitly false")
+                .isFalse();
+    }
 }

Reply via email to