This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5a8edbcb911c2247c83dae104bbe82adf4910e6
Author: spoon-lz <971066...@qq.com>
AuthorDate: Sat May 11 16:48:04 2024 +0800

    [FLINK-34615] Create a new class `ExternalizedCheckpointRetention` in Python 
to replace the now-deprecated class `ExternalizedCheckpointCleanup`.
    
    This closes #24461
---
 .../reference/pyflink.datastream/checkpoint.rst    |  1 +
 flink-python/pyflink/datastream/__init__.py        |  2 +
 .../pyflink/datastream/checkpoint_config.py        | 58 ++++++++++++++++++
 .../externalized_checkpoint_retention.py           | 71 ++++++++++++++++++++++
 .../datastream/tests/test_check_point_config.py    | 26 +++++++-
 5 files changed, 157 insertions(+), 1 deletion(-)

diff --git a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst 
b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst
index 23239b85775..10a2783fc7d 100644
--- a/flink-python/docs/reference/pyflink.datastream/checkpoint.rst
+++ b/flink-python/docs/reference/pyflink.datastream/checkpoint.rst
@@ -82,6 +82,7 @@ The default limit of concurrently happening checkpoints: one.
     CheckpointConfig.set_checkpoint_storage_dir
     CheckpointConfig.get_checkpoint_storage
     ExternalizedCheckpointCleanup
+    ExternalizedCheckpointRetention
 
 
 CheckpointStorage
diff --git a/flink-python/pyflink/datastream/__init__.py 
b/flink-python/pyflink/datastream/__init__.py
index 6c3ba5e9559..dbb5b978a01 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -260,6 +260,7 @@ Other important classes:
       Tag with a name and type for identifying side output of an operator
 """
 from pyflink.datastream.checkpoint_config import CheckpointConfig, 
ExternalizedCheckpointCleanup
+from pyflink.datastream.externalized_checkpoint_retention import 
ExternalizedCheckpointRetention
 from pyflink.datastream.checkpointing_mode import CheckpointingMode
 from pyflink.datastream.data_stream import DataStream, KeyedStream, 
WindowedStream, \
     ConnectedStreams, DataStreamSink, BroadcastStream, BroadcastConnectedStream
@@ -317,6 +318,7 @@ __all__ = [
     'CheckpointingMode',
     'CheckpointConfig',
     'ExternalizedCheckpointCleanup',
+    'ExternalizedCheckpointRetention',
     'StateBackend',
     'HashMapStateBackend',
     'EmbeddedRocksDBStateBackend',
diff --git a/flink-python/pyflink/datastream/checkpoint_config.py 
b/flink-python/pyflink/datastream/checkpoint_config.py
index 6d6406320fc..5034e9d85bd 100644
--- a/flink-python/pyflink/datastream/checkpoint_config.py
+++ b/flink-python/pyflink/datastream/checkpoint_config.py
@@ -20,6 +20,7 @@ from enum import Enum
 from typing import Optional
 
 from pyflink.common import Duration
+from pyflink.datastream.externalized_checkpoint_retention import 
ExternalizedCheckpointRetention
 from pyflink.datastream.checkpoint_storage import CheckpointStorage, 
_from_j_checkpoint_storage
 from pyflink.datastream.checkpointing_mode import CheckpointingMode
 from pyflink.java_gateway import get_gateway
@@ -313,11 +314,49 @@ class CheckpointConfig(object):
                              
:data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
                              
:data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
                              
:data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
+
+        .. note:: Deprecated. Use :func:`set_externalized_checkpoint_retention` instead.
         """
         self._j_checkpoint_config.setExternalizedCheckpointCleanup(
             
ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
         return self
 
+    def set_externalized_checkpoint_retention(
+            self,
+            retention_mode: 'ExternalizedCheckpointRetention') -> 
'CheckpointConfig':
+        """
+        Sets the mode for externalized checkpoint clean-up. Externalized 
checkpoints will be enabled
+        automatically unless the mode is set to
+        :data:`ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS`.
+
+        Externalized checkpoints write their meta data out to persistent 
storage and are **not**
+        automatically cleaned up when the owning job fails or is suspended 
(terminating with job
+        status ``FAILED`` or ``SUSPENDED``). In this case, you have to 
manually clean up the
+        checkpoint state, both the meta data and actual program state.
+
+        The :class:`ExternalizedCheckpointRetention` mode defines how an 
externalized checkpoint
+        should be cleaned up on job cancellation. If you choose to retain 
externalized checkpoints
+        on cancellation you have to handle checkpoint clean-up manually when 
you cancel the job as
+        well (terminating with job status ``CANCELED``).
+
+        The target directory for externalized checkpoints is configured via
+        
``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.
+
+        Example:
+        ::
+
+            >>> config.set_externalized_checkpoint_retention(
+            ...     ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
+
+        :param retention_mode: Externalized checkpoint retention behaviour; must be one of
+                             
:data:`ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION`,
+                             
:data:`ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION` or
+                             
:data:`ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS`
+        """
+        self._j_checkpoint_config.setExternalizedCheckpointRetention(
+            
ExternalizedCheckpointRetention._to_j_externalized_checkpoint_retention(retention_mode))
+        return self
+
     def is_externalized_checkpoints_enabled(self) -> bool:
         """
         Returns whether checkpoints should be persisted externally.
@@ -332,6 +371,8 @@ class CheckpointConfig(object):
 
         :return: The cleanup behaviour for externalized checkpoints or 
``None`` if none is
                  configured.
+
+        .. note:: Deprecated. Use :func:`get_externalized_checkpoint_retention` instead.
         """
         cleanup_mode = 
self._j_checkpoint_config.getExternalizedCheckpointCleanup()
         if cleanup_mode is None:
@@ -340,6 +381,20 @@ class CheckpointConfig(object):
             return 
ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup(
                 cleanup_mode)
 
+    def get_externalized_checkpoint_retention(self) -> 
Optional['ExternalizedCheckpointRetention']:
+        """
+        Returns the retention behaviour for externalized checkpoints.
+
+        :return: The retention behaviour for externalized checkpoints (defaults to
+                 ``NO_EXTERNALIZED_CHECKPOINTS``), or ``None`` if the Java config has none.
+        """
+        retention_mode = 
self._j_checkpoint_config.getExternalizedCheckpointRetention()
+        if retention_mode is None:
+            return None
+        else:
+            return 
ExternalizedCheckpointRetention._from_j_externalized_checkpoint_retention(
+                retention_mode)
+
     def is_unaligned_checkpoints_enabled(self) -> bool:
         """
         Returns whether unaligned checkpoints are enabled.
@@ -516,6 +571,9 @@ class ExternalizedCheckpointCleanup(Enum):
     :data:`NO_EXTERNALIZED_CHECKPOINTS`:
 
     Externalized checkpoints are disabled completely.
+
+    .. note:: Deprecated. Please use
+       :class:`pyflink.datastream.ExternalizedCheckpointRetention` instead.
     """
 
     DELETE_ON_CANCELLATION = 0
diff --git 
a/flink-python/pyflink/datastream/externalized_checkpoint_retention.py 
b/flink-python/pyflink/datastream/externalized_checkpoint_retention.py
new file mode 100644
index 00000000000..f66aad517c0
--- /dev/null
+++ b/flink-python/pyflink/datastream/externalized_checkpoint_retention.py
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+from pyflink.java_gateway import get_gateway
+__all__ = ['ExternalizedCheckpointRetention']
+
+
+class ExternalizedCheckpointRetention(Enum):
+    """
+    Cleanup behaviour for externalized checkpoints when the job is cancelled.
+
+    :data:`DELETE_ON_CANCELLATION`:
+
+    Delete externalized checkpoints on job cancellation.
+
+    All checkpoint state will be deleted when you cancel the owning
+    job, both the meta data and actual program state. Therefore, you
+    cannot resume from externalized checkpoints after the job has been
+    cancelled.
+
+    Note that checkpoint state is always kept if the job terminates
+    with state ``FAILED``.
+
+    :data:`RETAIN_ON_CANCELLATION`:
+
+    Retain externalized checkpoints on job cancellation.
+
+    All checkpoint state is kept when you cancel the owning job. You
+    have to manually delete both the checkpoint meta data and actual
+    program state after cancelling the job.
+
+    Note that checkpoint state is always kept if the job terminates
+    with state ``FAILED``.
+
+    :data:`NO_EXTERNALIZED_CHECKPOINTS`:
+
+    Externalized checkpoints are disabled completely.
+    """
+    # Names must match the Java enum constants; the converters below map by name.
+    DELETE_ON_CANCELLATION = 0
+
+    RETAIN_ON_CANCELLATION = 1
+
+    NO_EXTERNALIZED_CHECKPOINTS = 2
+
+    @staticmethod
+    def _from_j_externalized_checkpoint_retention(j_retention_mode) \
+            -> 'ExternalizedCheckpointRetention':
+        return ExternalizedCheckpointRetention[j_retention_mode.name()]
+
+    def _to_j_externalized_checkpoint_retention(self):
+        gateway = get_gateway()
+        JExternalizedCheckpointRetention = \
+            
gateway.jvm.org.apache.flink.streaming.api.environment.CheckpointConfig \
+            .ExternalizedCheckpointRetention
+        return getattr(JExternalizedCheckpointRetention, self.name)
diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py 
b/flink-python/pyflink/datastream/tests/test_check_point_config.py
index b7eba6038b2..115e89ccee6 100644
--- a/flink-python/pyflink/datastream/tests/test_check_point_config.py
+++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py
@@ -16,7 +16,10 @@
 # limitations under the License.
 
################################################################################
 from pyflink.common import Duration
-from pyflink.datastream import (CheckpointConfig, CheckpointingMode, 
ExternalizedCheckpointCleanup,
+from pyflink.datastream import (CheckpointConfig,
+                                CheckpointingMode,
+                                ExternalizedCheckpointCleanup,
+                                ExternalizedCheckpointRetention,
                                 StreamExecutionEnvironment)
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -138,6 +141,27 @@ class CheckpointConfigTests(PyFlinkTestCase):
         
self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(),
                          ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
 
+    def test_get_set_externalized_checkpoints_retention(self):
+        # Use the new setter so the retention -> Java conversion path is exercised.
+        self.assertFalse(self.checkpoint_config.is_externalized_checkpoints_enabled())
+
+        self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_retention(),
+                         ExternalizedCheckpointRetention.NO_EXTERNALIZED_CHECKPOINTS)
+
+        self.checkpoint_config.set_externalized_checkpoint_retention(
+            ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
+
+        self.assertTrue(self.checkpoint_config.is_externalized_checkpoints_enabled())
+
+        self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_retention(),
+                         ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
+
+        self.checkpoint_config.set_externalized_checkpoint_retention(
+            ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION)
+
+        self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_retention(),
+                         ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION)
+
     def test_is_unaligned_checkpointing_enabled(self):
 
         
self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())

Reply via email to