This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f7a2819f482ad788ff9b73cefe8a8d730b71b5a6
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Tue Jan 23 00:45:10 2024 +0800

    [FLINK-34190][checkpoint] Deprecate RestoreMode#LEGACY
---
 docs/content.zh/docs/ops/state/savepoints.md                       | 6 +++++-
 docs/content/docs/ops/state/savepoints.md                          | 7 ++++++-
 .../shortcodes/generated/savepoint_config_configuration.html       | 2 +-
 .../main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java   | 5 ++++-
 .../org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java | 2 +-
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java  | 7 ++-----
 .../org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java   | 3 +--
 7 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/savepoints.md b/docs/content.zh/docs/ops/state/savepoints.md
index 2294fc97923..7149781c96d 100644
--- a/docs/content.zh/docs/ops/state/savepoints.md
+++ b/docs/content.zh/docs/ops/state/savepoints.md
@@ -223,7 +223,7 @@ $ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]
 2. [Native](#savepoint-format) 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。这意味着这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。
 {{< /hint >}}
 
-**LEGACY**
+**LEGACY (已废弃)**
 
 Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它。导致该的问题原因是, Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint。总而言之,恢复的 checkpoint 的所有权没有明确的界定。
 
@@ -231,6 +231,10 @@ Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永
   {{< img src="/fig/restore-mode-legacy.svg" alt="LEGACY restore mode" width="70%" >}}
 </div>
 
+{{< hint warning >}}
+**注意:** LEGACY 模式已经被废弃了,在 Flink 2.0 版本将会被移除。请使用 CLAIM 或 NO_CLAIM 模式。
+{{< /hint >}}
+
 ### 删除 Savepoint
 
 ```shell
diff --git a/docs/content/docs/ops/state/savepoints.md b/docs/content/docs/ops/state/savepoints.md
index c13cd62e6cc..c5a4d1eee42 100644
--- a/docs/content/docs/ops/state/savepoints.md
+++ b/docs/content/docs/ops/state/savepoints.md
@@ -271,7 +271,7 @@ Please note that, when restored in CLAIM mode, subsequent checkpoints might reus
 might delay the deletion the savepoints directory.
 {{< /hint >}}
 
-**LEGACY**
+**LEGACY (deprecated)**
 
 The legacy mode is how Flink worked until 1.15. In this mode Flink will never delete the initial
 checkpoint. At the same time, it is not clear if a user can ever delete it as well. The problem here,
@@ -282,6 +282,11 @@ subsequent checkpoints depend on the restored checkpoint. Overall, the ownership
   {{< img src="/fig/restore-mode-legacy.svg" alt="LEGACY restore mode" width="70%" >}}
 </div>
 
+{{< hint warning >}}
+**Attention:** The LEGACY mode is deprecated and will be removed in Flink 2.0. Please use CLAIM or
+NO_CLAIM mode instead.
+{{< /hint >}}
+
 
 ### Disposing Savepoints
 
diff --git a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html b/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
index 13baca7c989..8b1f53e3696 100644
--- a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/savepoint_config_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>execution.savepoint-restore-mode</h5></td>
             <td style="word-wrap: break-word;">NO_CLAIM</td>
             <td><p>Enum</p></td>
-            <td>Describes the mode how Flink should restore from the given savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.</li><li>"NO_CLAIM": Flink will not claim ownership of the snapshot files. However it will make sure it does not depend on any artefacts from the restored snapshot. In order to do that, Flink will take the first checkpoint as a f [...]
+            <td>Describes the mode how Flink should restore from the given savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.</li><li>"NO_CLAIM": Flink will not claim ownership of the snapshot files. However it will make sure it does not depend on any artefacts from the restored snapshot. In order to do that, Flink will take the first checkpoint as a f [...]
         </tr>
         <tr>
             <td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
index 14bac3c083b..bbee4f0efcd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
@@ -36,11 +36,14 @@ public enum RestoreMode implements DescribedEnum {
                     + " does not depend on any artefacts from the restored snapshot. In order to do that,"
                     + " Flink will take the first checkpoint as a full one, which means it might"
                     + " reupload/duplicate files that are part of the restored checkpoint."),
+    @Deprecated
     LEGACY(
             "This is the mode in which Flink worked until 1.15. It will not claim ownership of the"
                     + " snapshot and will not delete the files. However, it can directly depend on"
                     + " the existence of the files of the restored checkpoint. It might not be safe"
-                    + " to delete checkpoints that were restored in legacy mode ");
+                    + " to delete checkpoints that were restored in legacy mode. This mode is"
+                    + " deprecated, please use CLAIM or NO_CLAIM mode to get a clear state file"
+                    + " ownership.");
 
     private final String description;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1532e969cdb..3980d79ff8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -271,7 +271,7 @@ public class SourceStreamTask<
             throw new IllegalStateException(
                     "Using externally induced sources, we can not enforce taking a full checkpoint."
                             + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
+                            + " CLAIM mode.");
         } else {
             // we do not trigger checkpoints here, we simply state whether we can trigger them
             synchronized (lock) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c3f2ed16dd2..a5f26dbc0e5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1406,11 +1406,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                     String.format(
                             "Configured state backend (%s) does not support enforcing a full"
                                     + " snapshot. If you are restoring in %s mode, please"
-                                    + " consider choosing either %s or %s restore mode.",
-                            stateBackend,
-                            RestoreMode.NO_CLAIM,
-                            RestoreMode.CLAIM,
-                            RestoreMode.LEGACY));
+                                    + " consider choosing %s restore mode.",
+                            stateBackend, RestoreMode.NO_CLAIM, RestoreMode.CLAIM));
         } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
             SavepointType savepointType = (SavepointType) checkpointOptions.getCheckpointType();
             if (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e60adbccdb8..864857f3dd5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -699,8 +699,7 @@ public class StreamTaskTest {
                     .hasMessage(
                             "Configured state backend (OnlyIncrementalStateBackend) does not"
                                     + " support enforcing a full snapshot. If you are restoring in"
-                                    + " NO_CLAIM mode, please consider choosing either CLAIM or"
-                                    + " LEGACY restore mode.");
+                                    + " NO_CLAIM mode, please consider choosing CLAIM restore mode.");
         }
     }
 
