This is an automated email from the ASF dual-hosted git repository.

airblader pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4db43a3a8d7147f3ebd1279addcb35fb2c5e38b
Author: Nicolaus Weidner <nicolaus.weid...@ververica.com>
AuthorDate: Wed Nov 24 09:17:34 2021 +0100

    [FLINK-24987][streaming-java] Add explicit enum value 
NO_EXTERNAL_CHECKPOINTS as default for externalized-checkpoint-retention
---
 .../execution_checkpointing_configuration.html     |  4 +-
 .../reader/CoordinatedSourceRescaleITCase.java     |  2 +-
 .../tests/DataStreamAllroundTestJobFactory.java    |  2 +-
 .../StickyAllocationAndLocalRecoveryTestJob.java   |  2 +-
 .../pyflink/datastream/checkpoint_config.py        | 57 +++++++++++++++++++--
 .../datastream/tests/test_check_point_config.py    |  3 +-
 .../api/environment/CheckpointConfig.java          | 58 ++++++++++++++++------
 .../environment/ExecutionCheckpointingOptions.java |  6 ++-
 .../CheckpointConfigFromConfigurationTest.java     |  2 +-
 .../test/checkpointing/RegionFailoverITCase.java   |  2 +-
 .../ResumeCheckpointManuallyITCase.java            |  2 +-
 .../UnalignedCheckpointCompatibilityITCase.java    |  2 +-
 .../UnalignedCheckpointStressITCase.java           |  2 +-
 .../checkpointing/UnalignedCheckpointTestBase.java |  2 +-
 14 files changed, 113 insertions(+), 33 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index 04e00bd..836b2e0 100644
--- 
a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -22,9 +22,9 @@
         </tr>
         <tr>
             
<td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">NO_EXTERNALIZED_CHECKPOINTS</td>
             <td><p>Enum</p></td>
-            <td>Externalized checkpoints write their meta data out to 
persistent storage and are not automatically cleaned up when the owning job 
fails or is suspended (terminating with job status <code 
class="highlighter-rouge">JobStatus#FAILED</code> or <code 
class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to 
manually clean up the checkpoint state, both the meta data and actual program 
state.<br /><br />The mode defines how an externalized checkpoint should [...]
+            <td>Externalized checkpoints write their meta data out to 
persistent storage and are not automatically cleaned up when the owning job 
fails or is suspended (terminating with job status <code 
class="highlighter-rouge">JobStatus#FAILED</code> or <code 
class="highlighter-rouge">JobStatus#SUSPENDED</code>). In this case, you have 
to manually clean up the checkpoint state, both the meta data and actual 
program state.<br /><br />The mode defines how an externalized checkpoint shoul 
[...]
         </tr>
         <tr>
             <td><h5>execution.checkpointing.interval</h5></td>
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
index f9e0e27..0120fee 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -118,7 +118,7 @@ public class CoordinatedSourceRescaleITCase extends 
TestLogger {
                 StreamExecutionEnvironment.createLocalEnvironment(p, conf);
         env.enableCheckpointing(100);
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.noRestart());
 
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 83e2cde..e219c46 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -295,7 +295,7 @@ public class DataStreamAllroundTestJobFactory {
                             "Unknown clean up mode for externalized 
checkpoints: "
                                     + cleanupModeConfig);
             }
-            
env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
+            
env.getCheckpointConfig().setExternalizedCheckpointCleanup(cleanupMode);
 
             final int tolerableDeclinedCheckpointNumber =
                     pt.getInt(
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
index 62b6ff5..1a707e4 100644
--- 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
+++ 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -92,7 +92,7 @@ public class StickyAllocationAndLocalRecoveryTestJob {
                         Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
         if (pt.getBoolean("externalizedCheckpoints", false)) {
             env.getCheckpointConfig()
-                    .enableExternalizedCheckpoints(
+                    .setExternalizedCheckpointCleanup(
                             
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         }
 
diff --git a/flink-python/pyflink/datastream/checkpoint_config.py 
b/flink-python/pyflink/datastream/checkpoint_config.py
index 4119a91..f03ecf2 100644
--- a/flink-python/pyflink/datastream/checkpoint_config.py
+++ b/flink-python/pyflink/datastream/checkpoint_config.py
@@ -247,7 +247,9 @@ class CheckpointConfig(object):
             self,
             cleanup_mode: 'ExternalizedCheckpointCleanup') -> 
'CheckpointConfig':
         """
-        Enables checkpoints to be persisted externally.
+        Sets the mode for externalized checkpoint clean-up. Externalized 
checkpoints will be enabled
+        automatically unless the mode is set to
+        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.
 
         Externalized checkpoints write their meta data out to persistent 
storage and are **not**
         automatically cleaned up when the owning job fails or is suspended 
(terminating with job
@@ -256,7 +258,7 @@ class CheckpointConfig(object):
 
         The :class:`ExternalizedCheckpointCleanup` mode defines how an 
externalized checkpoint
         should be cleaned up on job cancellation. If you choose to retain 
externalized checkpoints
-        on cancellation you have you handle checkpoint clean up manually when 
you cancel the job as
+        on cancellation you have to handle checkpoint clean-up manually when 
you cancel the job as
         well (terminating with job status ``CANCELED``).
 
         The target directory for externalized checkpoints is configured via
@@ -268,14 +270,53 @@ class CheckpointConfig(object):
             >>> config.enable_externalized_checkpoints(
             ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
 
-        :param cleanup_mode: Externalized checkpoint cleanup behaviour, the 
mode could be
-                             
:data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION` or
-                             
:data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`
+        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the 
mode could be
+                             
:data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
+                             
:data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
+                             
:data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
+
+        .. note:: Deprecated in 1.15. Use 
:func:`set_externalized_checkpoint_cleanup` instead.
         """
         self._j_checkpoint_config.enableExternalizedCheckpoints(
             
ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
         return self
 
+    def set_externalized_checkpoint_cleanup(
+            self,
+            cleanup_mode: 'ExternalizedCheckpointCleanup') -> 
'CheckpointConfig':
+        """
+        Sets the mode for externalized checkpoint clean-up. Externalized 
checkpoints will be enabled
+        automatically unless the mode is set to
+        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.
+
+        Externalized checkpoints write their meta data out to persistent 
storage and are **not**
+        automatically cleaned up when the owning job fails or is suspended 
(terminating with job
+        status ``FAILED`` or ``SUSPENDED``). In this case, you have to 
manually clean up the
+        checkpoint state, both the meta data and actual program state.
+
+        The :class:`ExternalizedCheckpointCleanup` mode defines how an 
externalized checkpoint
+        should be cleaned up on job cancellation. If you choose to retain 
externalized checkpoints
+        on cancellation you have to handle checkpoint clean-up manually when 
you cancel the job as
+        well (terminating with job status ``CANCELED``).
+
+        The target directory for externalized checkpoints is configured via
+        
``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.
+
+        Example:
+        ::
+
+            >>> config.set_externalized_checkpoint_cleanup(
+            ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
+        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the 
mode could be
+                             
:data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
+                             
:data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
+                             
:data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
+        """
+        self._j_checkpoint_config.setExternalizedCheckpointCleanup(
+            
ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
+        return self
+
     def is_externalized_checkpoints_enabled(self) -> bool:
         """
         Returns whether checkpoints should be persisted externally.
@@ -449,12 +490,18 @@ class ExternalizedCheckpointCleanup(Enum):
 
     Note that checkpoint state is always kept if the job terminates
     with state ``FAILED``.
+
+    :data:`NO_EXTERNALIZED_CHECKPOINTS`:
+
+    Externalized checkpoints are disabled completely.
     """
 
     DELETE_ON_CANCELLATION = 0
 
     RETAIN_ON_CANCELLATION = 1
 
+    NO_EXTERNALIZED_CHECKPOINTS = 2
+
     @staticmethod
     def _from_j_externalized_checkpoint_cleanup(j_cleanup_mode) \
             -> 'ExternalizedCheckpointCleanup':
diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py 
b/flink-python/pyflink/datastream/tests/test_check_point_config.py
index 11420f1..b7eba60 100644
--- a/flink-python/pyflink/datastream/tests/test_check_point_config.py
+++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py
@@ -121,7 +121,8 @@ class CheckpointConfigTests(PyFlinkTestCase):
 
         
self.assertFalse(self.checkpoint_config.is_externalized_checkpoints_enabled())
 
-        
self.assertIsNone(self.checkpoint_config.get_externalized_checkpoint_cleanup())
+        
self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(),
+                         
ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS)
 
         self.checkpoint_config.enable_externalized_checkpoints(
             ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index f9d42b2..b2a2d8a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -107,7 +107,8 @@ public class CheckpointConfig implements 
java.io.Serializable {
     private boolean approximateLocalRecovery;
 
     /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
+    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
+            
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
 
     /**
      * Task would not fail if there is an error in their checkpointing.
@@ -428,7 +429,9 @@ public class CheckpointConfig implements 
java.io.Serializable {
     }
 
     /**
-     * Enables checkpoints to be persisted externally.
+     * Sets the mode for externalized checkpoint clean-up. Externalized 
checkpoints will be enabled
+     * automatically unless the mode is set to {@link
+     * ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}.
      *
      * <p>Externalized checkpoints write their meta data out to persistent 
storage and are
      * <strong>not</strong> automatically cleaned up when the owning job fails 
or is suspended
@@ -438,15 +441,44 @@ public class CheckpointConfig implements 
java.io.Serializable {
      *
      * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an 
externalized checkpoint
      * should be cleaned up on job cancellation. If you choose to retain 
externalized checkpoints on
-     * cancellation you have you handle checkpoint clean up manually when you 
cancel the job as well
+     * cancellation you have to handle checkpoint clean-up manually when you 
cancel the job as well
      * (terminating with job status {@link JobStatus#CANCELED}).
      *
      * <p>The target directory for externalized checkpoints is configured via 
{@link
      * 
org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
      *
-     * @param cleanupMode Externalized checkpoint cleanup behaviour.
+     * @param cleanupMode Externalized checkpoint clean-up behaviour.
      */
     @PublicEvolving
+    public void setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup 
cleanupMode) {
+        this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
+    }
+
+    /**
+     * Sets the mode for externalized checkpoint clean-up. Externalized 
checkpoints will be enabled
+     * automatically unless the mode is set to {@link
+     * ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}.
+     *
+     * <p>Externalized checkpoints write their meta data out to persistent 
storage and are
+     * <strong>not</strong> automatically cleaned up when the owning job fails 
or is suspended
+     * (terminating with job status {@link JobStatus#FAILED} or {@link 
JobStatus#SUSPENDED}). In
+     * this case, you have to manually clean up the checkpoint state, both the 
meta data and actual
+     * program state.
+     *
+     * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an 
externalized checkpoint
+     * should be cleaned up on job cancellation. If you choose to retain 
externalized checkpoints on
+     * cancellation you have to handle checkpoint clean-up manually when you 
cancel the job as well
+     * (terminating with job status {@link JobStatus#CANCELED}).
+     *
+     * <p>The target directory for externalized checkpoints is configured via 
{@link
+     * 
org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
+     *
+     * @param cleanupMode Externalized checkpoint clean-up behaviour.
+     * @deprecated use {@link 
#setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup)}
+     *     instead.
+     */
+    @PublicEvolving
+    @Deprecated
     public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup 
cleanupMode) {
         this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
     }
@@ -458,7 +490,8 @@ public class CheckpointConfig implements 
java.io.Serializable {
      */
     @PublicEvolving
     public boolean isExternalizedCheckpointsEnabled() {
-        return externalizedCheckpointCleanup != null;
+        return externalizedCheckpointCleanup
+                != ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS;
     }
 
     /**
@@ -712,7 +745,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
          * <p>Note that checkpoint state is always kept if the job terminates 
with state {@link
          * JobStatus#FAILED}.
          */
-        DELETE_ON_CANCELLATION(true),
+        DELETE_ON_CANCELLATION,
 
         /**
          * Retain externalized checkpoints on job cancellation.
@@ -723,13 +756,10 @@ public class CheckpointConfig implements 
java.io.Serializable {
          * <p>Note that checkpoint state is always kept if the job terminates 
with state {@link
          * JobStatus#FAILED}.
          */
-        RETAIN_ON_CANCELLATION(false);
+        RETAIN_ON_CANCELLATION,
 
-        private final boolean deleteOnCancellation;
-
-        ExternalizedCheckpointCleanup(boolean deleteOnCancellation) {
-            this.deleteOnCancellation = deleteOnCancellation;
-        }
+        /** Externalized checkpoints are disabled completely. */
+        NO_EXTERNALIZED_CHECKPOINTS;
 
         /**
          * Returns whether persistent checkpoints shall be discarded on 
cancellation of the job.
@@ -738,7 +768,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
          *     the job.
          */
         public boolean deleteOnCancellation() {
-            return deleteOnCancellation;
+            return this == DELETE_ON_CANCELLATION;
         }
     }
 
@@ -772,7 +802,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
                 .ifPresent(this::setTolerableCheckpointFailureNumber);
         configuration
                 
.getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT)
-                .ifPresent(this::enableExternalizedCheckpoints);
+                .ifPresent(this::setExternalizedCheckpointCleanup);
         configuration
                 .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED)
                 .ifPresent(this::enableUnalignedCheckpoints);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 7b83772..536788b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -91,13 +91,15 @@ public class ExecutionCheckpointingOptions {
             EXTERNALIZED_CHECKPOINT =
                     
ConfigOptions.key("execution.checkpointing.externalized-checkpoint-retention")
                             
.enumType(CheckpointConfig.ExternalizedCheckpointCleanup.class)
-                            .noDefaultValue()
+                            .defaultValue(
+                                    
CheckpointConfig.ExternalizedCheckpointCleanup
+                                            .NO_EXTERNALIZED_CHECKPOINTS)
                             .withDescription(
                                     Description.builder()
                                             .text(
                                                     "Externalized checkpoints 
write their meta data out to persistent storage and are not "
                                                             + "automatically 
cleaned up when the owning job fails or is suspended (terminating with job "
-                                                            + "status %s or 
%s. In this case, you have to manually clean up the checkpoint state, both the "
+                                                            + "status %s or 
%s). In this case, you have to manually clean up the checkpoint state, both the 
"
                                                             + "meta data and 
actual program state.",
                                                     
TextElement.code("JobStatus#FAILED"),
                                                     
TextElement.code("JobStatus#SUSPENDED"))
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index 6bb1d1d..fb8d59c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -74,7 +74,7 @@ public class CheckpointConfigFromConfigurationTest {
                         .whenSetFromFile(
                                 
"execution.checkpointing.externalized-checkpoint-retention",
                                 "RETAIN_ON_CANCELLATION")
-                        
.viaSetter(CheckpointConfig::enableExternalizedCheckpoints)
+                        
.viaSetter(CheckpointConfig::setExternalizedCheckpointCleanup)
                         
.getterVia(CheckpointConfig::getExternalizedCheckpointCleanup)
                         .nonDefaultValue(
                                 CheckpointConfig.ExternalizedCheckpointCleanup
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 7a46dd1..0bf295b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -173,7 +173,7 @@ public class RegionFailoverITCase extends TestLogger {
         env.setMaxParallelism(MAX_PARALLELISM);
         env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.disableOperatorChaining();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index c324931..eeda1b0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -371,7 +371,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
         env.setStateBackend(backend);
         env.setParallelism(PARALLELISM);
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
         env.addSource(new NotifyingInfiniteTupleSource(10_000))
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
index 0b8d570..c116621 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
@@ -198,7 +198,7 @@ public class UnalignedCheckpointCompatibilityITCase extends 
TestLogger {
         env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
         env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
         env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
-        
env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
+        
env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);
         if (checkpointingInterval > 0) {
             env.enableCheckpointing(checkpointingInterval);
         }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
index 06e80f5..91ddada 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
@@ -307,7 +307,7 @@ public class UnalignedCheckpointStressITCase extends 
TestLogger {
         env.getCheckpointConfig().enableUnalignedCheckpoints();
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
         return env;
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 422fc83..bdfba39 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -743,7 +743,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
             if (generateCheckpoint) {
                 env.getCheckpointConfig()
-                        .enableExternalizedCheckpoints(
+                        .setExternalizedCheckpointCleanup(
                                 CheckpointConfig.ExternalizedCheckpointCleanup
                                         .RETAIN_ON_CANCELLATION);
             }

Reply via email to