This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a438f93dd0c83e3de3deb99456b7b499cbdd1a3c
Author: spoon-lz <971066...@qq.com>
AuthorDate: Mon Mar 25 15:56:25 2024 +0800

    [FLINK-34615] Renamed class "ExternalizedCheckpointCleanup" to "ExternalizedCheckpointRetention" in Java section.
---
 .../datastream/fault-tolerance/checkpointing.md    | 10 +++++-----
 docs/content.zh/docs/ops/state/checkpoints.md      |  8 ++++----
 .../datastream/fault-tolerance/checkpointing.md    | 10 +++++-----
 docs/content/docs/ops/state/checkpoints.md         |  8 ++++----
 .../reader/CoordinatedSourceRescaleITCase.java     |  6 +++---
 ...p.java => ExternalizedCheckpointRetention.java} |  4 ++--
 .../tests/DataStreamAllroundTestJobFactory.java    | 10 +++++-----
 .../StickyAllocationAndLocalRecoveryTestJob.java   |  6 +++---
 .../api/environment/CheckpointConfig.java          | 20 +++++++++-----------
 .../environment/ExecutionCheckpointingOptions.java | 10 +++++-----
 .../api/graph/StreamingJobGraphGenerator.java      |  4 ++--
 .../CheckpointConfigFromConfigurationTest.java     | 22 ++++++++++------------
 .../ChangelogLocalRecoveryITCase.java              |  6 +++---
 .../ChangelogRecoveryRescaleITCase.java            |  6 +++---
 .../ChangelogRecoverySwitchStateBackendITCase.java | 10 +++++-----
 .../test/checkpointing/RegionFailoverITCase.java   |  6 +++---
 .../RescaleCheckpointManuallyITCase.java           |  6 +++---
 .../checkpointing/RestoreUpgradedJobITCase.java    |  6 +++---
 .../ResumeCheckpointManuallyITCase.java            |  6 +++---
 .../flink/test/checkpointing/SavepointITCase.java  |  6 +++---
 .../UnalignedCheckpointCompatibilityITCase.java    |  4 ++--
 .../UnalignedCheckpointStressITCase.java           |  6 +++---
 .../checkpointing/UnalignedCheckpointTestBase.java |  6 +++---
 .../test/state/ChangelogCompatibilityITCase.java   |  5 ++---
 .../test/state/ChangelogRecoveryCachingITCase.java |  2 +-
 .../flink/test/state/ChangelogRescalingITCase.java |  2 +-
 26 files changed, 95 insertions(+), 100 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index 9acbdd4c324..70cb1d5b95d 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -107,8 +107,8 @@ env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
 // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
-env.getCheckpointConfig().setExternalizedCheckpointCleanup(
-        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+env.getCheckpointConfig().setExternalizedCheckpointRetention(
+        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
 
 // 开启实验性的 unaligned checkpoints
 env.getCheckpointConfig().enableUnalignedCheckpoints();
@@ -139,8 +139,8 @@ 
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
 // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
-env.getCheckpointConfig().setExternalizedCheckpointCleanup(
-  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+env.getCheckpointConfig().setExternalizedCheckpointRetention(
+  ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
 
 // 开启实验性的 unaligned checkpoints
 env.getCheckpointConfig.enableUnalignedCheckpoints()
@@ -172,7 +172,7 @@ 
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
 
 # 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
 env.get_checkpoint_config().enable_externalized_checkpoints(
-    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
     
 # 开启实验性的 unaligned checkpoints
 env.get_checkpoint_config().enable_unaligned_checkpoints()
diff --git a/docs/content.zh/docs/ops/state/checkpoints.md 
b/docs/content.zh/docs/ops/state/checkpoints.md
index 12a010af09b..e45e9077af1 100644
--- a/docs/content.zh/docs/ops/state/checkpoints.md
+++ b/docs/content.zh/docs/ops/state/checkpoints.md
@@ -91,12 +91,12 @@ Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,
 
 ```java
 CheckpointConfig config = env.getCheckpointConfig();
-config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+config.setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
 ```
 
-`ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
+`ExternalizedCheckpointRetention` 配置项定义了当作业取消时,对作业 checkpoint 的操作:
+- **`ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
+- **`ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
 
 ### 目录结构
 
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index b8eb5888fa8..b4e8c126f52 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -116,8 +116,8 @@ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
 // enable externalized checkpoints which are retained
 // after job cancellation
-env.getCheckpointConfig().setExternalizedCheckpointCleanup(
-    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+env.getCheckpointConfig().setExternalizedCheckpointRetention(
+    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
 
 // enables the unaligned checkpoints
 env.getCheckpointConfig().enableUnalignedCheckpoints();
@@ -160,8 +160,8 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
 // enable externalized checkpoints which are retained 
 // after job cancellation
-env.getCheckpointConfig().setExternalizedCheckpointCleanup(
-    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+env.getCheckpointConfig().setExternalizedCheckpointRetention(
+    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
 
 // enables the unaligned checkpoints
 env.getCheckpointConfig.enableUnalignedCheckpoints()
@@ -200,7 +200,7 @@ 
env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2)
 env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
 
 # enable externalized checkpoints which are retained after job cancellation
-env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
 
 # enables the unaligned checkpoints
 env.get_checkpoint_config().enable_unaligned_checkpoints()
diff --git a/docs/content/docs/ops/state/checkpoints.md 
b/docs/content/docs/ops/state/checkpoints.md
index f316845e115..db6a8b186b4 100644
--- a/docs/content/docs/ops/state/checkpoints.md
+++ b/docs/content/docs/ops/state/checkpoints.md
@@ -98,14 +98,14 @@ This way, you will have a checkpoint around to resume from if your job fails.
 
 ```java
 CheckpointConfig config = env.getCheckpointConfig();
-config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+config.setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
 ```
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job:
+The `ExternalizedCheckpointRetention` mode configures what happens with checkpoints when you cancel the job:
 
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.
+- **`ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.
 
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
+- **`ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
 
 ### Directory Structure
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
index e9483a0911e..6a805a22d1a 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -122,8 +122,8 @@ public class CoordinatedSourceRescaleITCase extends 
TestLogger {
         env.setParallelism(p);
         env.enableCheckpointing(100);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.noRestart());
 
         DataStream<Long> stream = env.fromSequence(0, Long.MAX_VALUE);
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointCleanup.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointRetention.java
similarity index 95%
rename from 
flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointCleanup.java
rename to 
flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointRetention.java
index 5c56b76251e..8edaa1f42ea 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointCleanup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ExternalizedCheckpointRetention.java
@@ -27,7 +27,7 @@ import static 
org.apache.flink.configuration.description.TextElement.text;
 
 /** Cleanup behaviour for externalized checkpoints when the job is cancelled. 
*/
 @PublicEvolving
-public enum ExternalizedCheckpointCleanup implements DescribedEnum {
+public enum ExternalizedCheckpointRetention implements DescribedEnum {
 
     /**
      * Delete externalized checkpoints on job cancellation.
@@ -61,7 +61,7 @@ public enum ExternalizedCheckpointCleanup implements 
DescribedEnum {
 
     private final InlineElement description;
 
-    ExternalizedCheckpointCleanup(InlineElement description) {
+    ExternalizedCheckpointRetention(InlineElement description) {
         this.description = description;
     }
 
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index b2ee4ec8b61..e14ad4a6041 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -303,20 +303,20 @@ public class DataStreamAllroundTestJobFactory {
                             ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.key(),
                             
ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.defaultValue());
 
-            ExternalizedCheckpointCleanup cleanupMode;
+            ExternalizedCheckpointRetention cleanupMode;
             switch (cleanupModeConfig) {
                 case "retain":
-                    cleanupMode = 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+                    cleanupMode = 
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION;
                     break;
                 case "delete":
-                    cleanupMode = 
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION;
+                    cleanupMode = 
ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION;
                     break;
                 default:
                     throw new IllegalArgumentException(
                             "Unknown clean up mode for externalized 
checkpoints: "
                                     + cleanupModeConfig);
             }
-            
env.getCheckpointConfig().setExternalizedCheckpointCleanupRetention(cleanupMode);
+            
env.getCheckpointConfig().setExternalizedCheckpointRetention(cleanupMode);
 
             final int tolerableDeclinedCheckpointNumber =
                     pt.getInt(
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
index 545e3e7a136..65e331e7814 100644
--- 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
+++ 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -93,8 +93,8 @@ public class StickyAllocationAndLocalRecoveryTestJob {
                         Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
         if (pt.getBoolean("externalizedCheckpoints", false)) {
             env.getCheckpointConfig()
-                    .setExternalizedCheckpointCleanupRetention(
-                            
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                    .setExternalizedCheckpointRetention(
+                            
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         }
 
         String checkpointDir = pt.getRequired("checkpointDir");
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index c786e1befb1..413c77e837b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.configuration.description.InlineElement;
@@ -544,7 +545,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
      *
      * @param cleanupMode Externalized checkpoint clean-up behaviour.
-     * @deprecated Use {@link #setExternalizedCheckpointCleanupRetention} 
instead.
+     * @deprecated Use {@link #setExternalizedCheckpointRetention} instead.
      */
     @Deprecated
     @PublicEvolving
@@ -555,7 +556,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
     /**
      * Sets the mode for externalized checkpoint clean-up. Externalized 
checkpoints will be enabled
      * automatically unless the mode is set to {@link
-     * 
org.apache.flink.configuration.ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}.
+     * ExternalizedCheckpointRetention#NO_EXTERNALIZED_CHECKPOINTS}.
      *
      * <p>Externalized checkpoints write their meta data out to persistent 
storage and are
      * <strong>not</strong> automatically cleaned up when the owning job fails 
or is suspended
@@ -563,7 +564,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * this case, you have to manually clean up the checkpoint state, both the 
meta data and actual
      * program state.
      *
-     * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an 
externalized checkpoint
+     * <p>The {@link ExternalizedCheckpointRetention} mode defines how an 
externalized checkpoint
      * should be cleaned up on job cancellation. If you choose to retain 
externalized checkpoints on
      * cancellation you have to handle checkpoint clean-up manually when you 
cancel the job as well
      * (terminating with job status {@link JobStatus#CANCELED}).
@@ -574,8 +575,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
      * @param cleanupMode Externalized checkpoint clean-up behaviour.
      */
     @PublicEvolving
-    public void setExternalizedCheckpointCleanupRetention(
-            org.apache.flink.configuration.ExternalizedCheckpointCleanup 
cleanupMode) {
+    public void 
setExternalizedCheckpointRetention(ExternalizedCheckpointRetention cleanupMode) 
{
         configuration.set(
                 
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, cleanupMode);
     }
@@ -782,7 +782,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
      *
      * @return The cleanup behaviour for externalized checkpoints or 
<code>null</code> if none is
      *     configured.
-     * @deprecated Use {@link #getExternalizedCheckpointCleanupRetention} 
instead.
+     * @deprecated Use {@link #getExternalizedCheckpointRetention} instead.
      */
     @Deprecated
     @PublicEvolving
@@ -797,8 +797,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
      *     configured.
      */
     @PublicEvolving
-    public org.apache.flink.configuration.ExternalizedCheckpointCleanup
-            getExternalizedCheckpointCleanupRetention() {
+    public ExternalizedCheckpointRetention 
getExternalizedCheckpointRetention() {
         return 
configuration.get(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION);
     }
 
@@ -977,8 +976,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
     /**
      * Cleanup behaviour for externalized checkpoints when the job is 
cancelled.
      *
-     * @deprecated This class has been moved to {@link
-     *     org.apache.flink.configuration.ExternalizedCheckpointCleanup}.
+     * @deprecated This class has been moved to {@link 
ExternalizedCheckpointRetention}.
      */
     @Deprecated
     @PublicEvolving
@@ -1073,7 +1071,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
                 .ifPresent(this::setExternalizedCheckpointCleanup);
         configuration
                 
.getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION)
-                .ifPresent(this::setExternalizedCheckpointCleanupRetention);
+                .ifPresent(this::setExternalizedCheckpointRetention);
         configuration
                 .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED)
                 .ifPresent(this::enableUnalignedCheckpoints);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 8d0c8016193..ab64fefb51a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -24,8 +24,7 @@ import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
-import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.TextElement;
 import org.apache.flink.core.execution.CheckpointingMode;
@@ -144,11 +143,12 @@ public class ExecutionCheckpointingOptions {
                                                                     .key()))
                                             .build());
 
-    public static final ConfigOption<ExternalizedCheckpointCleanup>
+    public static final ConfigOption<ExternalizedCheckpointRetention>
             EXTERNALIZED_CHECKPOINT_RETENTION =
                     
ConfigOptions.key("execution.checkpointing.externalized-checkpoint-retention")
-                            .enumType(ExternalizedCheckpointCleanup.class)
-                            
.defaultValue(ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS)
+                            .enumType(ExternalizedCheckpointRetention.class)
+                            .defaultValue(
+                                    
ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS)
                             .withDescription(
                                     Description.builder()
                                             .text(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 67049f94b47..dd0d045b509 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
@@ -1957,7 +1957,7 @@ public class StreamingJobGraphGenerator {
 
         CheckpointRetentionPolicy retentionAfterTermination;
         if (cfg.isExternalizedCheckpointsEnabled()) {
-            ExternalizedCheckpointCleanup cleanup = 
cfg.getExternalizedCheckpointCleanupRetention();
+            ExternalizedCheckpointRetention cleanup = 
cfg.getExternalizedCheckpointRetention();
             // Sanity check
             if (cleanup == null) {
                 throw new IllegalStateException(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index ddb6813059c..40c4131204e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -106,14 +106,14 @@ public class CheckpointConfigFromConfigurationTest {
                         .nonDefaultValue(
                                 CheckpointConfig.ExternalizedCheckpointCleanup
                                         .DELETE_ON_CANCELLATION),
-                
TestSpec.testValue(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+                
TestSpec.testValue(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
                         .whenSetFromFile(
                                 
"execution.checkpointing.externalized-checkpoint-retention",
                                 "RETAIN_ON_CANCELLATION")
-                        
.viaSetter(CheckpointConfig::setExternalizedCheckpointCleanupRetention)
-                        
.getterVia(CheckpointConfig::getExternalizedCheckpointCleanupRetention)
-                        
.nonDefaultValue(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION),
-                
TestSpec.testValue(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+                        
.viaSetter(CheckpointConfig::setExternalizedCheckpointRetention)
+                        
.getterVia(CheckpointConfig::getExternalizedCheckpointRetention)
+                        
.nonDefaultValue(ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION),
+                
TestSpec.testValue(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
                         .whenSetFromFile(
                                 
"execution.checkpointing.externalized-checkpoint-retention",
                                 "RETAIN_ON_CANCELLATION")
@@ -123,8 +123,8 @@ public class CheckpointConfigFromConfigurationTest {
                                             
CheckpointConfig.ExternalizedCheckpointCleanup.valueOf(
                                                     v.name()));
                                 })
-                        
.getterVia(CheckpointConfig::getExternalizedCheckpointCleanupRetention)
-                        
.nonDefaultValue(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION),
+                        
.getterVia(CheckpointConfig::getExternalizedCheckpointRetention)
+                        
.nonDefaultValue(ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION),
                 TestSpec.testValue(
                                 CheckpointConfig.ExternalizedCheckpointCleanup
                                         .RETAIN_ON_CANCELLATION)
@@ -133,10 +133,8 @@ public class CheckpointConfigFromConfigurationTest {
                                 "RETAIN_ON_CANCELLATION")
                         .viaSetter(
                                 (config, v) -> {
-                                    
config.setExternalizedCheckpointCleanupRetention(
-                                            org.apache.flink.configuration
-                                                    
.ExternalizedCheckpointCleanup.valueOf(
-                                                    v.name()));
+                                    config.setExternalizedCheckpointRetention(
+                                            
ExternalizedCheckpointRetention.valueOf(v.name()));
                                 })
                         
.getterVia(CheckpointConfig::getExternalizedCheckpointCleanup)
                         .nonDefaultValue(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
index c58dba04973..6350009a3d3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
@@ -21,7 +21,7 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -176,8 +176,8 @@ public class ChangelogLocalRecoveryITCase extends 
TestLogger {
                                 Duration.ofMillis(materializationInterval))
                         
.set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1));
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         Configuration configuration = new Configuration();
         configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
         env.configure(configuration);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java
index f358194ad4d..a0a305a4eba 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryRescaleITCase.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -46,8 +46,8 @@ public class ChangelogRecoveryRescaleITCase extends ChangelogRecoverySwitchEnvTe
     private StreamExecutionEnvironment getEnv(int parallelism) {
         StreamExecutionEnvironment env = getEnv(delegatedStateBackend, 50, 0, 20, 0);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.setParallelism(parallelism);
         return env;
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
index cbaf766067e..7c9254d5ab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.execution.RestoreMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -148,8 +148,8 @@ public class ChangelogRecoverySwitchStateBackendITCase extends ChangelogRecovery
         env.enableChangelogStateBackend(enableChangelog);
         env.setParallelism(parallelism);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         return env;
     }
 
@@ -169,8 +169,8 @@ public class ChangelogRecoverySwitchStateBackendITCase extends ChangelogRecovery
                         0);
         env.enableChangelogStateBackend(changelogEnabled);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         return env;
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 9eee0b5eb21..eb342b1b9f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
@@ -174,8 +174,8 @@ public class RegionFailoverITCase extends TestLogger {
         env.setMaxParallelism(MAX_PARALLELISM);
         env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.disableOperatorChaining();
 
         // Use DataStreamUtils#reinterpretAsKeyed to avoid merge regions and this stream graph would
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 1c9802f23d0..868889edb1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -240,8 +240,8 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
         env.enableCheckpointing(checkpointingInterval);
         env.getCheckpointConfig().setCheckpointStorage(temporaryFolder.newFolder().toURI());
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.getConfig().setUseSnapshotCompression(true);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java
index 00adb15b5a9..ed850297a9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
@@ -172,8 +172,8 @@ public class RestoreUpgradedJobITCase extends TestLogger {
         conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.getCheckpointConfig().enableUnalignedCheckpoints(false);
         env.getCheckpointConfig()
                 .setCheckpointStorage("file://" + temporaryFolder.getRoot().getAbsolutePath());
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index d470b6e367f..a7c8d4f3f89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -430,8 +430,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         env.setStateBackend(backend);
         env.setParallelism(PARALLELISM);
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.noRestart());
 
         env.addSource(new NotifyingInfiniteTupleSource(10_000))
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 7b094a183d9..c8ed8d2807f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -38,7 +38,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -468,8 +468,8 @@ public class SavepointITCase extends TestLogger {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
         env.getCheckpointConfig().setCheckpointStorage(folder.newFolder().toURI());
         env.setParallelism(parallelism);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
index 5995589eb73..40d740f8636 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
@@ -58,7 +58,7 @@ import java.util.stream.Stream;
 import static java.util.Collections.emptyMap;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
 import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
-import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -200,7 +200,7 @@ public class UnalignedCheckpointCompatibilityITCase extends TestLogger {
         env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
         env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
         env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
-        env.getCheckpointConfig().setExternalizedCheckpointCleanupRetention(RETAIN_ON_CANCELLATION);
+        env.getCheckpointConfig().setExternalizedCheckpointRetention(RETAIN_ON_CANCELLATION);
         if (checkpointingInterval > 0) {
             env.enableCheckpointing(checkpointingInterval);
         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
index bf7e017d627..bbdeee91276 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -303,8 +303,8 @@ class UnalignedCheckpointStressITCase {
         env.getCheckpointConfig().enableUnalignedCheckpoints();
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.getCheckpointConfig()
-                .setExternalizedCheckpointCleanupRetention(
-                        ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+                .setExternalizedCheckpointRetention(
+                        ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION);
         return env;
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 481a9b1d308..188bc9fdbc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -44,7 +44,7 @@ import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExternalizedCheckpointCleanup;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RpcOptions;
@@ -763,8 +763,8 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
             if (generateCheckpoint) {
                 env.getCheckpointConfig()
-                        .setExternalizedCheckpointCleanupRetention(
-                                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                        .setExternalizedCheckpointRetention(
+                                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
             }
         }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index b158682fa95..3844194821a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -47,7 +47,7 @@ import java.util.Optional;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
 import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
-import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION;
 import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
@@ -143,8 +143,7 @@ public class ChangelogCompatibilityITCase {
         env.enableChangelogStateBackend(testCase.startWithChangelog);
         if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
             env.enableCheckpointing(50);
-            env.getCheckpointConfig()
-                    .setExternalizedCheckpointCleanupRetention(RETAIN_ON_CANCELLATION);
+            env.getCheckpointConfig().setExternalizedCheckpointRetention(RETAIN_ON_CANCELLATION);
         }
         return env;
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
index 55191ef960a..af2cc09222c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
@@ -64,7 +64,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DI
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
 import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
-import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION;
 import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
 import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
 import static org.apache.flink.configuration.StateChangelogOptions.ENABLE_STATE_CHANGE_LOG;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 4637dd5002d..2ddf3681a63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -73,7 +73,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STO
 import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
 import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
-import static org.apache.flink.configuration.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION;
 import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
 import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
 import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;

Reply via email to