This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 5dae9c1ef [API-DRAFT] [MERGE] Fix obvious bugs that don't work
properly before merge (#2082)
5dae9c1ef is described below
commit 5dae9c1ef0e1437e27b5e55b74f9b713f414449b
Author: Hisoka <[email protected]>
AuthorDate: Wed Jun 29 17:02:23 2022 +0800
[API-DRAFT] [MERGE] Fix obvious bugs that don't work properly before merge
(#2082)
---
config/flink.batch.conf.template | 11 ++++-----
config/spark.streaming.conf.template | 2 +-
plugin-mapping.properties | 1 +
pom.xml | 6 ++---
.../core/base/config/AbstractExecutionContext.java | 17 ++++++--------
.../apache/seatunnel/core/spark/SparkStarter.java | 27 +++++++++++++++++++---
seatunnel-dist/src/main/assembly/assembly-bin.xml | 1 +
7 files changed, 42 insertions(+), 23 deletions(-)
diff --git a/config/flink.batch.conf.template b/config/flink.batch.conf.template
index 45b3e8532..65f614df0 100644
--- a/config/flink.batch.conf.template
+++ b/config/flink.batch.conf.template
@@ -26,12 +26,11 @@ env {
source {
# This is a example input plugin **only for test and demonstrate the feature
input plugin**
- FileSource {
- path = "hdfs://localhost:9000/output/text"
- format.type = "text"
- schema = "string"
- result_table_name = "test"
- }
+ FakeSource {
+ result_table_name = "test"
+ field_name = "name,age"
+ }
+
# If you would like to get more information about how to configure seatunnel
and see full list of input plugins,
# please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
diff --git a/config/spark.streaming.conf.template
b/config/spark.streaming.conf.template
index 3c87f56e8..b79a9dec6 100644
--- a/config/spark.streaming.conf.template
+++ b/config/spark.streaming.conf.template
@@ -31,7 +31,7 @@ env {
source {
# This is a example input plugin **only for test and demonstrate the feature
input plugin**
- fakeStream {
+ FakeStream {
content = ["Hello World, SeaTunnel"]
}
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 01bb3abc4..5ba2e8121 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -45,6 +45,7 @@ flink.sink.Kafka = seatunnel-connector-flink-kafka
spark.source.ElasticSearch = seatunnel-connector-spark-elasticsearch
spark.source.Fake = seatunnel-connector-spark-fake
+spark.source.FakeStream = seatunnel-connector-spark-fake
spark.source.FeishuSheet = seatunnel-connector-spark-feishu
spark.source.File = seatunnel-connector-spark-file
spark.source.Hbase = seatunnel-connector-spark-hbase
diff --git a/pom.xml b/pom.xml
index 9d21a4343..d60f43735 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,15 +82,15 @@
<module>seatunnel-core</module>
<module>seatunnel-transforms</module>
<module>seatunnel-connectors</module>
- <module>seatunnel-dist</module>
+ <module>seatunnel-connectors-v2</module>
+ <module>seatunnel-connectors-v2-dist</module>
<module>seatunnel-examples</module>
<module>seatunnel-e2e</module>
<module>seatunnel-api</module>
<module>seatunnel-translation</module>
<module>seatunnel-plugin-discovery</module>
<module>seatunnel-formats</module>
- <module>seatunnel-connectors-v2</module>
- <module>seatunnel-connectors-v2-dist</module>
+ <module>seatunnel-dist</module>
</modules>
<properties>
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index e3ed3d72e..92d4232c3 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -80,16 +80,13 @@ public abstract class AbstractExecutionContext<ENVIRONMENT
extends RuntimeEnv> {
@SuppressWarnings("checkstyle:Indentation")
protected List<PluginIdentifier> getPluginIdentifiers(PluginType...
pluginTypes) {
- return Arrays.stream(pluginTypes).flatMap(new Function<PluginType,
Stream<PluginIdentifier>>() {
- @Override
- public Stream<PluginIdentifier> apply(PluginType pluginType) {
- List<? extends Config> configList =
config.getConfigList(pluginType.getType());
- return configList.stream()
- .map(pluginConfig -> PluginIdentifier
- .of(engine.getEngine(),
- pluginType.getType(),
- pluginConfig.getString("plugin_name")));
- }
+ return Arrays.stream(pluginTypes).flatMap((Function<PluginType,
Stream<PluginIdentifier>>) pluginType -> {
+ List<? extends Config> configList =
config.getConfigList(pluginType.getType());
+ return configList.stream()
+ .map(pluginConfig -> PluginIdentifier
+ .of(engine.getEngine(),
+ pluginType.getType(),
+ pluginConfig.getString("plugin_name")));
}).collect(Collectors.toList());
}
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 02678f768..44c445403 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -22,12 +22,15 @@ import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.utils.CompressionUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -40,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -50,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -220,8 +225,12 @@ public class SparkStarter implements Starter {
return Collections.emptyList();
}
Config config = new
ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
- SparkExecutionContext sparkExecutionContext = new
SparkExecutionContext(config, EngineType.SPARK);
- return sparkExecutionContext.getPluginJars().stream().map(url -> new
File(url.getPath()).toPath()).collect(Collectors.toList());
+ List<URL> pluginJars = new ArrayList<>();
+ SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new
SparkSourcePluginDiscovery();
+ SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new
SparkSinkPluginDiscovery();
+
pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config,
PluginType.SOURCE)));
+
pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config,
PluginType.SINK)));
+ return pluginJars.stream().map(url -> new
File(url.getPath()).toPath()).collect(Collectors.toList());
}
/**
@@ -313,6 +322,18 @@ public class SparkStarter implements Starter {
commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
}
+ @SuppressWarnings("checkstyle:Indentation")
+ private List<PluginIdentifier> getPluginIdentifiers(Config config,
PluginType... pluginTypes) {
+ return Arrays.stream(pluginTypes).flatMap((Function<PluginType,
Stream<PluginIdentifier>>) pluginType -> {
+ List<? extends Config> configList =
config.getConfigList(pluginType.getType());
+ return configList.stream()
+ .map(pluginConfig -> PluginIdentifier
+ .of(EngineType.SPARK.getEngine(),
+ pluginType.getType(),
+ pluginConfig.getString("plugin_name")));
+ }).collect(Collectors.toList());
+ }
+
/**
* a Starter for building spark-submit commands with client mode options
*/
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 258d8b916..afd0c7150 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -146,6 +146,7 @@
</includes>
<excludes>
<exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+ <exclude>connector-common*.jar</exclude>
</excludes>
<outputDirectory>/connectors/seatunnel</outputDirectory>
</fileSet>