This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 512d982ee3 Throw IllegalArgumentException when find multiple connector
jar for one pluginIdentifier (#5551)
512d982ee3 is described below
commit 512d982ee31ad6fa4fa36f8fd1017913f4bacbc2
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Sep 28 14:02:45 2023 +0800
Throw IllegalArgumentException when find multiple connector jar for one
pluginIdentifier (#5551)
---
.../org/apache/seatunnel/common/config/Common.java | 9 ++-
.../plugin/discovery/AbstractPluginDiscovery.java | 37 ++++++----
.../discovery/AbstractPluginDiscoveryTest.java | 27 +++++--
.../SeaTunnelSourcePluginDiscoveryTest.java | 86 ++++++++++++++++++++++
.../duplicate/connectors/plugin-mapping.properties | 21 ++++++
5 files changed, 158 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 88a13fe781..e6ca3cbe49 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.common.config;
import org.apache.commons.lang3.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -67,7 +69,7 @@ public class Common {
return MODE;
}
- private static String getSeaTunnelHome() {
+ public static String getSeaTunnelHome() {
if (StringUtils.isNotEmpty(SEATUNNEL_HOME)) {
return SEATUNNEL_HOME;
@@ -83,6 +85,11 @@ public class Common {
return SEATUNNEL_HOME;
}
+ @VisibleForTesting
+ public static void setSeaTunnelHome(String seatunnelHome) {
+ SEATUNNEL_HOME = seatunnelHome;
+ }
+
/**
* Root dir varies between different spark master and deploy mode, it also
varies between
* relative and absolute path. When running seatunnel in --master local,
you can put plugins
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index ee27429fce..a2006f7dc9 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -47,6 +47,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -78,7 +79,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
};
private final Path pluginDir;
- private final Config pluginConfig;
+ private final Config pluginMappingConfig;
private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>>
pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
@@ -95,16 +96,16 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
this(pluginDir, loadConnectorPluginConfig());
}
- public AbstractPluginDiscovery(Path pluginDir, Config pluginConfig) {
- this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER);
+ public AbstractPluginDiscovery(Path pluginDir, Config pluginMappingConfig)
{
+ this(pluginDir, pluginMappingConfig, DEFAULT_URL_TO_CLASSLOADER);
}
public AbstractPluginDiscovery(
Path pluginDir,
- Config pluginConfig,
+ Config pluginMappingConfig,
BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer) {
this.pluginDir = pluginDir;
- this.pluginConfig = pluginConfig;
+ this.pluginMappingConfig = pluginMappingConfig;
this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
log.info("Load {} Plugin from {}",
getPluginBaseClass().getSimpleName(), pluginDir);
}
@@ -328,16 +329,13 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
* @return plugin jar path.
*/
private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier)
{
- if (pluginConfig.isEmpty()) {
- return Optional.empty();
- }
final String engineType =
pluginIdentifier.getEngineType().toLowerCase();
final String pluginType =
pluginIdentifier.getPluginType().toLowerCase();
final String pluginName =
pluginIdentifier.getPluginName().toLowerCase();
- if (!pluginConfig.hasPath(engineType)) {
+ if (!pluginMappingConfig.hasPath(engineType)) {
return Optional.empty();
}
- Config engineConfig = pluginConfig.getConfig(engineType);
+ Config engineConfig = pluginMappingConfig.getConfig(engineType);
if (!engineConfig.hasPath(pluginType)) {
return Optional.empty();
}
@@ -365,15 +363,24 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
+ if (targetPluginFiles.length > 1) {
+ throw new IllegalArgumentException(
+ "Found multiple plugin jar: "
+ + Arrays.stream(targetPluginFiles)
+ .map(File::getPath)
+ .collect(Collectors.joining(","))
+ + " for pluginIdentifier: "
+ + pluginIdentifier);
+ }
try {
URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
- log.info(
- "Discovery plugin jar: {} at: {}",
- pluginIdentifier.getPluginName(),
- pluginJarPath);
+ log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier,
pluginJarPath);
return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
- log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
+ log.warn(
+ "Cannot get plugin URL: {} for pluginIdentifier: {}" +
targetPluginFiles[0],
+ pluginIdentifier,
+ e);
return Optional.empty();
}
}
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
index 5141d7b71d..379fcd2ace 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
@@ -21,24 +21,33 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import java.util.Map;
-import java.util.Objects;
@DisabledOnOs(OS.WINDOWS)
public class AbstractPluginDiscoveryTest {
+ private String originSeatunnelHome = null;
+ private DeployMode originMode = null;
+ private static final String seatunnelHome =
+ AbstractPluginDiscoveryTest.class.getResource("/home").getPath();
+
+ @BeforeEach
+ public void before() {
+ originMode = Common.getDeployMode();
+ Common.setDeployMode(DeployMode.CLIENT);
+ originSeatunnelHome = Common.getSeaTunnelHome();
+ Common.setSeaTunnelHome(seatunnelHome);
+ }
+
@Test
public void testGetAllPlugins() {
- Common.setDeployMode(DeployMode.CLIENT);
- System.setProperty(
- "SEATUNNEL_HOME",
-
Objects.requireNonNull(AbstractPluginDiscoveryTest.class.getResource("/home"))
- .getPath());
Map<PluginIdentifier, String> sourcePlugins =
AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE);
Assertions.assertEquals(27, sourcePlugins.size());
@@ -47,4 +56,10 @@ public class AbstractPluginDiscoveryTest {
AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK);
Assertions.assertEquals(30, sinkPlugins.size());
}
+
+ @AfterEach
+ public void after() {
+ Common.setSeaTunnelHome(originSeatunnelHome);
+ Common.setDeployMode(originMode);
+ }
}
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
new file mode 100644
index 0000000000..81333d4b4d
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.seatunnel;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+@DisabledOnOs(OS.WINDOWS)
+class SeaTunnelSourcePluginDiscoveryTest {
+
+ private String originSeatunnelHome = null;
+ private DeployMode originMode = null;
+ private static final String seatunnelHome =
+
SeaTunnelSourcePluginDiscoveryTest.class.getResource("/duplicate").getPath();
+ private static final List<Path> pluginJars =
+ Lists.newArrayList(
+ Paths.get(seatunnelHome, "connectors",
"connector-http-jira.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-http.jar"));
+
+ @BeforeEach
+ public void before() throws IOException {
+ originMode = Common.getDeployMode();
+ Common.setDeployMode(DeployMode.CLIENT);
+ originSeatunnelHome = Common.getSeaTunnelHome();
+ Common.setSeaTunnelHome(seatunnelHome);
+
+ // The file is created under target directory.
+ for (Path pluginJar : pluginJars) {
+ Files.createFile(pluginJar);
+ }
+ }
+
+ @Test
+ void getPluginBaseClass() {
+ List<PluginIdentifier> pluginIdentifiers =
+ Lists.newArrayList(
+ PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpJira"),
+ PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpBase"));
+ SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+ new SeaTunnelSourcePluginDiscovery();
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ for (Path pluginJar : pluginJars) {
+ Files.deleteIfExists(pluginJar);
+ }
+ Common.setSeaTunnelHome(originSeatunnelHome);
+ Common.setDeployMode(originMode);
+ }
+}
diff --git
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
new file mode 100644
index 0000000000..be38939a7f
--- /dev/null
+++
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel.source.HttpBase = connector-http
+seatunnel.sink.HttpBase = connector-http
+seatunnel.source.HttpJira = connector-http-jira
+seatunnel.sink.HttpJira = connector-http-jira
\ No newline at end of file