This is an automated email from the ASF dual-hosted git repository. airblader pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f4db43a3a8d7147f3ebd1279addcb35fb2c5e38b Author: Nicolaus Weidner <nicolaus.weid...@ververica.com> AuthorDate: Wed Nov 24 09:17:34 2021 +0100 [FLINK-24987][streaming-java] Add explicit enum value NO_EXTERNALIZED_CHECKPOINTS as default for externalized-checkpoint-retention --- .../execution_checkpointing_configuration.html | 4 +- .../reader/CoordinatedSourceRescaleITCase.java | 2 +- .../tests/DataStreamAllroundTestJobFactory.java | 2 +- .../StickyAllocationAndLocalRecoveryTestJob.java | 2 +- .../pyflink/datastream/checkpoint_config.py | 57 +++++++++++++++++++-- .../datastream/tests/test_check_point_config.py | 3 +- .../api/environment/CheckpointConfig.java | 58 ++++++++++++++++------ .../environment/ExecutionCheckpointingOptions.java | 6 ++- .../CheckpointConfigFromConfigurationTest.java | 2 +- .../test/checkpointing/RegionFailoverITCase.java | 2 +- .../ResumeCheckpointManuallyITCase.java | 2 +- .../UnalignedCheckpointCompatibilityITCase.java | 2 +- .../UnalignedCheckpointStressITCase.java | 2 +- .../checkpointing/UnalignedCheckpointTestBase.java | 2 +- 14 files changed, 113 insertions(+), 33 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html index 04e00bd..836b2e0 100644 --- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html @@ -22,9 +22,9 @@ </tr> <tr> <td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td> - <td style="word-wrap: break-word;">(none)</td> + <td style="word-wrap: break-word;">NO_EXTERNALIZED_CHECKPOINTS</td> <td><p>Enum</p></td> - <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code 
class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...] + <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint shoul [...] </tr> <tr> <td><h5>execution.checkpointing.interval</h5></td> diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java index f9e0e27..0120fee 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java @@ -118,7 +118,7 @@ public class CoordinatedSourceRescaleITCase extends TestLogger { StreamExecutionEnvironment.createLocalEnvironment(p, conf); env.enableCheckpointing(100); env.getCheckpointConfig() - .enableExternalizedCheckpoints( + .setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.noRestart()); diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 83e2cde..e219c46 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -295,7 +295,7 @@ public class DataStreamAllroundTestJobFactory { "Unknown clean up mode for externalized checkpoints: " + cleanupModeConfig); } - env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode); + env.getCheckpointConfig().setExternalizedCheckpointCleanup(cleanupMode); final int tolerableDeclinedCheckpointNumber = pt.getInt( diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java index 62b6ff5..1a707e4 100644 --- a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java +++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java @@ -92,7 +92,7 @@ public class StickyAllocationAndLocalRecoveryTestJob { Integer.MAX_VALUE, pt.getInt("restartDelay", 0))); if (pt.getBoolean("externalizedCheckpoints", false)) { env.getCheckpointConfig() - .enableExternalizedCheckpoints( + .setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); } diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py index 4119a91..f03ecf2 100644 --- 
a/flink-python/pyflink/datastream/checkpoint_config.py +++ b/flink-python/pyflink/datastream/checkpoint_config.py @@ -247,7 +247,9 @@ class CheckpointConfig(object): self, cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig': """ - Enables checkpoints to be persisted externally. + Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled + automatically unless the mode is set to + :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`. Externalized checkpoints write their meta data out to persistent storage and are **not** automatically cleaned up when the owning job fails or is suspended (terminating with job @@ -256,7 +258,7 @@ class CheckpointConfig(object): The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints - on cancellation you have you handle checkpoint clean up manually when you cancel the job as + on cancellation you have to handle checkpoint clean-up manually when you cancel the job as well (terminating with job status ``CANCELED``). The target directory for externalized checkpoints is configured via @@ -268,14 +270,53 @@ class CheckpointConfig(object): >>> config.enable_externalized_checkpoints( ... ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) - :param cleanup_mode: Externalized checkpoint cleanup behaviour, the mode could be - :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION` or - :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` + :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be + :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`, + :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or + :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS` + + .. note:: Deprecated in 1.15. Use :func:`set_externalized_checkpoint_cleanup` instead. 
""" self._j_checkpoint_config.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode)) return self + def set_externalized_checkpoint_cleanup( + self, + cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig': + """ + Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled + automatically unless the mode is set to + :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`. + + Externalized checkpoints write their meta data out to persistent storage and are **not** + automatically cleaned up when the owning job fails or is suspended (terminating with job + status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the + checkpoint state, both the meta data and actual program state. + + The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint + should be cleaned up on job cancellation. If you choose to retain externalized checkpoints + on cancellation you have to handle checkpoint clean-up manually when you cancel the job as + well (terminating with job status ``CANCELED``). + + The target directory for externalized checkpoints is configured via + ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``. + + Example: + :: + + >>> config.set_externalized_checkpoint_cleanup( + ... 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) + + :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be + :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`, + :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or + :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS` + """ + self._j_checkpoint_config.setExternalizedCheckpointCleanup( + ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode)) + return self + def is_externalized_checkpoints_enabled(self) -> bool: """ Returns whether checkpoints should be persisted externally. @@ -449,12 +490,18 @@ class ExternalizedCheckpointCleanup(Enum): Note that checkpoint state is always kept if the job terminates with state ``FAILED``. + + :data:`NO_EXTERNALIZED_CHECKPOINTS`: + + Externalized checkpoints are disabled completely. """ DELETE_ON_CANCELLATION = 0 RETAIN_ON_CANCELLATION = 1 + NO_EXTERNALIZED_CHECKPOINTS = 2 + @staticmethod def _from_j_externalized_checkpoint_cleanup(j_cleanup_mode) \ -> 'ExternalizedCheckpointCleanup': diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py index 11420f1..b7eba60 100644 --- a/flink-python/pyflink/datastream/tests/test_check_point_config.py +++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py @@ -121,7 +121,8 @@ class CheckpointConfigTests(PyFlinkTestCase): self.assertFalse(self.checkpoint_config.is_externalized_checkpoints_enabled()) - self.assertIsNone(self.checkpoint_config.get_externalized_checkpoint_cleanup()) + self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(), + ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS) self.checkpoint_config.enable_externalized_checkpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index f9d42b2..b2a2d8a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -107,7 +107,8 @@ public class CheckpointConfig implements java.io.Serializable { private boolean approximateLocalRecovery; /** Cleanup behaviour for persistent checkpoints. */ - private ExternalizedCheckpointCleanup externalizedCheckpointCleanup; + private ExternalizedCheckpointCleanup externalizedCheckpointCleanup = + ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue(); /** * Task would not fail if there is an error in their checkpointing. @@ -428,7 +429,9 @@ public class CheckpointConfig implements java.io.Serializable { } /** - * Enables checkpoints to be persisted externally. + * Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled + * automatically unless the mode is set to {@link + * ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}. * * <p>Externalized checkpoints write their meta data out to persistent storage and are * <strong>not</strong> automatically cleaned up when the owning job fails or is suspended @@ -438,15 +441,44 @@ public class CheckpointConfig implements java.io.Serializable { * * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an externalized checkpoint * should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on - * cancellation you have you handle checkpoint clean up manually when you cancel the job as well + * cancellation you have to handle checkpoint clean-up manually when you cancel the job as well * (terminating with job status {@link JobStatus#CANCELED}). 
* * <p>The target directory for externalized checkpoints is configured via {@link * org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}. * - * @param cleanupMode Externalized checkpoint cleanup behaviour. + * @param cleanupMode Externalized checkpoint clean-up behaviour. */ @PublicEvolving + public void setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup cleanupMode) { + this.externalizedCheckpointCleanup = checkNotNull(cleanupMode); + } + + /** + * Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled + * automatically unless the mode is set to {@link + * ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}. + * + * <p>Externalized checkpoints write their meta data out to persistent storage and are + * <strong>not</strong> automatically cleaned up when the owning job fails or is suspended + * (terminating with job status {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In + * this case, you have to manually clean up the checkpoint state, both the meta data and actual + * program state. + * + * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an externalized checkpoint + * should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on + * cancellation you have to handle checkpoint clean-up manually when you cancel the job as well + * (terminating with job status {@link JobStatus#CANCELED}). + * + * <p>The target directory for externalized checkpoints is configured via {@link + * org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}. + * + * @param cleanupMode Externalized checkpoint clean-up behaviour. + * @deprecated use {@link #setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup)} + * instead. 
+ */ + @PublicEvolving + @Deprecated public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode) { this.externalizedCheckpointCleanup = checkNotNull(cleanupMode); } @@ -458,7 +490,8 @@ public class CheckpointConfig implements java.io.Serializable { */ @PublicEvolving public boolean isExternalizedCheckpointsEnabled() { - return externalizedCheckpointCleanup != null; + return externalizedCheckpointCleanup + != ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS; } /** @@ -712,7 +745,7 @@ public class CheckpointConfig implements java.io.Serializable { * <p>Note that checkpoint state is always kept if the job terminates with state {@link * JobStatus#FAILED}. */ - DELETE_ON_CANCELLATION(true), + DELETE_ON_CANCELLATION, /** * Retain externalized checkpoints on job cancellation. @@ -723,13 +756,10 @@ public class CheckpointConfig implements java.io.Serializable { * <p>Note that checkpoint state is always kept if the job terminates with state {@link * JobStatus#FAILED}. */ - RETAIN_ON_CANCELLATION(false); + RETAIN_ON_CANCELLATION, - private final boolean deleteOnCancellation; - - ExternalizedCheckpointCleanup(boolean deleteOnCancellation) { - this.deleteOnCancellation = deleteOnCancellation; - } + /** Externalized checkpoints are disabled completely. */ + NO_EXTERNALIZED_CHECKPOINTS; /** * Returns whether persistent checkpoints shall be discarded on cancellation of the job. @@ -738,7 +768,7 @@ public class CheckpointConfig implements java.io.Serializable { * the job. 
*/ public boolean deleteOnCancellation() { - return deleteOnCancellation; + return this == DELETE_ON_CANCELLATION; } } @@ -772,7 +802,7 @@ public class CheckpointConfig implements java.io.Serializable { .ifPresent(this::setTolerableCheckpointFailureNumber); configuration .getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT) - .ifPresent(this::enableExternalizedCheckpoints); + .ifPresent(this::setExternalizedCheckpointCleanup); configuration .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED) .ifPresent(this::enableUnalignedCheckpoints); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 7b83772..536788b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -91,13 +91,15 @@ public class ExecutionCheckpointingOptions { EXTERNALIZED_CHECKPOINT = ConfigOptions.key("execution.checkpointing.externalized-checkpoint-retention") .enumType(CheckpointConfig.ExternalizedCheckpointCleanup.class) - .noDefaultValue() + .defaultValue( + CheckpointConfig.ExternalizedCheckpointCleanup + .NO_EXTERNALIZED_CHECKPOINTS) .withDescription( Description.builder() .text( "Externalized checkpoints write their meta data out to persistent storage and are not " + "automatically cleaned up when the owning job fails or is suspended (terminating with job " - + "status %s or %s. In this case, you have to manually clean up the checkpoint state, both the " + + "status %s or %s). 
In this case, you have to manually clean up the checkpoint state, both the " + "meta data and actual program state.", TextElement.code("JobStatus#FAILED"), TextElement.code("JobStatus#SUSPENDED")) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java index 6bb1d1d..fb8d59c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java @@ -74,7 +74,7 @@ public class CheckpointConfigFromConfigurationTest { .whenSetFromFile( "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION") - .viaSetter(CheckpointConfig::enableExternalizedCheckpoints) + .viaSetter(CheckpointConfig::setExternalizedCheckpointCleanup) .getterVia(CheckpointConfig::getExternalizedCheckpointCleanup) .nonDefaultValue( CheckpointConfig.ExternalizedCheckpointCleanup diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 7a46dd1..0bf295b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -173,7 +173,7 @@ public class RegionFailoverITCase extends TestLogger { env.setMaxParallelism(MAX_PARALLELISM); env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig() - .enableExternalizedCheckpoints( + .setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.disableOperatorChaining(); diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index c324931..eeda1b0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -371,7 +371,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { env.setStateBackend(backend); env.setParallelism(PARALLELISM); env.getCheckpointConfig() - .enableExternalizedCheckpoints( + .setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.addSource(new NotifyingInfiniteTupleSource(10_000)) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java index 0b8d570..c116621 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java @@ -198,7 +198,7 @@ public class UnalignedCheckpointCompatibilityITCase extends TestLogger { env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned); env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO); - env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION); + env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION); if (checkpointingInterval > 0) { env.enableCheckpointing(checkpointingInterval); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java index 06e80f5..91ddada 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java @@ -307,7 +307,7 @@ public class UnalignedCheckpointStressITCase extends TestLogger { env.getCheckpointConfig().enableUnalignedCheckpoints(); env.setRestartStrategy(RestartStrategies.noRestart()); env.getCheckpointConfig() - .enableExternalizedCheckpoints( + .setExternalizedCheckpointCleanup( ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); return env; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index 422fc83..bdfba39 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -743,7 +743,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { env.getCheckpointConfig().setForceUnalignedCheckpoints(true); if (generateCheckpoint) { env.getCheckpointConfig() - .enableExternalizedCheckpoints( + .setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION); }