This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a438f93dd0c83e3de3deb99456b7b499cbdd1a3c Author: spoon-lz <971066...@qq.com> AuthorDate: Mon Mar 25 15:56:25 2024 +0800 [FLINK-34615]Renamed class "ExternalizedCheckpointCleanup" to "ExternalizedCheckpointRetention" in Java section. --- .../datastream/fault-tolerance/checkpointing.md | 10 +++++----- docs/content.zh/docs/ops/state/checkpoints.md | 8 ++++---- .../datastream/fault-tolerance/checkpointing.md | 10 +++++----- docs/content/docs/ops/state/checkpoints.md | 8 ++++---- .../reader/CoordinatedSourceRescaleITCase.java | 6 +++--- ...p.java => ExternalizedCheckpointRetention.java} | 4 ++-- .../tests/DataStreamAllroundTestJobFactory.java | 10 +++++----- .../StickyAllocationAndLocalRecoveryTestJob.java | 6 +++--- .../api/environment/CheckpointConfig.java | 20 +++++++++----------- .../environment/ExecutionCheckpointingOptions.java | 10 +++++----- .../api/graph/StreamingJobGraphGenerator.java | 4 ++-- .../CheckpointConfigFromConfigurationTest.java | 22 ++++++++++------------ .../ChangelogLocalRecoveryITCase.java | 6 +++--- .../ChangelogRecoveryRescaleITCase.java | 6 +++--- .../ChangelogRecoverySwitchStateBackendITCase.java | 10 +++++----- .../test/checkpointing/RegionFailoverITCase.java | 6 +++--- .../RescaleCheckpointManuallyITCase.java | 6 +++--- .../checkpointing/RestoreUpgradedJobITCase.java | 6 +++--- .../ResumeCheckpointManuallyITCase.java | 6 +++--- .../flink/test/checkpointing/SavepointITCase.java | 6 +++--- .../UnalignedCheckpointCompatibilityITCase.java | 4 ++-- .../UnalignedCheckpointStressITCase.java | 6 +++--- .../checkpointing/UnalignedCheckpointTestBase.java | 6 +++--- .../test/state/ChangelogCompatibilityITCase.java | 5 ++--- .../test/state/ChangelogRecoveryCachingITCase.java | 2 +- .../flink/test/state/ChangelogRescalingITCase.java | 2 +- 26 files changed, 95 insertions(+), 100 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index 9acbdd4c324..70cb1d5b95d 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -107,8 +107,8 @@ env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 -env.getCheckpointConfig().setExternalizedCheckpointCleanup( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); +env.getCheckpointConfig().setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); // 开启实验性的 unaligned checkpoints env.getCheckpointConfig().enableUnalignedCheckpoints(); @@ -139,8 +139,8 @@ env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2) env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 -env.getCheckpointConfig().setExternalizedCheckpointCleanup( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) +env.getCheckpointConfig().setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) // 开启实验性的 unaligned checkpoints env.getCheckpointConfig.enableUnalignedCheckpoints() @@ -172,7 +172,7 @@ env.get_checkpoint_config().set_max_concurrent_checkpoints(1) # 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 env.get_checkpoint_config().enable_externalized_checkpoints( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) # 开启实验性的 unaligned checkpoints env.get_checkpoint_config().enable_unaligned_checkpoints() diff --git a/docs/content.zh/docs/ops/state/checkpoints.md b/docs/content.zh/docs/ops/state/checkpoints.md index 12a010af09b..e45e9077af1 100644 --- a/docs/content.zh/docs/ops/state/checkpoints.md +++ b/docs/content.zh/docs/ops/state/checkpoints.md @@ -91,12 +91,12 @@ Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留, ```java CheckpointConfig config = env.getCheckpointConfig(); -config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); +config.setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); ``` -`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。 -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。 +`ExternalizedCheckpointRetention` 配置项定义了当作业取消时,对作业 checkpoint 的操作: +- **`ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。 +- **`ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。 ### 目录结构 diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index b8eb5888fa8..b4e8c126f52 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -116,8 +116,8 @@ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained // after job cancellation -env.getCheckpointConfig().setExternalizedCheckpointCleanup( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); +env.getCheckpointConfig().setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); // enables the unaligned checkpoints env.getCheckpointConfig().enableUnalignedCheckpoints(); @@ -160,8 +160,8 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // enable externalized checkpoints which are retained // after job cancellation -env.getCheckpointConfig().setExternalizedCheckpointCleanup( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) +env.getCheckpointConfig().setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) // enables the unaligned checkpoints env.getCheckpointConfig.enableUnalignedCheckpoints() @@ -200,7 +200,7 @@ env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2) env.get_checkpoint_config().set_max_concurrent_checkpoints(1) # enable externalized checkpoints which are retained after job cancellation -env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) +env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) # enables the unaligned checkpoints env.get_checkpoint_config().enable_unaligned_checkpoints() diff --git a/docs/content/docs/ops/state/checkpoints.md b/docs/content/docs/ops/state/checkpoints.md index f316845e115..db6a8b186b4 100644 --- a/docs/content/docs/ops/state/checkpoints.md +++ b/docs/content/docs/ops/state/checkpoints.md @@ -98,14 +98,14 @@ This way, you will have a checkpoint around to resume from if your job fails. ```java CheckpointConfig config = env.getCheckpointConfig(); -config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); +config.setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); ``` -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: +The `ExternalizedCheckpointRetention` mode configures what happens with checkpoints when you cancel the job: -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. +- **`ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +- **`ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. ### Directory Structure diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java index e9483a0911e..6a805a22d1a 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -122,8 +122,8 @@ public class CoordinatedSourceRescaleITCase extends TestLogger { env.setParallelism(p); env.enableCheckpointing(100); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.noRestart()); DataStream<Long> stream = env.fromSequence(0, Long.MAX_VALUE); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointCleanup.java b/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointRetention.java similarity index 95% rename from flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointCleanup.java rename to flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointRetention.java index 5c56b76251e..8edaa1f42ea 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointCleanup.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointRetention.java @@ -27,7 +27,7 @@ import static org.apache.flink.configuration.description.TextElement.text; /** Cleanup behaviour for externalized checkpoints when the job is cancelled. */ @PublicEvolving -public enum ExternalizedCheckpointCleanup implements DescribedEnum { +public enum ExternalizedCheckpointRetention implements DescribedEnum { /** * Delete externalized checkpoints on job cancellation. @@ -61,7 +61,7 @@ public enum ExternalizedCheckpointCleanup implements DescribedEnum { private final InlineElement description; - ExternalizedCheckpointCleanup(InlineElement description) { + ExternalizedCheckpointRetention(InlineElement description) { this.description = description; } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index b2ee4ec8b61..e14ad4a6041 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; @@ -303,20 +303,20 @@ public class DataStreamAllroundTestJobFactory { ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.key(), ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.defaultValue()); - ExternalizedCheckpointCleanup cleanupMode; + ExternalizedCheckpointRetention cleanupMode; switch (cleanupModeConfig) { case "retain": - cleanupMode = ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; + cleanupMode = ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION; break; case "delete": - cleanupMode = ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION; + cleanupMode = ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION; break; default: throw new IllegalArgumentException( "Unknown clean up mode for externalized checkpoints: " + cleanupModeConfig); } - env.getCheckpointConfig().setExternalizedCheckpointCleanupRetention(cleanupMode); + env.getCheckpointConfig().setExternalizedCheckpointRetention(cleanupMode); final int tolerableDeclinedCheckpointNumber = pt.getInt( diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java index 545e3e7a136..65e331e7814 100644 --- a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java +++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java @@ -28,7 +28,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -93,8 +93,8 @@ public class StickyAllocationAndLocalRecoveryTestJob { Integer.MAX_VALUE, pt.getInt("restartDelay", 0))); if (pt.getBoolean("externalizedCheckpoints", false)) { env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); } String checkpointDir = pt.getRequired("checkpointDir"); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index c786e1befb1..413c77e837b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.description.InlineElement; @@ -544,7 +545,7 @@ public class CheckpointConfig implements java.io.Serializable { * CheckpointingOptions#CHECKPOINTS_DIRECTORY}. * * @param cleanupMode Externalized checkpoint clean-up behaviour. - * @deprecated Use {@link #setExternalizedCheckpointCleanupRetention} instead. + * @deprecated Use {@link #setExternalizedCheckpointRetention} instead. */ @Deprecated @PublicEvolving @@ -555,7 +556,7 @@ public class CheckpointConfig implements java.io.Serializable { /** * Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled * automatically unless the mode is set to {@link - * org.apache.flink.configuration.ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}. + * ExternalizedCheckpointRetention#NO_EXTERNALIZED_CHECKPOINTS}. * * <p>Externalized checkpoints write their meta data out to persistent storage and are * <strong>not</strong> automatically cleaned up when the owning job fails or is suspended @@ -563,7 +564,7 @@ public class CheckpointConfig implements java.io.Serializable { * this case, you have to manually clean up the checkpoint state, both the meta data and actual * program state. * - * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an externalized checkpoint + * <p>The {@link ExternalizedCheckpointRetention} mode defines how an externalized checkpoint * should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on * cancellation you have to handle checkpoint clean-up manually when you cancel the job as well * (terminating with job status {@link JobStatus#CANCELED}). @@ -574,8 +575,7 @@ public class CheckpointConfig implements java.io.Serializable { * @param cleanupMode Externalized checkpoint clean-up behaviour. */ @PublicEvolving - public void setExternalizedCheckpointCleanupRetention( - org.apache.flink.configuration.ExternalizedCheckpointCleanup cleanupMode) { + public void setExternalizedCheckpointRetention(ExternalizedCheckpointRetention cleanupMode) { configuration.set( ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, cleanupMode); } @@ -782,7 +782,7 @@ public class CheckpointConfig implements java.io.Serializable { * * @return The cleanup behaviour for externalized checkpoints or <code>null</code> if none is * configured. - * @deprecated Use {@link #getExternalizedCheckpointCleanupRetention} instead. + * @deprecated Use {@link #getExternalizedCheckpointRetention} instead. */ @Deprecated @PublicEvolving @@ -797,8 +797,7 @@ public class CheckpointConfig implements java.io.Serializable { * configured. */ @PublicEvolving - public org.apache.flink.configuration.ExternalizedCheckpointCleanup - getExternalizedCheckpointCleanupRetention() { + public ExternalizedCheckpointRetention getExternalizedCheckpointRetention() { return configuration.get(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION); } @@ -977,8 +976,7 @@ public class CheckpointConfig implements java.io.Serializable { /** * Cleanup behaviour for externalized checkpoints when the job is cancelled. * - * @deprecated This class has been moved to {@link - * org.apache.flink.configuration.ExternalizedCheckpointCleanup}. + * @deprecated This class has been moved to {@link ExternalizedCheckpointRetention}. */ @Deprecated @PublicEvolving @@ -1073,7 +1071,7 @@ public class CheckpointConfig implements java.io.Serializable { .ifPresent(this::setExternalizedCheckpointCleanup); configuration .getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION) - .ifPresent(this::setExternalizedCheckpointCleanupRetention); + .ifPresent(this::setExternalizedCheckpointRetention); configuration .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED) .ifPresent(this::enableUnalignedCheckpoints); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 8d0c8016193..ab64fefb51a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -24,8 +24,7 @@ import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; -import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; import org.apache.flink.core.execution.CheckpointingMode; @@ -144,11 +143,12 @@ public class ExecutionCheckpointingOptions { .key())) .build()); - public static final ConfigOption<ExternalizedCheckpointCleanup> + public static final ConfigOption<ExternalizedCheckpointRetention> EXTERNALIZED_CHECKPOINT_RETENTION = ConfigOptions.key("execution.checkpointing.externalized-checkpoint-retention") - .enumType(ExternalizedCheckpointCleanup.class) - .defaultValue(ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS) + .enumType(ExternalizedCheckpointRetention.class) + .defaultValue( + ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS) .withDescription( Description.builder() .text( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 67049f94b47..dd0d045b509 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.core.execution.CheckpointingMode; @@ -1957,7 +1957,7 @@ public class StreamingJobGraphGenerator { CheckpointRetentionPolicy retentionAfterTermination; if (cfg.isExternalizedCheckpointsEnabled()) { - ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanupRetention(); + ExternalizedCheckpointRetention cleanup = cfg.getExternalizedCheckpointRetention(); // Sanity check if (cleanup == null) { throw new IllegalStateException( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java index ddb6813059c..40c4131204e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.environment; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.CheckpointStorage; @@ -106,14 +106,14 @@ public class CheckpointConfigFromConfigurationTest { .nonDefaultValue( CheckpointConfig.ExternalizedCheckpointCleanup .DELETE_ON_CANCELLATION), - TestSpec.testValue(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) + TestSpec.testValue(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) .whenSetFromFile( "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION") - .viaSetter(CheckpointConfig::setExternalizedCheckpointCleanupRetention) - .getterVia(CheckpointConfig::getExternalizedCheckpointCleanupRetention) - .nonDefaultValue(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION), - TestSpec.testValue(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) + .viaSetter(CheckpointConfig::setExternalizedCheckpointRetention) + .getterVia(CheckpointConfig::getExternalizedCheckpointRetention) + .nonDefaultValue(ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION), + TestSpec.testValue(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) .whenSetFromFile( "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION") @@ -123,8 +123,8 @@ public class CheckpointConfigFromConfigurationTest { CheckpointConfig.ExternalizedCheckpointCleanup.valueOf( v.name())); }) - .getterVia(CheckpointConfig::getExternalizedCheckpointCleanupRetention) - .nonDefaultValue(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION), + .getterVia(CheckpointConfig::getExternalizedCheckpointRetention) + .nonDefaultValue(ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION), TestSpec.testValue( CheckpointConfig.ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION) @@ -133,10 +133,8 @@ public class CheckpointConfigFromConfigurationTest { "RETAIN_ON_CANCELLATION") .viaSetter( (config, v) -> { - config.setExternalizedCheckpointCleanupRetention( - org.apache.flink.configuration - .ExternalizedCheckpointCleanup.valueOf( - v.name())); + config.setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.valueOf(v.name())); }) .getterVia(CheckpointConfig::getExternalizedCheckpointCleanup) .nonDefaultValue( diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java index c58dba04973..6350009a3d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -176,8 +176,8 @@ public class ChangelogLocalRecoveryITCase extends TestLogger { Duration.ofMillis(materializationInterval)) .set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1)); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); Configuration configuration = new Configuration(); configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1); env.configure(configuration); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java index f358194ad4d..a0a305a4eba 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java @@ -17,7 +17,7 @@ package org.apache.flink.test.checkpointing; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -46,8 +46,8 @@ public class ChangelogRecoveryRescaleITCase extends ChangelogRecoverySwitchEnvTe private StreamExecutionEnvironment getEnv(int parallelism) { StreamExecutionEnvironment env = getEnv(delegatedStateBackend, 50, 0, 20, 0); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.setParallelism(parallelism); return env; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java index cbaf766067e..7c9254d5ab6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java @@ -20,7 +20,7 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.MemorySize; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -148,8 +148,8 @@ public class ChangelogRecoverySwitchStateBackendITCase extends ChangelogRecovery env.enableChangelogStateBackend(enableChangelog); env.setParallelism(parallelism); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); return env; } @@ -169,8 +169,8 @@ public class ChangelogRecoverySwitchStateBackendITCase extends ChangelogRecovery 0); env.enableChangelogStateBackend(changelogEnabled); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); return env; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 9eee0b5eb21..eb342b1b9f2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.CheckpointingMode; @@ -174,8 +174,8 @@ public class RegionFailoverITCase extends TestLogger { env.setMaxParallelism(MAX_PARALLELISM); env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.disableOperatorChaining(); // Use DataStreamUtils#reinterpretAsKeyed to avoid merge regions and this stream graph would diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java index 1c9802f23d0..868889edb1a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -240,8 +240,8 @@ public class RescaleCheckpointManuallyITCase extends TestLogger { env.enableCheckpointing(checkpointingInterval); env.getCheckpointConfig().setCheckpointStorage(temporaryFolder.newFolder().toURI()); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.noRestart()); env.getConfig().setUseSnapshotCompression(true); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java index 00adb15b5a9..ed850297a9d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; @@ -172,8 +172,8 @@ public class RestoreUpgradedJobITCase extends TestLogger { conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().enableUnalignedCheckpoints(false); env.getCheckpointConfig() .setCheckpointStorage("file://" + temporaryFolder.getRoot().getAbsolutePath()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index d470b6e367f..a7c8d4f3f89 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; @@ -430,8 +430,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { env.setStateBackend(backend); env.setParallelism(PARALLELISM); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.noRestart()); env.addSource(new NotifyingInfiniteTupleSource(10_000)) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 7b094a183d9..c8ed8d2807f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -38,7 +38,7 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -468,8 +468,8 @@ public class SavepointITCase extends TestLogger { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointStorage(folder.newFolder().toURI()); env.setParallelism(parallelism); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java index 5995589eb73..40d740f8636 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java @@ -58,7 +58,7 @@ import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY; import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS; -import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; +import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; import static org.apache.flink.util.Preconditions.checkState; @@ -200,7 +200,7 @@ public class UnalignedCheckpointCompatibilityITCase extends TestLogger { env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned); env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO); - env.getCheckpointConfig().setExternalizedCheckpointCleanupRetention(RETAIN_ON_CANCELLATION); + env.getCheckpointConfig().setExternalizedCheckpointRetention(RETAIN_ON_CANCELLATION); if (checkpointingInterval > 0) { env.enableCheckpointing(checkpointingInterval); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java index bf7e017d627..bbdeee91276 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -303,8 +303,8 @@ class UnalignedCheckpointStressITCase { env.getCheckpointConfig().enableUnalignedCheckpoints(); env.setRestartStrategy(RestartStrategies.noRestart()); env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION); return env; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index 481a9b1d308..188bc9fdbc3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -44,7 +44,7 @@ import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExternalizedCheckpointCleanup; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.RpcOptions; @@ -763,8 +763,8 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { env.getCheckpointConfig().setForceUnalignedCheckpoints(true); if (generateCheckpoint) { env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention( - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + .setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java index b158682fa95..3844194821a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java @@ -47,7 +47,7 @@ import java.util.Optional; import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY; import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED; import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY; -import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; +import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION; import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint; @@ -143,8 +143,7 @@ public class ChangelogCompatibilityITCase { env.enableChangelogStateBackend(testCase.startWithChangelog); if (testCase.restoreSource == RestoreSource.CHECKPOINT) { env.enableCheckpointing(50); - env.getCheckpointConfig() - .setExternalizedCheckpointCleanupRetention(RETAIN_ON_CANCELLATION); + env.getCheckpointConfig().setExternalizedCheckpointRetention(RETAIN_ON_CANCELLATION); } return env; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java index 55191ef960a..af2cc09222c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java @@ -64,7 +64,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DI import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE; import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED; import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM; -import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; +import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION; import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; import static org.apache.flink.configuration.StateChangelogOptions.ENABLE_STATE_CHANGE_LOG; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java index 4637dd5002d..2ddf3681a63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java @@ -73,7 +73,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STO import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED; import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM; -import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; +import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION; import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE; import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;