This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f5a8edbcb911c2247c83dae104bbe82adf4910e6 Author: spoon-lz <971066...@qq.com> AuthorDate: Sat May 11 16:48:04 2024 +0800 [FLINK-34615]Create a new class `ExternalizedCheckpointRetention` in python and replace the original class `ExternalizedCheckpointCleanup`. This closes #24461 --- .../reference/pyflink.datastream/checkpoint.rst | 1 + flink-python/pyflink/datastream/__init__.py | 2 + .../pyflink/datastream/checkpoint_config.py | 58 ++++++++++++++++++ .../externalized_checkpoint_retention.py | 71 ++++++++++++++++++++++ .../datastream/tests/test_check_point_config.py | 26 +++++++- 5 files changed, 157 insertions(+), 1 deletion(-) diff --git a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst index 23239b85775..10a2783fc7d 100644 --- a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst +++ b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst @@ -82,6 +82,7 @@ The default limit of concurrently happening checkpoints: one. 
CheckpointConfig.set_checkpoint_storage_dir CheckpointConfig.get_checkpoint_storage ExternalizedCheckpointCleanup + ExternalizedCheckpointRetention CheckpointStorage diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py index 6c3ba5e9559..dbb5b978a01 100644 --- a/flink-python/pyflink/datastream/__init__.py +++ b/flink-python/pyflink/datastream/__init__.py @@ -260,6 +260,7 @@ Other important classes: Tag with a name and type for identifying side output of an operator """ from pyflink.datastream.checkpoint_config import CheckpointConfig, ExternalizedCheckpointCleanup +from pyflink.datastream.externalized_checkpoint_retention import ExternalizedCheckpointRetention from pyflink.datastream.checkpointing_mode import CheckpointingMode from pyflink.datastream.data_stream import DataStream, KeyedStream, WindowedStream, \ ConnectedStreams, DataStreamSink, BroadcastStream, BroadcastConnectedStream @@ -317,6 +318,7 @@ __all__ = [ 'CheckpointingMode', 'CheckpointConfig', 'ExternalizedCheckpointCleanup', + 'ExternalizedCheckpointRetention', 'StateBackend', 'HashMapStateBackend', 'EmbeddedRocksDBStateBackend', diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py index 6d6406320fc..5034e9d85bd 100644 --- a/flink-python/pyflink/datastream/checkpoint_config.py +++ b/flink-python/pyflink/datastream/checkpoint_config.py @@ -20,6 +20,7 @@ from enum import Enum from typing import Optional from pyflink.common import Duration +from pyflink.datastream.externalized_checkpoint_retention import ExternalizedCheckpointRetention from pyflink.datastream.checkpoint_storage import CheckpointStorage, _from_j_checkpoint_storage from pyflink.datastream.checkpointing_mode import CheckpointingMode from pyflink.java_gateway import get_gateway @@ -313,11 +314,49 @@ class CheckpointConfig(object): :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`, 
:data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS` + + .. note:: Deprecated. Use :func:`set_externalized_checkpoint_retention` instead. """ self._j_checkpoint_config.setExternalizedCheckpointCleanup( ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode)) return self + def set_externalized_checkpoint_retention( + self, + retention_mode: 'ExternalizedCheckpointRetention') -> 'CheckpointConfig': + """ + Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled + automatically unless the mode is set to + :data:`ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS`. + + Externalized checkpoints write their meta data out to persistent storage and are **not** + automatically cleaned up when the owning job fails or is suspended (terminating with job + status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the + checkpoint state, both the meta data and actual program state. + + The :class:`ExternalizedCheckpointRetention` mode defines how an externalized checkpoint + should be cleaned up on job cancellation. If you choose to retain externalized checkpoints + on cancellation you have to handle checkpoint clean-up manually when you cancel the job as + well (terminating with job status ``CANCELED``). + + The target directory for externalized checkpoints is configured via + ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``. + + Example: + :: + + >>> config.set_externalized_checkpoint_retention( + ... 
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) + + :param retention_mode: Externalized checkpoint clean-up behaviour, the mode could be + :data:`ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION`, + :data:`ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION` or + :data:`ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS` + """ + self._j_checkpoint_config.setExternalizedCheckpointRetention( + ExternalizedCheckpointRetention._to_j_externalized_checkpoint_retention(retention_mode)) + return self + def is_externalized_checkpoints_enabled(self) -> bool: """ Returns whether checkpoints should be persisted externally. @@ -332,6 +371,8 @@ class CheckpointConfig(object): :return: The cleanup behaviour for externalized checkpoints or ``None`` if none is configured. + + .. note:: Deprecated. Use :func:`get_externalized_checkpoint_retention` instead. """ cleanup_mode = self._j_checkpoint_config.getExternalizedCheckpointCleanup() if cleanup_mode is None: @@ -340,6 +381,20 @@ class CheckpointConfig(object): return ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup( cleanup_mode) + def get_externalized_checkpoint_retention(self) -> Optional['ExternalizedCheckpointRetention']: + """ + Returns the cleanup behaviour for externalized checkpoints. + + :return: The cleanup behaviour for externalized checkpoints or ``None`` if none is + configured. + """ + retention_mode = self._j_checkpoint_config.getExternalizedCheckpointRetention() + if retention_mode is None: + return None + else: + return ExternalizedCheckpointRetention._from_j_externalized_checkpoint_retention( + retention_mode) + def is_unaligned_checkpoints_enabled(self) -> bool: """ Returns whether unaligned checkpoints are enabled. @@ -516,6 +571,9 @@ class ExternalizedCheckpointCleanup(Enum): :data:`NO_EXTERNALIZED_CHECKPOINTS`: Externalized checkpoints are disabled completely. + + .. note:: Deprecated. 
Please use + pyflink.datastream.externalized_checkpoint_retention.ExternalizedCheckpointRetention instead. """ DELETE_ON_CANCELLATION = 0 diff --git a/flink-python/pyflink/datastream/externalized_checkpoint_retention.py b/flink-python/pyflink/datastream/externalized_checkpoint_retention.py new file mode 100644 index 00000000000..f66aad517c0 --- /dev/null +++ b/flink-python/pyflink/datastream/externalized_checkpoint_retention.py @@ -0,0 +1,71 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from enum import Enum +from pyflink.java_gateway import get_gateway +__all__ = ['ExternalizedCheckpointRetention'] + + +class ExternalizedCheckpointRetention(Enum): + """ + Cleanup behaviour for externalized checkpoints when the job is cancelled. + + :data:`DELETE_ON_CANCELLATION`: + + Delete externalized checkpoints on job cancellation. + + All checkpoint state will be deleted when you cancel the owning + job, both the meta data and actual program state. Therefore, you + cannot resume from externalized checkpoints after the job has been + cancelled. 
+ + Note that checkpoint state is always kept if the job terminates + with state ``FAILED``. + + :data:`RETAIN_ON_CANCELLATION`: + + Retain externalized checkpoints on job cancellation. + + All checkpoint state is kept when you cancel the owning job. You + have to manually delete both the checkpoint meta data and actual + program state after cancelling the job. + + Note that checkpoint state is always kept if the job terminates + with state ``FAILED``. + + :data:`NO_EXTERNALIZED_CHECKPOINTS`: + + Externalized checkpoints are disabled completely. + """ + + DELETE_ON_CANCELLATION = 0 + + RETAIN_ON_CANCELLATION = 1 + + NO_EXTERNALIZED_CHECKPOINTS = 2 + + @staticmethod + def _from_j_externalized_checkpoint_retention(j_retention_mode) \ + -> 'ExternalizedCheckpointRetention': + return ExternalizedCheckpointRetention[j_retention_mode.name()] + + def _to_j_externalized_checkpoint_retention(self): + gateway = get_gateway() + JExternalizedCheckpointRetention = \ + gateway.jvm.org.apache.flink.streaming.api.environment.CheckpointConfig \ + .ExternalizedCheckpointRetention + return getattr(JExternalizedCheckpointRetention, self.name) diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py index b7eba6038b2..115e89ccee6 100644 --- a/flink-python/pyflink/datastream/tests/test_check_point_config.py +++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py @@ -16,7 +16,10 @@ # limitations under the License. 
################################################################################ from pyflink.common import Duration -from pyflink.datastream import (CheckpointConfig, CheckpointingMode, ExternalizedCheckpointCleanup, +from pyflink.datastream import (CheckpointConfig, + CheckpointingMode, + ExternalizedCheckpointCleanup, + ExternalizedCheckpointRetention, StreamExecutionEnvironment) from pyflink.java_gateway import get_gateway from pyflink.testing.test_case_utils import PyFlinkTestCase @@ -138,6 +141,27 @@ class CheckpointConfigTests(PyFlinkTestCase): self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(), ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) + def test_get_set_externalized_checkpoints_retention(self): + + self.assertFalse(self.checkpoint_config.is_externalized_checkpoints_enabled()) + + self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_retention(), + ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS) + + self.checkpoint_config.enable_externalized_checkpoints( + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) + + self.assertTrue(self.checkpoint_config.is_externalized_checkpoints_enabled()) + + self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_retention(), + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) + + self.checkpoint_config.enable_externalized_checkpoints( + ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION) + + self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_retention(), + ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION) + def test_is_unaligned_checkpointing_enabled(self): self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())