This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new d6b687b61 [FLINK-34853] Submit CDC Job To Flink K8S Native Application 
Mode (#3093)
d6b687b61 is described below

commit d6b687b61dd1701f9c7d2c5bc4446a88e1641d2b
Author: ConradJam <jam.gz...@gmail.com>
AuthorDate: Thu Aug 8 22:35:52 2024 +0800

    [FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode (#3093)
---
 Dockerfile                                         | 35 ++++++++
 flink-cdc-cli/pom.xml                              | 10 ++-
 .../java/org/apache/flink/cdc/cli/CliExecutor.java | 51 ++++++++----
 .../java/org/apache/flink/cdc/cli/CliFrontend.java | 11 +--
 .../apache/flink/cdc/cli/CliFrontendOptions.java   | 12 +++
 .../flink/cdc/cli/utils/ConfigurationUtils.java    | 13 +++
 .../org/apache/flink/cdc/cli/CliFrontendTest.java  | 17 ++++
 flink-cdc-composer/pom.xml                         |  7 +-
 .../cdc/composer/PipelineDeploymentExecutor.java   | 33 ++++++++
 .../flink/deployment/ComposeDeploymentFactory.java | 35 ++++++++
 .../K8SApplicationDeploymentExecutor.java          | 93 ++++++++++++++++++++++
 .../cdc/composer/utils/FactoryDiscoveryUtils.java  |  9 ++-
 12 files changed, 301 insertions(+), 25 deletions(-)

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 000000000..d0deb5a29
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,35 @@
+#/*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements.  See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License.  You may obtain a copy of the License at
+# *
+# *      http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+FROM flink
+
+# Version of the Flink CDC distribution to install; it selects the artifacts copied below.
+ARG FLINK_CDC_VERSION=3.2-SNAPSHOT
+# Pipeline definition file to bake into the image. It has no default, so pass it with
+# --build-arg PIPELINE_DEFINITION_FILE=<path inside the build context>.
+ARG PIPELINE_DEFINITION_FILE
+
+ENV FLINK_CDC_HOME=/opt/flink-cdc
+RUN mkdir -p /opt/flink-cdc /opt/flink/usrlib
+
+# Unpack the CDC distribution into /opt/flink-cdc and give the dist jar a version-independent name.
+COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/
+RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \
+    mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \
+    mv /opt/flink-cdc/lib/flink-cdc-dist-${FLINK_CDC_VERSION}.jar \
+       /opt/flink-cdc/lib/flink-cdc-dist.jar && \
+    rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} \
+           /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz
+
+# Copy the pipeline connector jars into /opt/flink/usrlib.
+COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar \
+    /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
+COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar \
+    /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar
+
+# Copy the Flink CDC pipeline definition file. This is an example; users can replace it
+# according to their needs. The trailing slash marks the destination as a directory.
+COPY $PIPELINE_DEFINITION_FILE $FLINK_CDC_HOME/conf/
diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml
index 1aa57f336..0cd2d0de2 100644
--- a/flink-cdc-cli/pom.xml
+++ b/flink-cdc-cli/pom.xml
@@ -28,7 +28,7 @@ limitations under the License.
     <artifactId>flink-cdc-cli</artifactId>
 
     <properties>
-        <commons-cli.version>1.6.0</commons-cli.version>
+        <commons-cli.version>1.7.0</commons-cli.version>
         <snakeyaml.version>2.6</snakeyaml.version>
     </properties>
 
@@ -55,6 +55,14 @@ limitations under the License.
             <artifactId>commons-cli</artifactId>
             <version>${commons-cli.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
index 3e76607b2..2452ab59e 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -19,14 +19,19 @@ package org.apache.flink.cdc.cli;
 
 import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
 import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
 import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.PipelineComposer;
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
+import org.apache.commons.cli.CommandLine;
+
 import java.nio.file.Path;
 import java.util.List;
 
@@ -39,17 +44,21 @@ public class CliExecutor {
     private final boolean useMiniCluster;
     private final List<Path> additionalJars;
 
+    private final CommandLine commandLine;
+
     private PipelineComposer composer = null;
 
     private final SavepointRestoreSettings savepointSettings;
 
     public CliExecutor(
+            CommandLine commandLine,
             Path pipelineDefPath,
             Configuration flinkConfig,
             Configuration globalPipelineConfig,
             boolean useMiniCluster,
             List<Path> additionalJars,
             SavepointRestoreSettings savepointSettings) {
+        this.commandLine = commandLine;
         this.pipelineDefPath = pipelineDefPath;
         this.flinkConfig = flinkConfig;
         this.globalPipelineConfig = globalPipelineConfig;
@@ -59,22 +68,31 @@ public class CliExecutor {
     }
 
     public PipelineExecution.ExecutionInfo run() throws Exception {
-        // Parse pipeline definition file
-        PipelineDefinitionParser pipelineDefinitionParser = new 
YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef =
-                pipelineDefinitionParser.parse(pipelineDefPath, 
globalPipelineConfig);
-
-        // Create composer
-        PipelineComposer composer = getComposer();
-
-        // Compose pipeline
-        PipelineExecution execution = composer.compose(pipelineDef);
-
-        // Execute the pipeline
-        return execution.execute();
+        // Either hand the job to a deployment executor, or compose and run it directly
+        boolean isDeploymentMode = 
ConfigurationUtils.isDeploymentMode(commandLine);
+        if (isDeploymentMode) {
+            ComposeDeploymentFactory composeDeploymentFactory = new 
ComposeDeploymentFactory();
+            PipelineDeploymentExecutor composeExecutor =
+                    
composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
+            return composeExecutor.deploy(
+                    commandLine,
+                    
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
+                    additionalJars);
+        } else {
+            // Run CDC Job And Parse pipeline definition file
+            PipelineDefinitionParser pipelineDefinitionParser = new 
YamlPipelineDefinitionParser();
+            PipelineDef pipelineDef =
+                    pipelineDefinitionParser.parse(pipelineDefPath, 
globalPipelineConfig);
+            // Create composer
+            PipelineComposer composer = getComposer();
+            // Compose pipeline
+            PipelineExecution execution = composer.compose(pipelineDef);
+            // Execute or submit the pipeline
+            return execution.execute();
+        }
     }
 
-    private PipelineComposer getComposer() {
+    private PipelineComposer getComposer() throws Exception {
         if (composer == null) {
             return FlinkEnvironmentUtils.createComposer(
                     useMiniCluster, flinkConfig, additionalJars, 
savepointSettings);
@@ -102,6 +120,11 @@ public class CliExecutor {
         return additionalJars;
     }
 
+    @VisibleForTesting
+    public String getDeploymentTarget() {
+        return commandLine.getOptionValue("target");
+    }
+
     public SavepointRestoreSettings getSavepointSettings() {
         return savepointSettings;
     }
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
index c67c3cf99..ac746e224 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -34,8 +34,6 @@ import org.apache.commons.cli.Options;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -83,13 +81,9 @@ public class CliFrontend {
                     "Missing pipeline definition file path in arguments. ");
         }
 
-        // Take the first unparsed argument as the pipeline definition file
         Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
-        if (!Files.exists(pipelineDefPath)) {
-            throw new FileNotFoundException(
-                    String.format("Cannot find pipeline definition file 
\"%s\"", pipelineDefPath));
-        }
-
+        // Take the first unparsed argument as the pipeline definition file
+        LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);
         // Global pipeline configuration
         Configuration globalPipelineConfig = getGlobalConfig(commandLine);
 
@@ -111,6 +105,7 @@ public class CliFrontend {
 
         // Build executor
         return new CliExecutor(
+                commandLine,
                 pipelineDefPath,
                 flinkConfig,
                 globalPipelineConfig,
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
index 320213285..adf39a40b 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -46,6 +46,16 @@ public class CliFrontendOptions {
                     .desc("JARs to be submitted together with the pipeline")
                     .build();
 
+    public static final Option TARGET =
+            Option.builder("t")
+                    .longOpt("target")
+                    .hasArg()
+                    .desc(
+                            "The deployment target for the execution. This can 
take one of the following values "
+                                    + 
"local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+                                    + "-application")
+                    .build();
+
     public static final Option USE_MINI_CLUSTER =
             Option.builder()
                     .longOpt("use-mini-cluster")
@@ -91,6 +101,8 @@ public class CliFrontendOptions {
                 .addOption(FLINK_HOME)
                 .addOption(GLOBAL_CONFIG)
                 .addOption(USE_MINI_CLUSTER)
+                .addOption(TARGET)
+                .addOption(USE_MINI_CLUSTER)
                 .addOption(SAVEPOINT_PATH_OPTION)
                 .addOption(SAVEPOINT_CLAIM_MODE)
                 .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index 8bc4ba628..6d2100aa5 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -18,12 +18,18 @@
 package org.apache.flink.cdc.cli.utils;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.client.deployment.executors.LocalExecutor;
+import org.apache.flink.client.deployment.executors.RemoteExecutor;
+
+import org.apache.commons.cli.CommandLine;
 
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
+
 /** Utilities for handling {@link Configuration}. */
 public class ConfigurationUtils {
 
@@ -62,4 +68,11 @@ public class ConfigurationUtils {
 
         return flattenedMap;
     }
+
+    public static boolean isDeploymentMode(CommandLine commandLine) {
+        String target = commandLine.getOptionValue(TARGET);
+        return target != null
+                && !target.equalsIgnoreCase(LocalExecutor.NAME)
+                && !target.equalsIgnoreCase(RemoteExecutor.NAME);
+    }
 }
diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
index 83ccb1cbe..501b28840 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
@@ -106,6 +106,19 @@ class CliFrontendTest {
         
assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
     }
 
+    @Test
+    void testDeploymentTargetConfiguration() throws Exception {
+        CliExecutor executor =
+                createExecutor(
+                        pipelineDef(),
+                        "--flink-home",
+                        flinkHome(),
+                        "-t",
+                        "kubernetes-application",
+                        "-n");
+        
assertThat(executor.getDeploymentTarget()).isEqualTo("kubernetes-application");
+    }
+
     @Test
     void testAdditionalJar() throws Exception {
         String aJar = "/foo/jar/a.jar";
@@ -177,6 +190,10 @@ class CliFrontendTest {
                     + "                                   was triggered.\n"
                     + "    -s,--from-savepoint <arg>      Path to a savepoint 
to restore the job from\n"
                     + "                                   (for example 
hdfs:///flink/savepoint-1537\n"
+                    + "    -t,--target <arg>              The deployment 
target for the execution. This\n"
+                    + "                                   can take one of the 
following values\n"
+                    + "                                   
local/remote/yarn-session/yarn-application/ku\n"
+                    + "                                   
bernetes-session/kubernetes-application\n"
                     + "       --use-mini-cluster          Use Flink 
MiniCluster to run the pipeline\n";
 
     private static class NoOpComposer implements PipelineComposer {
diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index 12471f032..f9f57fd3c 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -48,7 +48,7 @@ limitations under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -56,6 +56,11 @@ limitations under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
new file mode 100644
index 000000000..37d573e6b
--- /dev/null
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * Executor that deploys a Flink CDC job to a deployment target.
+ *
+ * <p>Each implementation handles one kind of target.
+ */
+public interface PipelineDeploymentExecutor {
+    /**
+     * Deploys the Flink CDC job described by the given command line.
+     *
+     * @param commandLine the parsed command line of the CLI invocation
+     * @param flinkConfig the Flink configuration to deploy with
+     * @param additionalJars additional JARs supplied alongside the pipeline
+     * @return information about the resulting execution
+     * @throws Exception if the deployment fails
+     */
+    PipelineExecution.ExecutionInfo deploy(
+            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
+            throws Exception;
+}
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
new file mode 100644
index 000000000..27a005721
--- /dev/null
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Creates the {@link PipelineDeploymentExecutor} that corresponds to the deployment target
+ * requested on the command line.
+ */
+public class ComposeDeploymentFactory {
+
+    /** Long name of the command line option that carries the deployment target. */
+    private static final String TARGET_OPTION = "target";
+
+    /** Target value that is served by {@link K8SApplicationDeploymentExecutor}. */
+    private static final String KUBERNETES_APPLICATION = "kubernetes-application";
+
+    /**
+     * Returns the deployment executor for the target given on the command line.
+     *
+     * @param commandLine parsed command line whose {@code target} option selects the executor
+     * @return the executor for the requested target
+     * @throws IllegalArgumentException if the target is missing or not supported
+     */
+    public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) {
+        String target = commandLine.getOptionValue(TARGET_OPTION);
+        // Constant-first comparison: a missing target (null) falls through to the
+        // IllegalArgumentException below instead of raising a NullPointerException here.
+        if (KUBERNETES_APPLICATION.equalsIgnoreCase(target)) {
+            return new K8SApplicationDeploymentExecutor();
+        }
+        throw new IllegalArgumentException(
+                String.format("Deployment target %s is not supported", target));
+    }
+}
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
new file mode 100644
index 000000000..19fcdb262
--- /dev/null
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+
+import org.apache.commons.cli.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Deploys a Flink CDC job as a Flink application cluster in native Kubernetes application mode.
+ *
+ * <p>The application started in the cluster is {@code org.apache.flink.cdc.cli.CliFrontend},
+ * invoked with the argument list of the submitting command line.
+ */
+public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class);
+
+    /** Path of the CDC dist jar inside the container, used when no pipeline jars are set. */
+    private static final String DEFAULT_CDC_DIST_JAR =
+            "local:///opt/flink-cdc/lib/flink-cdc-dist.jar";
+
+    /** Container image used when the configuration does not specify one. */
+    private static final String DEFAULT_CONTAINER_IMAGE = "flink/flink-cdc:latest";
+
+    /**
+     * Deploys an application cluster for the job and reports the id of the created cluster.
+     *
+     * <p>The given configuration is modified in place: the deployment target, application
+     * arguments and application main class are always set, while pipeline jars and container
+     * image only receive defaults when they are not configured yet.
+     *
+     * @param commandLine parsed command line; its argument list becomes the application arguments
+     * @param flinkConfig Flink configuration used for the deployment (mutated by this method)
+     * @param additionalJars additional JARs from the CLI; not read by this implementation
+     * @return execution info carrying the cluster id
+     * @throws RuntimeException if the deployment fails; the original failure is kept as cause
+     */
+    @Override
+    public PipelineExecution.ExecutionInfo deploy(
+            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) {
+        LOG.info("Submitting application in 'Flink K8S Application Mode'.");
+        flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+        if (flinkConfig.get(PipelineOptions.JARS) == null) {
+            // The CDC dist jar must be part of the pipeline jars; default to its container path.
+            List<String> jars = new ArrayList<>();
+            jars.add(DEFAULT_CDC_DIST_JAR);
+            flinkConfig.set(PipelineOptions.JARS, jars);
+        }
+        // Fall back to the default CDC image only when no image was configured, so that an
+        // image chosen by the user is not silently overwritten.
+        if (!flinkConfig.contains(KubernetesConfigOptions.CONTAINER_IMAGE)) {
+            flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, DEFAULT_CONTAINER_IMAGE);
+        }
+        flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList());
+        flinkConfig.set(
+                ApplicationConfiguration.APPLICATION_MAIN_CLASS,
+                "org.apache.flink.cdc.cli.CliFrontend");
+        KubernetesClusterClientFactory kubernetesClusterClientFactory =
+                new KubernetesClusterClientFactory();
+        KubernetesClusterDescriptor descriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
+        ClusterSpecification specification =
+                kubernetesClusterClientFactory.getClusterSpecification(flinkConfig);
+        ApplicationConfiguration applicationConfiguration =
+                ApplicationConfiguration.fromConfiguration(flinkConfig);
+        ClusterClient<String> client = null;
+        try {
+            ClusterClientProvider<String> clusterClientProvider =
+                    descriptor.deployApplicationCluster(specification, applicationConfiguration);
+            client = clusterClientProvider.getClusterClient();
+            String clusterId = client.getClusterId();
+            LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
+            return new PipelineExecution.ExecutionInfo(clusterId, "submit job successful");
+        } catch (Exception e) {
+            // A client only exists once the cluster was created; shut that cluster down again
+            // when a later step fails.
+            if (client != null) {
+                client.shutDownCluster();
+            }
+            throw new RuntimeException("Failed to deploy Flink CDC job", e);
+        } finally {
+            descriptor.close();
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+}
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
index 2eb79aead..4f7649d9e 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -93,6 +93,12 @@ public class FactoryDiscoveryUtils {
         try {
             T factory = getFactoryByIdentifier(identifier, factoryClass);
             URL url = 
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
+            String urlString = url.toString();
+            if (urlString.contains("usrlib")) {
+                String flinkHome = System.getenv("FLINK_HOME");
+                urlString = urlString.replace("usrlib", flinkHome + "/usrlib");
+            }
+            url = new URL(urlString);
             if (Files.isDirectory(Paths.get(url.toURI()))) {
                 LOG.warn(
                         "The factory class \"{}\" is contained by directory 
\"{}\" instead of JAR. "
@@ -104,7 +110,8 @@ public class FactoryDiscoveryUtils {
             return Optional.of(url);
         } catch (Exception e) {
             throw new RuntimeException(
-                    String.format("Failed to search JAR by factory identifier 
\"%s\"", identifier));
+                    String.format("Failed to search JAR by factory identifier 
\"%s\"", identifier),
+                    e);
         }
     }
 }

Reply via email to