This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3239e09c2bef6fa00599ffa084895d16777f7f8
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Sep 18 15:51:59 2024 +0800

    [FLINK-34511][Checkpoint] Cleanup `CheckpointConfig`
---
 .../reference/pyflink.datastream/checkpoint.rst    |  2 +
 .../pyflink/datastream/checkpoint_config.py        | 43 ++++++++++--
 .../api/environment/CheckpointConfig.java          | 76 ----------------------
 ...heckpointExceptionHandlerConfigurationTest.java | 51 ---------------
 .../checkpointing/IgnoreInFlightDataITCase.java    |  2 +-
 .../UnalignedCheckpointCompatibilityITCase.java    |  2 +-
 .../checkpointing/UnalignedCheckpointTestBase.java |  3 +-
 7 files changed, 45 insertions(+), 134 deletions(-)

diff --git a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst 
b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst
index d8abff3e2b7..66b0a4de4a4 100644
--- a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst
+++ b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst
@@ -50,6 +50,8 @@ Configuration that captures all checkpointing related 
settings.
     CheckpointConfig.is_unaligned_checkpoints_enabled
     CheckpointConfig.enable_unaligned_checkpoints
     CheckpointConfig.disable_unaligned_checkpoints
+    CheckpointConfig.set_aligned_checkpoint_timeout
+    CheckpointConfig.get_aligned_checkpoint_timeout
     CheckpointConfig.set_alignment_timeout
     CheckpointConfig.get_alignment_timeout
     CheckpointConfig.set_force_unaligned_checkpoints
diff --git a/flink-python/pyflink/datastream/checkpoint_config.py 
b/flink-python/pyflink/datastream/checkpoint_config.py
index 2beae636f7e..4fb0a34da84 100644
--- a/flink-python/pyflink/datastream/checkpoint_config.py
+++ b/flink-python/pyflink/datastream/checkpoint_config.py
@@ -166,8 +166,10 @@ class CheckpointConfig(object):
         decline the failed checkpoint.
 
         :return: ``True`` if failing on checkpointing errors, false otherwise.
+
+        .. note:: Deprecated in 2.0. Use 
:func:`get_tolerable_checkpoint_failure_number` instead.
         """
-        return self._j_checkpoint_config.isFailOnCheckpointingErrors()
+        return self.get_tolerable_checkpoint_failure_number() == 0
 
     def set_fail_on_checkpointing_errors(self,
                                          fail_on_checkpointing_errors: bool) 
-> 'CheckpointConfig':
@@ -184,8 +186,13 @@ class CheckpointConfig(object):
 
         :param fail_on_checkpointing_errors: ``True`` if failing on 
checkpointing errors,
                                              false otherwise.
+
+        .. note:: Deprecated in 2.0. Use 
:func:`set_tolerable_checkpoint_failure_number` instead.
         """
-        
self._j_checkpoint_config.setFailOnCheckpointingErrors(fail_on_checkpointing_errors)
+        if fail_on_checkpointing_errors:
+            self.set_tolerable_checkpoint_failure_number(0)
+        else:
+            self.set_tolerable_checkpoint_failure_number(2147483647)
         return self
 
     def get_tolerable_checkpoint_failure_number(self) -> int:
@@ -316,6 +323,30 @@ class CheckpointConfig(object):
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_aligned_checkpoint_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
+        """
+        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+        If ``alignment_timeout`` has value equal to ``0``, checkpoints will 
always start unaligned.
+        If ``alignment_timeout`` has value greater than ``0``, checkpoints 
will start aligned. If
+        during checkpointing, checkpoint start delay exceeds this 
``alignment_timeout``, alignment
+        will time out and checkpoint will start working as unaligned checkpoint.
+
+        :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
+                                  an unaligned checkpoint.
+        """
+        
self._j_checkpoint_config.setAlignedCheckpointTimeout(alignment_timeout._j_duration)
+        return self
+
+    def get_aligned_checkpoint_timeout(self) -> 'Duration':
+        """
+        Returns the alignment timeout, as configured via 
:func:`set_aligned_checkpoint_timeout` or
+        
``org.apache.flink.configuration.CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT``.
+
+        :return: the alignment timeout.
+        """
+        return 
Duration(self._j_checkpoint_config.getAlignedCheckpointTimeout())
+
     def set_alignment_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
         """
         Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
@@ -327,8 +358,10 @@ class CheckpointConfig(object):
 
         :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
                                   an unaligned checkpoint.
+
+        .. note:: Deprecated in 2.0. Use 
:func:`set_aligned_checkpoint_timeout` instead.
         """
-        
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+        self.set_aligned_checkpoint_timeout(alignment_timeout)
         return self
 
     def get_alignment_timeout(self) -> 'Duration':
@@ -337,8 +370,10 @@ class CheckpointConfig(object):
         
``org.apache.flink.configuration.CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT``.
 
         :return: the alignment timeout.
+
+        .. note:: Deprecated in 2.0. Use 
:func:`get_aligned_checkpoint_timeout` instead.
         """
-        return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+        return self.get_aligned_checkpoint_timeout()
 
     def set_force_unaligned_checkpoints(
             self,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 7c45701ed08..62d8b5bad26 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 
-import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
 import static 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -351,51 +350,6 @@ public class CheckpointConfig implements 
java.io.Serializable {
         configuration.set(CheckpointingOptions.FORCE_UNALIGNED, 
forceUnalignedCheckpoints);
     }
 
-    /**
-     * This determines the behaviour when meeting checkpoint errors. If this 
returns true, which is
-     * equivalent to get tolerableCheckpointFailureNumber as zero, job manager 
would fail the whole
-     * job once it received a decline checkpoint message. If this returns 
false, which is equivalent
-     * to get tolerableCheckpointFailureNumber as the maximum of integer 
(means unlimited), job
-     * manager would not fail the whole job no matter how many declined 
checkpoints it received.
-     *
-     * @deprecated Use {@link #getTolerableCheckpointFailureNumber()}.
-     */
-    @Deprecated
-    public boolean isFailOnCheckpointingErrors() {
-        return getTolerableCheckpointFailureNumber() == 0;
-    }
-
-    /**
-     * Sets the expected behaviour for tasks in case that they encounter an 
error when
-     * checkpointing. If this is set as true, which is equivalent to set
-     * tolerableCheckpointFailureNumber as zero, job manager would fail the 
whole job once it
-     * received a decline checkpoint message. If this is set as false, which 
is equivalent to set
-     * tolerableCheckpointFailureNumber as the maximum of integer (means 
unlimited), job manager
-     * would not fail the whole job no matter how many declined checkpoints it 
received.
-     *
-     * <p>{@link #setTolerableCheckpointFailureNumber(int)} would always 
overrule this deprecated
-     * method if they have conflicts.
-     *
-     * @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}.
-     */
-    @Deprecated
-    public void setFailOnCheckpointingErrors(boolean 
failOnCheckpointingErrors) {
-        if 
(configuration.getOptional(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER).isPresent())
 {
-            LOG.warn(
-                    "Since CheckpointingOptions.TOLERABLE_FAILURE_NUMBER has 
been configured as {}, deprecated "
-                            + "#setFailOnCheckpointingErrors(boolean) method 
would not take any effect and please use "
-                            + "#setTolerableCheckpointFailureNumber(int) 
method to "
-                            + "determine your expected behaviour when 
checkpoint errors on task side.",
-                    getTolerableCheckpointFailureNumber());
-            return;
-        }
-        if (failOnCheckpointingErrors) {
-            setTolerableCheckpointFailureNumber(0);
-        } else {
-            
setTolerableCheckpointFailureNumber(UNLIMITED_TOLERABLE_FAILURE_NUMBER);
-        }
-    }
-
     /**
      * Get the defined number of consecutive checkpoint failures that will be 
tolerated, before the
      * whole job is failed over.
@@ -515,36 +469,6 @@ public class CheckpointConfig implements 
java.io.Serializable {
         return 
configuration.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
     }
 
-    /**
-     * Only relevant if {@link #isUnalignedCheckpointsEnabled} is enabled.
-     *
-     * <p>If {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value 
equal to <code>0
-     * </code>, checkpoints will always start unaligned.
-     *
-     * <p>If {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value 
greater then <code>0
-     * </code>, checkpoints will start aligned. If during checkpointing, 
checkpoint start delay
-     * exceeds this {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}, 
alignment will timeout
-     * and checkpoint will start working as unaligned checkpoint.
-     *
-     * @deprecated Use {@link #setAlignedCheckpointTimeout(Duration)} instead.
-     */
-    @Deprecated
-    @PublicEvolving
-    public void setAlignmentTimeout(Duration alignmentTimeout) {
-        setAlignedCheckpointTimeout(alignmentTimeout);
-    }
-
-    /**
-     * @return value of alignment timeout, as configured via {@link 
#setAlignmentTimeout(Duration)}
-     *     or {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}.
-     * @deprecated User {@link #getAlignedCheckpointTimeout()} instead.
-     */
-    @Deprecated
-    @PublicEvolving
-    public Duration getAlignmentTimeout() {
-        return getAlignedCheckpointTimeout();
-    }
-
     /**
      * @return value of alignment timeout, as configured via {@link
      *     #setAlignedCheckpointTimeout(Duration)} or {@link
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
index 6284c0b8854..ca83757125d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import org.junit.jupiter.api.Test;
 
@@ -38,7 +35,6 @@ class CheckpointExceptionHandlerConfigurationTest {
         StreamExecutionEnvironment streamExecutionEnvironment =
                 StreamExecutionEnvironment.getExecutionEnvironment();
         CheckpointConfig checkpointConfig = 
streamExecutionEnvironment.getCheckpointConfig();
-        assertThat(checkpointConfig.isFailOnCheckpointingErrors()).isTrue();
         
assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()).isZero();
     }
 
@@ -48,54 +44,7 @@ class CheckpointExceptionHandlerConfigurationTest {
                 StreamExecutionEnvironment.getExecutionEnvironment();
         CheckpointConfig checkpointConfig = 
streamExecutionEnvironment.getCheckpointConfig();
 
-        // use deprecated API to set not fail on checkpoint errors
-        checkpointConfig.setFailOnCheckpointingErrors(false);
-        assertThat(checkpointConfig.isFailOnCheckpointingErrors()).isFalse();
-        assertThat(checkpointConfig.getTolerableCheckpointFailureNumber())
-                
.isEqualTo(CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER);
-
-        // use new API to set tolerable declined checkpoint number
         checkpointConfig.setTolerableCheckpointFailureNumber(5);
         
assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()).isEqualTo(5);
-
-        // after we configure the tolerable declined checkpoint number, 
deprecated API would not
-        // take effect
-        checkpointConfig.setFailOnCheckpointingErrors(true);
-        
assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()).isEqualTo(5);
-    }
-
-    @Test
-    void testPropagationFailFromCheckpointConfig() {
-        try {
-            doTestPropagationFromCheckpointConfig(true);
-        } catch (IllegalArgumentException ignored) {
-            // ignored
-        }
-    }
-
-    @Test
-    void testPropagationDeclineFromCheckpointConfig() {
-        doTestPropagationFromCheckpointConfig(false);
-    }
-
-    public void doTestPropagationFromCheckpointConfig(boolean 
failTaskOnCheckpointErrors) {
-        StreamExecutionEnvironment streamExecutionEnvironment =
-                StreamExecutionEnvironment.getExecutionEnvironment();
-        streamExecutionEnvironment.setParallelism(1);
-        
streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
-        streamExecutionEnvironment
-                .getCheckpointConfig()
-                .setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
-        streamExecutionEnvironment
-                .addSource(
-                        new SourceFunction<Integer>() {
-
-                            @Override
-                            public void run(SourceContext<Integer> ctx) {}
-
-                            @Override
-                            public void cancel() {}
-                        })
-                .sinkTo(new DiscardingSink<>());
     }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
index 739300b319a..064f05f7d26 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
@@ -122,7 +122,7 @@ public class IgnoreInFlightDataITCase extends TestLogger {
         env.enableCheckpointing(checkpointInterval *= 2);
         env.disableOperatorChaining();
         env.getCheckpointConfig().enableUnalignedCheckpoints();
-        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
+        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO);
         
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
         RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
index db21bd203f9..4e4d0a325c0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
@@ -199,7 +199,7 @@ public class UnalignedCheckpointCompatibilityITCase extends 
TestLogger {
         env.setParallelism(PARALLELISM);
         RestartStrategyUtils.configureNoRestartStrategy(env);
         env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
-        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
+        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO);
         
env.getCheckpointConfig().setExternalizedCheckpointRetention(RETAIN_ON_CANCELLATION);
         if (checkpointingInterval > 0) {
             env.enableCheckpointing(checkpointingInterval);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 46cdb2bf19b..a264c94bd3f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -741,7 +741,8 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
 
         public void configure(StreamExecutionEnvironment env) {
             env.enableCheckpointing(Math.max(100L, parallelism * 50L));
-            
env.getCheckpointConfig().setAlignmentTimeout(Duration.ofMillis(alignmentTimeout));
+            env.getCheckpointConfig()
+                    
.setAlignedCheckpointTimeout(Duration.ofMillis(alignmentTimeout));
             
env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout.toMillis());
             env.getCheckpointConfig()
                     
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);

Reply via email to