This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 21c4f5245e [Improve][Core] Improve base on plugin name of lookup
strategy (#7278)
21c4f5245e is described below
commit 21c4f5245e8777a9b3a967b6038d00d3f20d9fb8
Author: corgy-w <[email protected]>
AuthorDate: Mon Jul 29 17:28:10 2024 +0800
[Improve][Core] Improve base on plugin name of lookup strategy (#7278)
* [bug][plugin-discovery] fix multi plugin discovery
* [bug][plugin-discovery] optimize code
---------
Co-authored-by: wangchao <[email protected]>
---
.../plugin/discovery/AbstractPluginDiscovery.java | 122 +++++++++++++++++++--
.../SeaTunnelSourcePluginDiscoveryTest.java | 29 ++++-
.../duplicate/connectors/plugin-mapping.properties | 8 +-
3 files changed, 143 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 175ba435ed..d4bd43c3d1 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -54,11 +54,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -430,17 +432,16 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
- if (targetPluginFiles.length > 1) {
- throw new IllegalArgumentException(
- "Found multiple plugin jar: "
- + Arrays.stream(targetPluginFiles)
- .map(File::getPath)
- .collect(Collectors.joining(","))
- + " for pluginIdentifier: "
- + pluginIdentifier);
- }
try {
- URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
+ URL pluginJarPath;
+ if (targetPluginFiles.length == 1) {
+ pluginJarPath = targetPluginFiles[0].toURI().toURL();
+ } else {
+ pluginJarPath =
+ findMostSimlarPluginJarFile(targetPluginFiles,
pluginJarPrefix)
+ .toURI()
+ .toURL();
+ }
log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier,
pluginJarPath);
return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
@@ -451,4 +452,105 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
return Optional.empty();
}
}
+
+ private static File findMostSimlarPluginJarFile(
+ File[] targetPluginFiles, String pluginJarPrefix) {
+ String splitRegex = "\\-|\\_|\\.";
+ double maxSimlarity = -Integer.MAX_VALUE;
+ int mostSimlarPluginJarFileIndex = -1;
+ for (int i = 0; i < targetPluginFiles.length; i++) {
+ File file = targetPluginFiles[i];
+ String fileName = file.getName();
+ double similarity =
+ CosineSimilarityUtil.cosineSimilarity(pluginJarPrefix,
fileName, splitRegex);
+ if (similarity > maxSimlarity) {
+ maxSimlarity = similarity;
+ mostSimlarPluginJarFileIndex = i;
+ }
+ }
+ return targetPluginFiles[mostSimlarPluginJarFileIndex];
+ }
+
+ static class CosineSimilarityUtil {
+ public static double cosineSimilarity(String textA, String textB,
String splitRegrex) {
+ Set<String> words1 =
+ new
HashSet<>(Arrays.asList(textA.toLowerCase().split(splitRegrex)));
+ Set<String> words2 =
+ new
HashSet<>(Arrays.asList(textB.toLowerCase().split(splitRegrex)));
+ int[] termFrequency1 = calculateTermFrequencyVector(textA, words1,
splitRegrex);
+ int[] termFrequency2 = calculateTermFrequencyVector(textB, words2,
splitRegrex);
+ return calculateCosineSimilarity(termFrequency1, termFrequency2);
+ }
+
+ private static int[] calculateTermFrequencyVector(
+ String text, Set<String> words, String splitRegrex) {
+ int[] termFrequencyVector = new int[words.size()];
+ String[] textArray = text.toLowerCase().split(splitRegrex);
+ List<String> orderedWords = new ArrayList<String>();
+ words.clear();
+ for (String word : textArray) {
+ if (!words.contains(word)) {
+ orderedWords.add(word);
+ words.add(word);
+ }
+ }
+ for (String word : textArray) {
+ if (words.contains(word)) {
+ int index = 0;
+ for (String w : orderedWords) {
+ if (w.equals(word)) {
+ termFrequencyVector[index]++;
+ break;
+ }
+ index++;
+ }
+ }
+ }
+ return termFrequencyVector;
+ }
+
+ private static double calculateCosineSimilarity(int[] vectorA, int[]
vectorB) {
+ double dotProduct = 0.0;
+ double magnitudeA = 0.0;
+ double magnitudeB = 0.0;
+ int vectorALength = vectorA.length;
+ int vectorBLength = vectorB.length;
+ if (vectorALength < vectorBLength) {
+ int[] vectorTemp = new int[vectorBLength];
+ for (int i = 0; i < vectorB.length; i++) {
+ if (i <= vectorALength - 1) {
+ vectorTemp[i] = vectorA[i];
+ } else {
+ vectorTemp[i] = 0;
+ }
+ }
+ vectorA = vectorTemp;
+ }
+ if (vectorALength > vectorBLength) {
+ int[] vectorTemp = new int[vectorALength];
+ for (int i = 0; i < vectorA.length; i++) {
+ if (i <= vectorBLength - 1) {
+ vectorTemp[i] = vectorB[i];
+ } else {
+ vectorTemp[i] = 0;
+ }
+ }
+ vectorB = vectorTemp;
+ }
+ for (int i = 0; i < vectorA.length; i++) {
+ dotProduct += vectorA[i] * vectorB[i];
+ magnitudeA += Math.pow(vectorA[i], 2);
+ magnitudeB += Math.pow(vectorB[i], 2);
+ }
+
+ magnitudeA = Math.sqrt(magnitudeA);
+ magnitudeB = Math.sqrt(magnitudeB);
+
+ if (magnitudeA == 0 || magnitudeB == 0) {
+ return 0.0; // Avoid dividing by 0
+ } else {
+ return dotProduct / (magnitudeA * magnitudeB);
+ }
+ }
+ }
}
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index 81333d4b4d..88fd76d73b 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -32,10 +32,13 @@ import org.junit.jupiter.api.condition.OS;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
@DisabledOnOs(OS.WINDOWS)
class SeaTunnelSourcePluginDiscoveryTest {
@@ -47,7 +50,10 @@ class SeaTunnelSourcePluginDiscoveryTest {
private static final List<Path> pluginJars =
Lists.newArrayList(
Paths.get(seatunnelHome, "connectors",
"connector-http-jira.jar"),
- Paths.get(seatunnelHome, "connectors",
"connector-http.jar"));
+ Paths.get(seatunnelHome, "connectors",
"connector-http.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-kafka.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-kafka-alcs.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-kafka-blcs.jar"));
@BeforeEach
public void before() throws IOException {
@@ -67,12 +73,25 @@ class SeaTunnelSourcePluginDiscoveryTest {
List<PluginIdentifier> pluginIdentifiers =
Lists.newArrayList(
PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpJira"),
- PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpBase"));
+ PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpBase"),
+ PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "Kafka"),
+ PluginIdentifier.of("seatunnel",
PluginType.SINK.getType(), "Kafka-Blcs"));
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery();
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () ->
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
+ Assertions.assertIterableEquals(
+ Stream.of(
+ Paths.get(seatunnelHome, "connectors",
"connector-http-jira.jar")
+ .toString(),
+ Paths.get(seatunnelHome, "connectors",
"connector-http.jar")
+ .toString(),
+ Paths.get(seatunnelHome, "connectors",
"connector-kafka.jar")
+ .toString(),
+ Paths.get(seatunnelHome, "connectors",
"connector-kafka-blcs.jar")
+ .toString())
+ .collect(Collectors.toList()),
+
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
+ .map(URL::getPath)
+ .collect(Collectors.toList()));
}
@AfterEach
diff --git
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
index be38939a7f..ea20ad05b0 100644
---
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
+++
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -18,4 +18,10 @@
seatunnel.source.HttpBase = connector-http
seatunnel.sink.HttpBase = connector-http
seatunnel.source.HttpJira = connector-http-jira
-seatunnel.sink.HttpJira = connector-http-jira
\ No newline at end of file
+seatunnel.sink.HttpJira = connector-http-jira
+seatunnel.source.Kafka = connector-kafka
+seatunnel.sink.Kafka = connector-kafka
+seatunnel.source.Kafka-Alcs = connector-kafka-alcs
+seatunnel.sink.Kafka-Alcs = connector-kafka-alcs
+seatunnel.source.Kafka-Blcs = connector-kafka-blcs
+seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
\ No newline at end of file