This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b10a666e3 [FLINK-34613][cdc] Support recover from a specific savepoint 
file (#2959)
b10a666e3 is described below

commit b10a666e3ebfc1a8fd0ac50167bee5cbf39bf84a
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Tue Apr 9 15:25:42 2024 +0800

    [FLINK-34613][cdc] Support recover from a specific savepoint file (#2959)
---
 .../legacy-flink-cdc-sources/mysql-cdc.md          |  2 +-
 .../legacy-flink-cdc-sources/mysql-cdc.md          |  2 +-
 .../java/org/apache/flink/cdc/cli/CliExecutor.java | 17 ++++++--
 .../java/org/apache/flink/cdc/cli/CliFrontend.java | 36 +++++++++++++++-
 .../apache/flink/cdc/cli/CliFrontendOptions.java   | 36 +++++++++++++++-
 .../flink/cdc/cli/utils/FlinkEnvironmentUtils.java | 13 ++++--
 .../org/apache/flink/cdc/cli/CliFrontendTest.java  | 49 +++++++++++++++++++---
 7 files changed, 137 insertions(+), 18 deletions(-)

diff --git 
a/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md 
b/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md
index 46cc44d1b..ccae86cfa 100644
--- a/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md
@@ -717,7 +717,7 @@ _Step 3_: 从 savepoint 还原更新后的 Flink 作业。
 ```shell
 $ ./bin/flink run \
       --detached \ 
-      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+      --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
       ./FlinkCDCExample.jar
 ```
 **注意:** 请参考文档 [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 了解更多详细信息。
diff --git a/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md 
b/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md
index bc87a43e4..6937742e1 100644
--- a/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md
@@ -749,7 +749,7 @@ _Step 3_: Restore the updated Flink job from savepoint.
 ```shell
 $ ./bin/flink run \
       --detached \ 
-      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+      --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
       ./FlinkCDCExample.jar
 ```
 **Note:** Please refer to the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
index febe8b331..3e76607b2 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import java.nio.file.Path;
 import java.util.List;
@@ -40,17 +41,21 @@ public class CliExecutor {
 
     private PipelineComposer composer = null;
 
+    private final SavepointRestoreSettings savepointSettings;
+
     public CliExecutor(
             Path pipelineDefPath,
             Configuration flinkConfig,
             Configuration globalPipelineConfig,
             boolean useMiniCluster,
-            List<Path> additionalJars) {
+            List<Path> additionalJars,
+            SavepointRestoreSettings savepointSettings) {
         this.pipelineDefPath = pipelineDefPath;
         this.flinkConfig = flinkConfig;
         this.globalPipelineConfig = globalPipelineConfig;
         this.useMiniCluster = useMiniCluster;
         this.additionalJars = additionalJars;
+        this.savepointSettings = savepointSettings;
     }
 
     public PipelineExecution.ExecutionInfo run() throws Exception {
@@ -60,7 +65,7 @@ public class CliExecutor {
                 pipelineDefinitionParser.parse(pipelineDefPath, 
globalPipelineConfig);
 
         // Create composer
-        PipelineComposer composer = getComposer(flinkConfig);
+        PipelineComposer composer = getComposer();
 
         // Compose pipeline
         PipelineExecution execution = composer.compose(pipelineDef);
@@ -69,10 +74,10 @@ public class CliExecutor {
         return execution.execute();
     }
 
-    private PipelineComposer getComposer(Configuration flinkConfig) {
+    private PipelineComposer getComposer() {
         if (composer == null) {
             return FlinkEnvironmentUtils.createComposer(
-                    useMiniCluster, flinkConfig, additionalJars);
+                    useMiniCluster, flinkConfig, additionalJars, 
savepointSettings);
         }
         return composer;
     }
@@ -96,4 +101,8 @@ public class CliExecutor {
     public List<Path> getAdditionalJars() {
         return additionalJars;
     }
+
+    public SavepointRestoreSettings getSavepointSettings() {
+        return savepointSettings;
+    }
 }
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
index cd54333d5..663f03942 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -22,6 +22,9 @@ import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -40,6 +43,10 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
+import static 
org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
+
 /** The frontend entrypoint for the command-line interface of Flink CDC. */
 public class CliFrontend {
     private static final Logger LOG = 
LoggerFactory.getLogger(CliFrontend.class);
@@ -90,6 +97,9 @@ public class CliFrontend {
         Path flinkHome = getFlinkHome(commandLine);
         Configuration flinkConfig = 
FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
 
+        // Savepoint
+        SavepointRestoreSettings savepointSettings = 
createSavepointRestoreSettings(commandLine);
+
         // Additional JARs
         List<Path> additionalJars =
                 Arrays.stream(
@@ -105,7 +115,31 @@ public class CliFrontend {
                 flinkConfig,
                 globalPipelineConfig,
                 commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER),
-                additionalJars);
+                additionalJars,
+                savepointSettings);
+    }
+
+    private static SavepointRestoreSettings createSavepointRestoreSettings(
+            CommandLine commandLine) {
+        if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
+            String savepointPath = 
commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
+            boolean allowNonRestoredState =
+                    
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
+            final RestoreMode restoreMode;
+            if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
+                restoreMode =
+                        
org.apache.flink.configuration.ConfigurationUtils.convertValue(
+                                
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
+                                RestoreMode.class);
+            } else {
+                restoreMode = 
SavepointConfigOptions.RESTORE_MODE.defaultValue();
+            }
+            // allowNonRestoredState is taken from the -n/--allow-nonRestored-state flag.
+            return SavepointRestoreSettings.forPath(
+                    savepointPath, allowNonRestoredState, restoreMode);
+        } else {
+            return SavepointRestoreSettings.none();
+        }
     }
 
     private static Path getFlinkHome(CommandLine commandLine) {
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
index fd3507d52..320213285 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -53,12 +53,46 @@ public class CliFrontendOptions {
                     .desc("Use Flink MiniCluster to run the pipeline")
                     .build();
 
+    public static final Option SAVEPOINT_PATH_OPTION =
+            Option.builder("s")
+                    .longOpt("from-savepoint")
+                    .hasArg(true)
+                    .desc(
+                            "Path to a savepoint to restore the job from (for 
example hdfs:///flink/savepoint-1537")
+                    .build();
+
+    public static final Option SAVEPOINT_CLAIM_MODE =
+            Option.builder("cm")
+                    .longOpt("claim-mode")
+                    .hasArg(true)
+                    .desc(
+                            "Defines how should we restore from the given 
savepoint. Supported options: "
+                                    + "[claim - claim ownership of the 
savepoint and delete once it is"
+                                    + " subsumed, no_claim (default) - do not 
claim ownership, the first"
+                                    + " checkpoint will not reuse any files 
from the restored one, legacy "
+                                    + "- the old behaviour, do not assume 
ownership of the savepoint files,"
+                                    + " but can reuse some shared files")
+                    .build();
+
+    public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
+            Option.builder("n")
+                    .longOpt("allow-nonRestored-state")
+                    .hasArg(false)
+                    .desc(
+                            "Allow to skip savepoint state that cannot be 
restored. "
+                                    + "You need to allow this if you removed 
an operator from your "
+                                    + "program that was part of the program 
when the savepoint was triggered.")
+                    .build();
+
     public static Options initializeOptions() {
         return new Options()
                 .addOption(HELP)
                 .addOption(JAR)
                 .addOption(FLINK_HOME)
                 .addOption(GLOBAL_CONFIG)
-                .addOption(USE_MINI_CLUSTER);
+                .addOption(USE_MINI_CLUSTER)
+                .addOption(SAVEPOINT_PATH_OPTION)
+                .addOption(SAVEPOINT_CLAIM_MODE)
+                .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
     }
 }
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
index 15226a18f..4880fa47b 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.cli.utils;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import java.nio.file.Path;
 import java.util.List;
@@ -35,12 +36,16 @@ public class FlinkEnvironmentUtils {
     }
 
     public static FlinkPipelineComposer createComposer(
-            boolean useMiniCluster, Configuration flinkConfig, List<Path> 
additionalJars) {
+            boolean useMiniCluster,
+            Configuration flinkConfig,
+            List<Path> additionalJars,
+            SavepointRestoreSettings savepointSettings) {
         if (useMiniCluster) {
             return FlinkPipelineComposer.ofMiniCluster();
         }
-        return FlinkPipelineComposer.ofRemoteCluster(
-                
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
-                additionalJars);
+        org.apache.flink.configuration.Configuration configuration =
+                
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
+        SavepointRestoreSettings.toConfiguration(savepointSettings, 
configuration);
+        return FlinkPipelineComposer.ofRemoteCluster(configuration, 
additionalJars);
     }
 }
diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
index 1f8a37236..32e250871 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 
 import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 
@@ -85,6 +86,25 @@ class CliFrontendTest {
         
assertThat(executor.getGlobalPipelineConfig().toMap().get("foo")).isEqualTo("bar");
     }
 
+    @Test
+    void testSavePointConfiguration() throws Exception {
+        CliExecutor executor =
+                createExecutor(
+                        pipelineDef(),
+                        "--flink-home",
+                        flinkHome(),
+                        "-s",
+                        flinkHome() + "/savepoints/savepoint-1",
+                        "-cm",
+                        "no_claim",
+                        "-n");
+        assertThat(executor.getSavepointSettings().getRestorePath())
+                .isEqualTo(flinkHome() + "/savepoints/savepoint-1");
+        assertThat(executor.getSavepointSettings().getRestoreMode())
+                .isEqualTo(RestoreMode.NO_CLAIM);
+        
assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
+    }
+
     @Test
     void testAdditionalJar() throws Exception {
         String aJar = "/foo/jar/a.jar";
@@ -134,12 +154,29 @@ class CliFrontendTest {
 
     private static final String HELP_MESSAGE =
             "usage:\n"
-                    + "       --flink-home <arg>      Path of Flink home 
directory\n"
-                    + "       --global-config <arg>   Path of the global 
configuration file for Flink\n"
-                    + "                               CDC pipelines\n"
-                    + "    -h,--help                  Display help message\n"
-                    + "       --jar <arg>             JARs to be submitted 
together with the pipeline\n"
-                    + "       --use-mini-cluster      Use Flink MiniCluster to 
run the pipeline\n";
+                    + "    -cm,--claim-mode <arg>         Defines how should 
we restore from the given\n"
+                    + "                                   savepoint. Supported 
options: [claim - claim\n"
+                    + "                                   ownership of the 
savepoint and delete once it\n"
+                    + "                                   is subsumed, 
no_claim (default) - do not\n"
+                    + "                                   claim ownership, the 
first checkpoint will\n"
+                    + "                                   not reuse any files 
from the restored one,\n"
+                    + "                                   legacy - the old 
behaviour, do not assume\n"
+                    + "                                   ownership of the 
savepoint files, but can\n"
+                    + "                                   reuse some shared 
files\n"
+                    + "       --flink-home <arg>          Path of Flink home 
directory\n"
+                    + "       --global-config <arg>       Path of the global 
configuration file for\n"
+                    + "                                   Flink CDC 
pipelines\n"
+                    + "    -h,--help                      Display help 
message\n"
+                    + "       --jar <arg>                 JARs to be submitted 
together with the\n"
+                    + "                                   pipeline\n"
+                    + "    -n,--allow-nonRestored-state   Allow to skip 
savepoint state that cannot be\n"
+                    + "                                   restored. You need 
to allow this if you\n"
+                    + "                                   removed an operator 
from your program that\n"
+                    + "                                   was part of the 
program when the savepoint\n"
+                    + "                                   was triggered.\n"
+                    + "    -s,--from-savepoint <arg>      Path to a savepoint 
to restore the job from\n"
+                    + "                                   (for example 
hdfs:///flink/savepoint-1537\n"
+                    + "       --use-mini-cluster          Use Flink 
MiniCluster to run the pipeline\n";
 
     private static class NoOpComposer implements PipelineComposer {
 

Reply via email to