This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new d6b687b61 [FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode (#3093) d6b687b61 is described below commit d6b687b61dd1701f9c7d2c5bc4446a88e1641d2b Author: ConradJam <jam.gz...@gmail.com> AuthorDate: Thu Aug 8 22:35:52 2024 +0800 [FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode (#3093) --- Dockerfile | 35 ++++++++ flink-cdc-cli/pom.xml | 10 ++- .../java/org/apache/flink/cdc/cli/CliExecutor.java | 51 ++++++++---- .../java/org/apache/flink/cdc/cli/CliFrontend.java | 11 +-- .../apache/flink/cdc/cli/CliFrontendOptions.java | 12 +++ .../flink/cdc/cli/utils/ConfigurationUtils.java | 13 +++ .../org/apache/flink/cdc/cli/CliFrontendTest.java | 17 ++++ flink-cdc-composer/pom.xml | 7 +- .../cdc/composer/PipelineDeploymentExecutor.java | 33 ++++++++ .../flink/deployment/ComposeDeploymentFactory.java | 35 ++++++++ .../K8SApplicationDeploymentExecutor.java | 93 ++++++++++++++++++++++ .../cdc/composer/utils/FactoryDiscoveryUtils.java | 9 ++- 12 files changed, 301 insertions(+), 25 deletions(-) diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..d0deb5a29 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,35 @@ +#/* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +FROM flink + +ARG FLINK_CDC_VERSION=3.2-SNAPSHOT +ARG PIPELINE_DEFINITION_FILE + +RUN mkdir -p /opt/flink-cdc +RUN mkdir -p /opt/flink/usrlib +ENV FLINK_CDC_HOME /opt/flink-cdc +COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/ +RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \ + mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \ + mv /opt/flink-cdc/lib/flink-cdc-dist-${FLINK_CDC_VERSION}.jar /opt/flink-cdc/lib/flink-cdc-dist.jar && \ + rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz +# copy jars to cdc libs +COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar +COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar +# copy flink cdc pipeline conf file, Here is an example. Users can replace it according to their needs. +COPY $PIPELINE_DEFINITION_FILE $FLINK_CDC_HOME/conf diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml index 1aa57f336..0cd2d0de2 100644 --- a/flink-cdc-cli/pom.xml +++ b/flink-cdc-cli/pom.xml @@ -28,7 +28,7 @@ limitations under the License. <artifactId>flink-cdc-cli</artifactId> <properties> - <commons-cli.version>1.6.0</commons-cli.version> + <commons-cli.version>1.7.0</commons-cli.version> <snakeyaml.version>2.6</snakeyaml.version> </properties> @@ -55,6 +55,14 @@ limitations under the License. <artifactId>commons-cli</artifactId> <version>${commons-cli.version}</version> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java index 3e76607b2..2452ab59e 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java @@ -19,14 +19,19 @@ package org.apache.flink.cdc.cli; import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser; import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser; +import org.apache.flink.cdc.cli.utils.ConfigurationUtils; import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.PipelineComposer; +import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.commons.cli.CommandLine; + import java.nio.file.Path; import java.util.List; @@ -39,17 +44,21 @@ public class CliExecutor { private final boolean useMiniCluster; private final List<Path> additionalJars; + private final CommandLine commandLine; + private PipelineComposer composer = null; private final SavepointRestoreSettings savepointSettings; public CliExecutor( + CommandLine commandLine, Path pipelineDefPath, Configuration flinkConfig, Configuration globalPipelineConfig, boolean useMiniCluster, List<Path> additionalJars, SavepointRestoreSettings savepointSettings) { + this.commandLine = commandLine; this.pipelineDefPath = pipelineDefPath; this.flinkConfig = flinkConfig; this.globalPipelineConfig = globalPipelineConfig; @@ -59,22 +68,31 @@ public class CliExecutor { } public PipelineExecution.ExecutionInfo run() throws Exception { - // Parse pipeline definition file - PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = - pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig); - - // Create composer - PipelineComposer composer = getComposer(); - - // Compose pipeline - PipelineExecution execution = composer.compose(pipelineDef); - - // Execute the pipeline - return execution.execute(); + // Create Submit Executor to deployment flink cdc job Or Run Flink CDC Job + boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine); + if (isDeploymentMode) { + ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory(); + PipelineDeploymentExecutor composeExecutor = + composeDeploymentFactory.getFlinkComposeExecutor(commandLine); + return composeExecutor.deploy( + commandLine, + org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()), + additionalJars); + } else { + // Run CDC Job And Parse pipeline definition file + PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = + pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig); + // Create composer + PipelineComposer composer = getComposer(); + // Compose pipeline + PipelineExecution execution = composer.compose(pipelineDef); + // Execute or submit the pipeline + return execution.execute(); + } } - private PipelineComposer getComposer() { + private PipelineComposer getComposer() throws Exception { if (composer == null) { return FlinkEnvironmentUtils.createComposer( useMiniCluster, flinkConfig, additionalJars, savepointSettings); @@ -102,6 +120,11 @@ public class CliExecutor { return additionalJars; } + @VisibleForTesting + public String getDeploymentTarget() { + return commandLine.getOptionValue("target"); + } + public SavepointRestoreSettings getSavepointSettings() { return savepointSettings; } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java index c67c3cf99..ac746e224 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java @@ -34,8 +34,6 @@ import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -83,13 +81,9 @@ public class CliFrontend { "Missing pipeline definition file path in arguments. "); } - // Take the first unparsed argument as the pipeline definition file Path pipelineDefPath = Paths.get(unparsedArgs.get(0)); - if (!Files.exists(pipelineDefPath)) { - throw new FileNotFoundException( - String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath)); - } - + // Take the first unparsed argument as the pipeline definition file + LOG.info("Real Path pipelineDefPath {}", pipelineDefPath); // Global pipeline configuration Configuration globalPipelineConfig = getGlobalConfig(commandLine); @@ -111,6 +105,7 @@ public class CliFrontend { // Build executor return new CliExecutor( + commandLine, pipelineDefPath, flinkConfig, globalPipelineConfig, diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java index 320213285..adf39a40b 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java @@ -46,6 +46,16 @@ public class CliFrontendOptions { .desc("JARs to be submitted together with the pipeline") .build(); + public static final Option TARGET = + Option.builder("t") + .longOpt("target") + .hasArg() + .desc( + "The deployment target for the execution. This can take one of the following values " + + "local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes" + + "-application") + .build(); + public static final Option USE_MINI_CLUSTER = Option.builder() .longOpt("use-mini-cluster") @@ -91,6 +101,8 @@ public class CliFrontendOptions { .addOption(FLINK_HOME) .addOption(GLOBAL_CONFIG) .addOption(USE_MINI_CLUSTER) + .addOption(TARGET) + .addOption(USE_MINI_CLUSTER) .addOption(SAVEPOINT_PATH_OPTION) .addOption(SAVEPOINT_CLAIM_MODE) .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION); diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java index 8bc4ba628..6d2100aa5 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java @@ -18,12 +18,18 @@ package org.apache.flink.cdc.cli.utils; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.client.deployment.executors.LocalExecutor; +import org.apache.flink.client.deployment.executors.RemoteExecutor; + +import org.apache.commons.cli.CommandLine; import java.nio.file.Path; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET; + /** Utilities for handling {@link Configuration}. */ public class ConfigurationUtils { @@ -62,4 +68,11 @@ public class ConfigurationUtils { return flattenedMap; } + + public static boolean isDeploymentMode(CommandLine commandLine) { + String target = commandLine.getOptionValue(TARGET); + return target != null + && !target.equalsIgnoreCase(LocalExecutor.NAME) + && !target.equalsIgnoreCase(RemoteExecutor.NAME); + } } diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java index 83ccb1cbe..501b28840 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java @@ -106,6 +106,19 @@ class CliFrontendTest { assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue(); } + @Test + void testDeploymentTargetConfiguration() throws Exception { + CliExecutor executor = + createExecutor( + pipelineDef(), + "--flink-home", + flinkHome(), + "-t", + "kubernetes-application", + "-n"); + assertThat(executor.getDeploymentTarget()).isEqualTo("kubernetes-application"); + } + @Test void testAdditionalJar() throws Exception { String aJar = "/foo/jar/a.jar"; @@ -177,6 +190,10 @@ class CliFrontendTest { + " was triggered.\n" + " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n" + " (for example hdfs:///flink/savepoint-1537\n" + + " -t,--target <arg> The deployment target for the execution. This\n" + + " can take one of the following values\n" + + " local/remote/yarn-session/yarn-application/ku\n" + + " bernetes-session/kubernetes-application\n" + " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n"; private static class NoOpComposer implements PipelineComposer { diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml index 12471f032..f9f57fd3c 100644 --- a/flink-cdc-composer/pom.xml +++ b/flink-cdc-composer/pom.xml @@ -48,7 +48,7 @@ limitations under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -56,6 +56,11 @@ limitations under the License. <version>${flink.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-kubernetes</artifactId> + <version>${flink.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java new file mode 100644 index 000000000..37d573e6b --- /dev/null +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer; + +import org.apache.flink.configuration.Configuration; + +import org.apache.commons.cli.CommandLine; + +import java.nio.file.Path; +import java.util.List; + +/** PipelineDeploymentExecutor to execute flink cdc job from different target. */ +public interface PipelineDeploymentExecutor { + + PipelineExecution.ExecutionInfo deploy( + CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) + throws Exception; +} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java new file mode 100644 index 000000000..27a005721 --- /dev/null +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; + +import org.apache.commons.cli.CommandLine; + +/** Create deployment methods corresponding to different goals. */ +public class ComposeDeploymentFactory { + + public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) { + String target = commandLine.getOptionValue("target"); + if (target.equalsIgnoreCase("kubernetes-application")) { + return new K8SApplicationDeploymentExecutor(); + } + throw new IllegalArgumentException( + String.format("Deployment target %s is not supported", target)); + } +} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java new file mode 100644 index 000000000..19fcdb262 --- /dev/null +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.kubernetes.KubernetesClusterClientFactory; +import org.apache.flink.kubernetes.KubernetesClusterDescriptor; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; + +import org.apache.commons.cli.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** deploy flink cdc job by native k8s application mode. */ +public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor { + + private static final Logger LOG = + LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class); + + @Override + public PipelineExecution.ExecutionInfo deploy( + CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) { + LOG.info("Submitting application in 'Flink K8S Application Mode'."); + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); + List<String> jars = new ArrayList<>(); + if (flinkConfig.get(PipelineOptions.JARS) == null) { + // must be added cdc dist jar by default docker container path + jars.add("local:///opt/flink-cdc/lib/flink-cdc-dist.jar"); + flinkConfig.set(PipelineOptions.JARS, jars); + } + // set the default cdc latest docker image + flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink/flink-cdc:latest"); + flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList()); + flinkConfig.set( + ApplicationConfiguration.APPLICATION_MAIN_CLASS, + "org.apache.flink.cdc.cli.CliFrontend"); + KubernetesClusterClientFactory kubernetesClusterClientFactory = + new KubernetesClusterClientFactory(); + KubernetesClusterDescriptor descriptor = + kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig); + ClusterSpecification specification = + kubernetesClusterClientFactory.getClusterSpecification(flinkConfig); + ApplicationConfiguration applicationConfiguration = + ApplicationConfiguration.fromConfiguration(flinkConfig); + ClusterClient<String> client = null; + try { + ClusterClientProvider<String> clusterClientProvider = + descriptor.deployApplicationCluster(specification, applicationConfiguration); + client = clusterClientProvider.getClusterClient(); + String clusterId = client.getClusterId(); + LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId); + return new PipelineExecution.ExecutionInfo(clusterId, "submit job successful"); + } catch (Exception e) { + if (client != null) { + client.shutDownCluster(); + } + throw new RuntimeException("Failed to deploy Flink CDC job", e); + } finally { + descriptor.close(); + if (client != null) { + client.close(); + } + } + } +} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java index 2eb79aead..4f7649d9e 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java @@ -93,6 +93,12 @@ public class FactoryDiscoveryUtils { try { T factory = getFactoryByIdentifier(identifier, factoryClass); URL url = factory.getClass().getProtectionDomain().getCodeSource().getLocation(); + String urlString = url.toString(); + if (urlString.contains("usrlib")) { + String flinkHome = System.getenv("FLINK_HOME"); + urlString = urlString.replace("usrlib", flinkHome + "/usrlib"); + } + url = new URL(urlString); if (Files.isDirectory(Paths.get(url.toURI()))) { LOG.warn( "The factory class \"{}\" is contained by directory \"{}\" instead of JAR. " @@ -104,7 +110,8 @@ public class FactoryDiscoveryUtils { return Optional.of(url); } catch (Exception e) { throw new RuntimeException( - String.format("Failed to search JAR by factory identifier \"%s\"", identifier)); + String.format("Failed to search JAR by factory identifier \"%s\"", identifier), + e); } } }