This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f3239e09c2bef6fa00599ffa084895d16777f7f8 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Sep 18 15:51:59 2024 +0800 [FLINK-34511][Checkpoint] Cleanup `CheckpointConfig` --- .../reference/pyflink.datastream/checkpoint.rst | 2 + .../pyflink/datastream/checkpoint_config.py | 43 ++++++++++-- .../api/environment/CheckpointConfig.java | 76 ---------------------- ...heckpointExceptionHandlerConfigurationTest.java | 51 --------------- .../checkpointing/IgnoreInFlightDataITCase.java | 2 +- .../UnalignedCheckpointCompatibilityITCase.java | 2 +- .../checkpointing/UnalignedCheckpointTestBase.java | 3 +- 7 files changed, 45 insertions(+), 134 deletions(-) diff --git a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst index d8abff3e2b7..66b0a4de4a4 100644 --- a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst +++ b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst @@ -50,6 +50,8 @@ Configuration that captures all checkpointing related settings. CheckpointConfig.is_unaligned_checkpoints_enabled CheckpointConfig.enable_unaligned_checkpoints CheckpointConfig.disable_unaligned_checkpoints + CheckpointConfig.set_aligned_checkpoint_timeout + CheckpointConfig.get_aligned_checkpoint_timeout CheckpointConfig.set_alignment_timeout CheckpointConfig.get_alignment_timeout CheckpointConfig.set_force_unaligned_checkpoints diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py index 2beae636f7e..4fb0a34da84 100644 --- a/flink-python/pyflink/datastream/checkpoint_config.py +++ b/flink-python/pyflink/datastream/checkpoint_config.py @@ -166,8 +166,10 @@ class CheckpointConfig(object): decline the failed checkpoint. :return: ``True`` if failing on checkpointing errors, false otherwise. + + .. note:: Deprecated in 2.0. Use :func:`get_tolerable_checkpoint_failure_number` instead. 
""" - return self._j_checkpoint_config.isFailOnCheckpointingErrors() + return self.get_tolerable_checkpoint_failure_number() == 0 def set_fail_on_checkpointing_errors(self, fail_on_checkpointing_errors: bool) -> 'CheckpointConfig': @@ -184,8 +186,13 @@ class CheckpointConfig(object): :param fail_on_checkpointing_errors: ``True`` if failing on checkpointing errors, false otherwise. + + .. note:: Deprecated in 2.0. Use :func:`set_tolerable_checkpoint_failure_number` instead. """ - self._j_checkpoint_config.setFailOnCheckpointingErrors(fail_on_checkpointing_errors) + if fail_on_checkpointing_errors: + self.set_tolerable_checkpoint_failure_number(0) + else: + self.set_tolerable_checkpoint_failure_number(2147483647) return self def get_tolerable_checkpoint_failure_number(self) -> int: @@ -316,6 +323,30 @@ class CheckpointConfig(object): self.enable_unaligned_checkpoints(False) return self + def set_aligned_checkpoint_timeout(self, alignment_timeout: Duration) -> 'CheckpointConfig': + """ + Only relevant if :func:`enable_unaligned_checkpoints` is enabled. + + If ``alignment_timeout`` has value equal to ``0``, checkpoints will always start unaligned. + If ``alignment_timeout`` has value greater than ``0``, checkpoints will start aligned. If + during checkpointing, checkpoint start delay exceeds this ``alignment_timeout``, alignment + will time out and checkpoint will start working as unaligned checkpoint. + + :param alignment_timeout: The duration until the aligned checkpoint will be converted into + an unaligned checkpoint. + """ + self._j_checkpoint_config.setAlignedCheckpointTimeout(alignment_timeout._j_duration) + return self + + def get_aligned_checkpoint_timeout(self) -> 'Duration': + """ + Returns the alignment timeout, as configured via :func:`set_aligned_checkpoint_timeout` or + ``org.apache.flink.configuration.CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT``. + + :return: the alignment timeout. 
+ """ + return Duration(self._j_checkpoint_config.getAlignedCheckpointTimeout()) + def set_alignment_timeout(self, alignment_timeout: Duration) -> 'CheckpointConfig': """ Only relevant if :func:`enable_unaligned_checkpoints` is enabled. @@ -327,8 +358,10 @@ class CheckpointConfig(object): :param alignment_timeout: The duration until the aligned checkpoint will be converted into an unaligned checkpoint. + + .. note:: Deprecated in 2.0. Use :func:`set_aligned_checkpoint_timeout` instead. """ - self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration) + self.set_aligned_checkpoint_timeout(alignment_timeout) return self def get_alignment_timeout(self) -> 'Duration': @@ -337,8 +370,10 @@ class CheckpointConfig(object): ``org.apache.flink.configuration.CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT``. :return: the alignment timeout. + + .. note:: Deprecated in 2.0. Use :func:`get_aligned_checkpoint_timeout` instead. """ - return Duration(self._j_checkpoint_config.getAlignmentTimeout()) + return self.get_aligned_checkpoint_timeout() def set_force_unaligned_checkpoints( self, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 7c45701ed08..62d8b5bad26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER; import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -351,51 +350,6 @@ public class CheckpointConfig implements java.io.Serializable { 
configuration.set(CheckpointingOptions.FORCE_UNALIGNED, forceUnalignedCheckpoints); } - /** - * This determines the behaviour when meeting checkpoint errors. If this returns true, which is - * equivalent to get tolerableCheckpointFailureNumber as zero, job manager would fail the whole - * job once it received a decline checkpoint message. If this returns false, which is equivalent - * to get tolerableCheckpointFailureNumber as the maximum of integer (means unlimited), job - * manager would not fail the whole job no matter how many declined checkpoints it received. - * - * @deprecated Use {@link #getTolerableCheckpointFailureNumber()}. - */ - @Deprecated - public boolean isFailOnCheckpointingErrors() { - return getTolerableCheckpointFailureNumber() == 0; - } - - /** - * Sets the expected behaviour for tasks in case that they encounter an error when - * checkpointing. If this is set as true, which is equivalent to set - * tolerableCheckpointFailureNumber as zero, job manager would fail the whole job once it - * received a decline checkpoint message. If this is set as false, which is equivalent to set - * tolerableCheckpointFailureNumber as the maximum of integer (means unlimited), job manager - * would not fail the whole job no matter how many declined checkpoints it received. - * - * <p>{@link #setTolerableCheckpointFailureNumber(int)} would always overrule this deprecated - * method if they have conflicts. - * - * @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}. 
- */ - @Deprecated - public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) { - if (configuration.getOptional(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER).isPresent()) { - LOG.warn( - "Since CheckpointingOptions.TOLERABLE_FAILURE_NUMBER has been configured as {}, deprecated " - + "#setFailOnCheckpointingErrors(boolean) method would not take any effect and please use " - + "#setTolerableCheckpointFailureNumber(int) method to " - + "determine your expected behaviour when checkpoint errors on task side.", - getTolerableCheckpointFailureNumber()); - return; - } - if (failOnCheckpointingErrors) { - setTolerableCheckpointFailureNumber(0); - } else { - setTolerableCheckpointFailureNumber(UNLIMITED_TOLERABLE_FAILURE_NUMBER); - } - } - /** * Get the defined number of consecutive checkpoint failures that will be tolerated, before the * whole job is failed over. @@ -515,36 +469,6 @@ public class CheckpointConfig implements java.io.Serializable { return configuration.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS); } - /** - * Only relevant if {@link #isUnalignedCheckpointsEnabled} is enabled. - * - * <p>If {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value equal to <code>0 - * </code>, checkpoints will always start unaligned. - * - * <p>If {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value greater then <code>0 - * </code>, checkpoints will start aligned. If during checkpointing, checkpoint start delay - * exceeds this {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}, alignment will timeout - * and checkpoint will start working as unaligned checkpoint. - * - * @deprecated Use {@link #setAlignedCheckpointTimeout(Duration)} instead. 
- */ - @Deprecated - @PublicEvolving - public void setAlignmentTimeout(Duration alignmentTimeout) { - setAlignedCheckpointTimeout(alignmentTimeout); - } - - /** - * @return value of alignment timeout, as configured via {@link #setAlignmentTimeout(Duration)} - * or {@link CheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}. - * @deprecated User {@link #getAlignedCheckpointTimeout()} instead. - */ - @Deprecated - @PublicEvolving - public Duration getAlignmentTimeout() { - return getAlignedCheckpointTimeout(); - } - /** * @return value of alignment timeout, as configured via {@link * #setAlignedCheckpointTimeout(Duration)} or {@link diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java index 6284c0b8854..ca83757125d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java @@ -18,11 +18,8 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.junit.jupiter.api.Test; @@ -38,7 +35,6 @@ class CheckpointExceptionHandlerConfigurationTest { StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); CheckpointConfig checkpointConfig = streamExecutionEnvironment.getCheckpointConfig(); - 
assertThat(checkpointConfig.isFailOnCheckpointingErrors()).isTrue(); assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()).isZero(); } @@ -48,54 +44,7 @@ class CheckpointExceptionHandlerConfigurationTest { StreamExecutionEnvironment.getExecutionEnvironment(); CheckpointConfig checkpointConfig = streamExecutionEnvironment.getCheckpointConfig(); - // use deprecated API to set not fail on checkpoint errors - checkpointConfig.setFailOnCheckpointingErrors(false); - assertThat(checkpointConfig.isFailOnCheckpointingErrors()).isFalse(); - assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()) - .isEqualTo(CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER); - - // use new API to set tolerable declined checkpoint number checkpointConfig.setTolerableCheckpointFailureNumber(5); assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()).isEqualTo(5); - - // after we configure the tolerable declined checkpoint number, deprecated API would not - // take effect - checkpointConfig.setFailOnCheckpointingErrors(true); - assertThat(checkpointConfig.getTolerableCheckpointFailureNumber()).isEqualTo(5); - } - - @Test - void testPropagationFailFromCheckpointConfig() { - try { - doTestPropagationFromCheckpointConfig(true); - } catch (IllegalArgumentException ignored) { - // ignored - } - } - - @Test - void testPropagationDeclineFromCheckpointConfig() { - doTestPropagationFromCheckpointConfig(false); - } - - public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) { - StreamExecutionEnvironment streamExecutionEnvironment = - StreamExecutionEnvironment.getExecutionEnvironment(); - streamExecutionEnvironment.setParallelism(1); - streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000); - streamExecutionEnvironment - .getCheckpointConfig() - .setFailOnCheckpointingErrors(failTaskOnCheckpointErrors); - streamExecutionEnvironment - .addSource( - new SourceFunction<Integer>() { - - @Override - public void 
run(SourceContext<Integer> ctx) {} - - @Override - public void cancel() {} - }) - .sinkTo(new DiscardingSink<>()); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java index 739300b319a..064f05f7d26 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java @@ -122,7 +122,7 @@ public class IgnoreInFlightDataITCase extends TestLogger { env.enableCheckpointing(checkpointInterval *= 2); env.disableOperatorChaining(); env.getCheckpointConfig().enableUnalignedCheckpoints(); - env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO); + env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO); env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1); RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java index db21bd203f9..4e4d0a325c0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java @@ -199,7 +199,7 @@ public class UnalignedCheckpointCompatibilityITCase extends TestLogger { env.setParallelism(PARALLELISM); RestartStrategyUtils.configureNoRestartStrategy(env); env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned); - env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO); + 
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO); env.getCheckpointConfig().setExternalizedCheckpointRetention(RETAIN_ON_CANCELLATION); if (checkpointingInterval > 0) { env.enableCheckpointing(checkpointingInterval); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index 46cdb2bf19b..a264c94bd3f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -741,7 +741,8 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { public void configure(StreamExecutionEnvironment env) { env.enableCheckpointing(Math.max(100L, parallelism * 50L)); - env.getCheckpointConfig().setAlignmentTimeout(Duration.ofMillis(alignmentTimeout)); + env.getCheckpointConfig() + .setAlignedCheckpointTimeout(Duration.ofMillis(alignmentTimeout)); env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout.toMillis()); env.getCheckpointConfig() .setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);