This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 79c3de43e [Feature][API] add common options (#3353)
79c3de43e is described below
commit 79c3de43e743ca1c1b6c6aab998159951453b442
Author: Eric <[email protected]>
AuthorDate: Sat Nov 26 15:38:46 2022 +0800
[Feature][API] add common options (#3353)
* add fullOptionRule method to TableSourceFactory and TableSinkFactory
* add common option rules
* add checkpoint interval parameter
---
.../apache/seatunnel/api/env/EnvCommonOptions.java | 57 ++++++++++++++++
.../apache/seatunnel/api/env/EnvOptionRule.java | 22 ++++---
.../seatunnel/api/sink/SinkCommonOptions.java | 40 ++++++++++++
.../seatunnel/api/source/SourceCommonOptions.java | 45 +++++++++++++
.../seatunnel/api/table/factory/FactoryUtil.java | 19 ++++++
.../api/table/factory/TableSinkFactory.java | 3 +-
.../api/table/factory/TableSourceFactory.java | 5 +-
.../api/table/factory/TableTransformFactory.java | 1 -
.../api/transform/TransformCommonOptions.java | 53 +++++++++++++++
.../common/constants/CollectionConstants.java | 8 ---
.../apache/seatunnel/common/constants/JobMode.java | 2 +-
.../flink/execution/SinkExecuteProcessor.java | 6 +-
.../flink/execution/SourceExecuteProcessor.java | 6 +-
.../spark/execution/SinkExecuteProcessor.java | 11 ++--
.../spark/execution/SourceExecuteProcessor.java | 11 ++--
.../seatunnel/command/ClientExecuteCommand.java | 2 +
.../engine/e2e/ClusterFaultToleranceIT.java | 68 +++++++++-----------
.../seatunnel/engine/e2e/JobExecutionIT.java | 6 +-
.../seatunnel/engine/client/SeaTunnelClient.java | 6 +-
.../engine/client/SeaTunnelClientInstance.java | 4 +-
.../engine/client/job/JobExecutionEnvironment.java | 3 +-
.../engine/client/SeaTunnelClientTest.java | 6 +-
.../engine/core/parse/JobConfigParser.java | 75 +++++++++++++---------
.../engine/server/SeaTunnelServerStarter.java | 10 +--
.../seatunnel/engine/server/master/JobMaster.java | 6 +-
.../common/AbstractSeaTunnelTransform.java | 6 +-
.../spark/source/SeaTunnelSourceSupport.java | 6 +-
27 files changed, 359 insertions(+), 128 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
new file mode 100644
index 000000000..bc188c416
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.env;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import java.util.Map;
+
+public class EnvCommonOptions {
+ public static final Option<Integer> PARALLELISM =
+ Options.key("parallelism")
+ .intType()
+ .defaultValue(1)
+ .withDescription("When parallelism is not specified in connector,
the parallelism in env is used by default. " +
+ "When parallelism is specified, it will override the
parallelism in env.");
+
+ public static final Option<String> JOB_NAME =
+ Options.key("job.name")
+ .stringType()
+ .defaultValue("SeaTunnel Job")
+ .withDescription("The job name of this job");
+
+ public static final Option<JobMode> JOB_MODE =
+ Options.key("job.mode")
+ .enumType(JobMode.class)
+ .defaultValue(JobMode.BATCH)
+ .withDescription("The job mode of this job, support Batch and
Stream, Default value is Batch");
+
+ public static final Option<Long> CHECKPOINT_INTERVAL =
+ Options.key("checkpoint.interval")
+ .longType()
+ .noDefaultValue()
+ .withDescription("The interval (in milliseconds) between two
consecutive checkpoints.");
+
+ public static final Option<Map<String, String>> CUSTOM_PARAMETERS =
+ Options.key("custom_parameters")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("custom parameters for run engine");
+}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
similarity index 61%
copy from
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 83b117de7..cbfd3d455 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.api.env;
-import org.apache.seatunnel.engine.client.job.JobClient;
-import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
-public interface SeaTunnelClientInstance {
+public class EnvOptionRule {
- JobExecutionEnvironment createExecutionContext(String filePath, JobConfig
config);
-
- JobClient createJobClient();
-
- void close();
+ public static OptionRule getEnvOptionRules() {
+ return OptionRule.builder()
+ .required(EnvCommonOptions.JOB_MODE)
+ .optional(EnvCommonOptions.JOB_NAME,
+ EnvCommonOptions.PARALLELISM,
+ EnvCommonOptions.CHECKPOINT_INTERVAL,
+ EnvCommonOptions.CUSTOM_PARAMETERS)
+ .build();
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
new file mode 100644
index 000000000..61db2e08d
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SinkCommonOptions {
+
+ public static final Option<String> SOURCE_TABLE_NAME =
+ Options.key("source_table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When source_table_name is not specified, " +
+ "the current plug-in processes the data set dataset output
by the previous plugin in the configuration file. " +
+ "When source_table_name is specified, the current plug-in
is processing the data set corresponding to this parameter.");
+
+ public static final Option<Integer> PARALLELISM =
+ Options.key("parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
+ "When parallelism is specified, it will override the
parallelism in env.");
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
new file mode 100644
index 000000000..494d34a9c
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SourceCommonOptions {
+
+ public static final Option<String> RESULT_TABLE_NAME =
+ Options.key("result_table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When result_table_name is not specified, " +
+ "the data processed by this plugin will not be registered
as a data set (dataStream/dataset) " +
+ "that can be directly accessed by other plugins, or called
a temporary table (table)" +
+ "When result_table_name is specified, " +
+ "the data processed by this plugin will be registered as a
data set (dataStream/dataset) " +
+ "that can be directly accessed by other plugins, or called
a temporary table (table) . " +
+ "The data set (dataStream/dataset) registered here can be
directly accessed by other plugins " +
+ "by specifying source_table_name .");
+
+ public static final Option<Integer> PARALLELISM =
+ Options.key("parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
+ "When parallelism is specified, it will override the
parallelism in env.");
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 083048c18..89a5deabe 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -17,13 +17,16 @@
package org.apache.seatunnel.api.table.factory;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
+import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,4 +158,20 @@ public final class FactoryUtil {
throw new FactoryException("Could not load service provider for
factories.", e);
}
}
+
+ /**
+ * This method is called by SeaTunnel Web to get the full option rule of a
source.
+ * @return
+ */
+ public static OptionRule sourceFullOptionRule(@NonNull Factory factory) {
+ OptionRule sourceOptionRule = factory.optionRule();
+ if (sourceOptionRule == null) {
+ throw new FactoryException("sourceOptionRule can not be null");
+ }
+
+ OptionRule sourceCommonOptionRule =
+
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
+
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
+ return sourceOptionRule;
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 84ea4b857..3d964f66b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.connector.TableSink;
/**
* This is an SPI interface, used to create {@link TableSink}. Each plugin
need to have it own implementation.
- * todo: now we have not use this interface, we directly use {@link
org.apache.seatunnel.api.sink.SeaTunnelSink} as the SPI interface.
*
* @param <IN> row type
* @param <StateT> state type
@@ -32,11 +31,11 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT
/**
* We will never use this method now. So gave a default implement and
return null.
+ *
* @param context TableFactoryContext
* @return
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createSink(TableFactoryContext context) {
throw new UnsupportedOperationException("unsupported now");
}
-
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 64eeef881..71b6ed413 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -24,15 +24,16 @@ import java.io.Serializable;
/**
* This is an SPI interface, used to create {@link TableSource}. Each plugin
need to have it own implementation.
- * todo: now we have not use this interface, we directly use {@link
org.apache.seatunnel.api.source.SeaTunnelSource} as the SPI interface
*/
public interface TableSourceFactory extends Factory {
/**
* We will never use this method now. So gave a default implement and
return null.
+ *
* @param context TableFactoryContext
*/
- default <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+ default <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(
+ TableFactoryContext context) {
throw new UnsupportedOperationException("unsupported now");
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
index afc638ab9..4e6f11ad2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
@@ -21,7 +21,6 @@ import
org.apache.seatunnel.api.table.connector.TableTransform;
/**
* This is an SPI interface, used to create {@link
org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need to
have it own implementation.
- * todo: now we have not use this interface, we directly use {@link
org.apache.seatunnel.api.transform.SeaTunnelTransform} as the SPI interface
*/
public interface TableTransformFactory extends Factory {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
new file mode 100644
index 000000000..ec277d308
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.transform;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class TransformCommonOptions {
+ public static final Option<String> RESULT_TABLE_NAME =
+ Options.key("result_table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When result_table_name is not specified, " +
+ "the data processed by this plugin will not be registered
as a data set (dataStream/dataset) " +
+ "that can be directly accessed by other plugins, or called
a temporary table (table)" +
+ "When result_table_name is specified, " +
+ "the data processed by this plugin will be registered as a
data set (dataStream/dataset) " +
+ "that can be directly accessed by other plugins, or called
a temporary table (table) . " +
+ "The data set (dataStream/dataset) registered here can be
directly accessed by other plugins " +
+ "by specifying source_table_name .");
+
+ public static final Option<String> SOURCE_TABLE_NAME =
+ Options.key("source_table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When source_table_name is not specified, " +
+ "the current plug-in processes the data set dataset output
by the previous plugin in the configuration file. " +
+ "When source_table_name is specified, the current plug-in
is processing the data set corresponding to this parameter.");
+
+ public static final Option<Integer> PARALLELISM =
+ Options.key("parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
+ "When parallelism is specified, it will override the
parallelism in env.");
+}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
index fb387b605..ce263b257 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
@@ -19,21 +19,13 @@ package org.apache.seatunnel.common.constants;
public class CollectionConstants {
- public static final int MAP_SIZE = 6;
-
public static final String PLUGIN_NAME = "plugin_name";
public static final String SEATUNNEL_PLUGIN = "seatunnel";
- public static final String FLINK_PLUGIN = "flink";
-
- public static final String SPARK_PLUGIN = "spark";
-
public static final String SOURCE_PLUGIN = "source";
public static final String TRANSFORM_PLUGIN = "transform";
public static final String SINK_PLUGIN = "sink";
-
- public static final String PARALLELISM = "parallelism";
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
index a7aa32611..a0df69eda 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
@@ -18,5 +18,5 @@
package org.apache.seatunnel.common.constants;
public enum JobMode {
- BATCH, STREAMING, STRUCTURED_STREAMING
+ BATCH, STREAMING
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 97a576b90..d82f07f3c 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -76,8 +76,8 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
seaTunnelSink.setTypeInfo((SeaTunnelRowType)
TypeConverterUtils.convert(stream.getType()));
DataStreamSink<Row> dataStreamSink = stream.sinkTo(new
FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
- if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
- int parallelism =
sinkConfig.getInt(CollectionConstants.PARALLELISM);
+ if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+ int parallelism =
sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 60292ad42..76031f3fc 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -21,8 +21,8 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SupportCoordinate;
-import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -73,8 +73,8 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
"SeaTunnel " + internalSource.getClass().getSimpleName(),
internalSource.getBoundedness() ==
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
Config pluginConfig = pluginConfigs.get(i);
- if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
- int parallelism =
pluginConfig.getInt(CollectionConstants.PARALLELISM);
+ if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+ int parallelism =
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}
registerResultTable(pluginConfig, sourceStream);
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index c5d8276fe..c5f6faf64 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -18,9 +18,10 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -73,12 +74,12 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig,
sparkEnvironment).orElse(input);
int parallelism;
- if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
- parallelism =
sinkConfig.getInt(CollectionConstants.PARALLELISM);
+ if (sinkConfig.hasPath(SinkCommonOptions.PARALLELISM.key())) {
+ parallelism =
sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key());
} else {
- parallelism =
sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+ parallelism =
sparkEnvironment.getSparkConf().getInt(EnvCommonOptions.PARALLELISM.key(),
EnvCommonOptions.PARALLELISM.defaultValue());
}
-
dataset.sparkSession().read().option(CollectionConstants.PARALLELISM,
parallelism);
+
dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(),
parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo((SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
SparkSinkInjector.inject(dataset.write(),
seaTunnelSink).option("checkpointLocation", "/tmp").save();
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 6f2f7901c..20a5474f5 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -18,9 +18,10 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -57,15 +58,15 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
int parallelism;
- if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
- parallelism =
pluginConfig.getInt(CollectionConstants.PARALLELISM);
+ if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+ parallelism =
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
} else {
- parallelism =
sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+ parallelism =
sparkEnvironment.getSparkConf().getInt(EnvCommonOptions.PARALLELISM.key(),
EnvCommonOptions.PARALLELISM.defaultValue());
}
Dataset<Row> dataset = sparkEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
- .option(CollectionConstants.PARALLELISM, parallelism)
+ .option(SourceCommonOptions.PARALLELISM.key(), parallelism)
.option(Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema((StructType)
TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 237bce78a..3c9c8af8d 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -58,12 +58,14 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
public void execute() throws CommandExecuteException {
HazelcastInstance instance = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
try {
String clusterName = clientCommandArgs.getClusterName();
if
(clientCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
clusterName = creatRandomClusterName(clusterName);
instance = createServerInLocal(clusterName);
}
+ seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(clusterName);
engineClient = new SeaTunnelClient(clientConfig);
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index afd571e66..b55117bde 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
@@ -74,15 +75,15 @@ public class ClusterFaultToleranceIT {
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
+
try {
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
@@ -182,15 +183,14 @@ public class ClusterFaultToleranceIT {
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
try {
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
@@ -263,15 +263,14 @@ public class ClusterFaultToleranceIT {
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
try {
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
@@ -346,15 +345,14 @@ public class ClusterFaultToleranceIT {
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
try {
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
@@ -444,15 +442,14 @@ public class ClusterFaultToleranceIT {
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
try {
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
@@ -527,15 +524,14 @@ public class ClusterFaultToleranceIT {
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(testClusterName));
try {
- node1 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node2 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
- node3 = SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ node3 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index c483c61b5..f7532a3f9 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -65,7 +65,8 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
@@ -89,7 +90,8 @@ public class JobExecutionIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
JobStatus jobStatus1 = clientJobProxy.getJobStatus();
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index bd8d2dfe8..07c7de46d 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -36,7 +36,7 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
}
@Override
- public JobExecutionEnvironment createExecutionContext(@NonNull String
filePath, JobConfig jobConfig) {
+ public JobExecutionEnvironment createExecutionContext(@NonNull String
filePath, @NonNull JobConfig jobConfig) {
return new JobExecutionEnvironment(jobConfig, filePath,
hazelcastClient);
}
@@ -67,14 +67,14 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
}
}
- public String getJobState(Long jobId){
+ public String getJobState(Long jobId) {
return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelGetJobStateCodec.encodeRequest(jobId),
SeaTunnelGetJobStateCodec::decodeResponse
);
}
- public String listJobStatus(){
+ public String listJobStatus() {
return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelListJobStatusCodec.encodeRequest(),
SeaTunnelListJobStatusCodec::decodeResponse
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 83b117de7..3e52d5d16 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import lombok.NonNull;
+
public interface SeaTunnelClientInstance {
- JobExecutionEnvironment createExecutionContext(String filePath, JobConfig
config);
+ JobExecutionEnvironment createExecutionContext(@NonNull String filePath,
@NonNull JobConfig config);
JobClient createJobClient();
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 5f9a6ffcd..985b95cc8 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -66,7 +66,8 @@ public class JobExecutionEnvironment {
private final JobClient jobClient;
- public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
+ public JobExecutionEnvironment(JobConfig jobConfig,
+ String jobFilePath,
SeaTunnelHazelcastClient
seaTunnelHazelcastClient) {
this.jobConfig = jobConfig;
this.jobFilePath = jobFilePath;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index c5dce270e..f8c118cfe 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -47,13 +47,13 @@ import java.util.concurrent.TimeUnit;
@DisabledOnOs(OS.WINDOWS)
public class SeaTunnelClientTest {
+ private static SeaTunnelConfig SEATUNNEL_CONFIG =
ConfigProvider.locateAndGetSeaTunnelConfig();
private static HazelcastInstance INSTANCE;
@BeforeAll
public static void beforeClass() throws Exception {
- SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
-
seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
- INSTANCE =
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+
SEATUNNEL_CONFIG.getHazelcastConfig().setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
+ INSTANCE =
HazelcastInstanceFactory.newHazelcastInstance(SEATUNNEL_CONFIG.getHazelcastConfig(),
Thread.currentThread().getName(),
new
SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 6859171e8..b4f27f9c4 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -17,20 +17,22 @@
package org.apache.seatunnel.engine.core.parse;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.api.transform.TransformCommonOptions;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -106,7 +108,8 @@ public class JobConfigParser {
public ImmutablePair<List<Action>, Set<URL>> parse() {
List<? extends Config> sinkConfigs =
seaTunnelJobConfig.getConfigList("sink");
- List<? extends Config> transformConfigs =
TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "transform",
Collections.emptyList());
+ List<? extends Config> transformConfigs =
+ TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "transform",
Collections.emptyList());
List<? extends Config> sourceConfigs =
seaTunnelJobConfig.getConfigList("source");
if (CollectionUtils.isEmpty(sinkConfigs) ||
CollectionUtils.isEmpty(sourceConfigs)) {
@@ -135,13 +138,22 @@ public class JobConfigParser {
}
private void jobConfigAnalyze(@NonNull Config envConfigs) {
- if (envConfigs.hasPath("job.mode")) {
-
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class,
"job.mode"));
+ if (envConfigs.hasPath(EnvCommonOptions.JOB_MODE.key())) {
+
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class,
EnvCommonOptions.JOB_MODE.key()));
} else {
- jobConfig.getJobContext().setJobMode(JobMode.BATCH);
+
jobConfig.getJobContext().setJobMode(EnvCommonOptions.JOB_MODE.defaultValue());
}
- if (envConfigs.hasPath("checkpoint.interval")) {
-
jobConfig.getEnvOptions().put(ServerConfigOptions.CHECKPOINT_INTERVAL.key(),
envConfigs.getInt("checkpoint.interval"));
+
+ if (envConfigs.hasPath(EnvCommonOptions.JOB_NAME.key())) {
+
jobConfig.setName(envConfigs.getString(EnvCommonOptions.JOB_NAME.key()));
+ } else {
+ jobConfig.setName(EnvCommonOptions.JOB_NAME.defaultValue());
+ }
+
+ if (envConfigs.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ jobConfig.getEnvOptions()
+ .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
+
envConfigs.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
}
}
@@ -167,11 +179,11 @@ public class JobConfigParser {
sinkListImmutablePair.getLeft(),
sinkListImmutablePair.getRight());
actions.add(sinkAction);
- if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
- throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
+ if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME
+ " must be set in the sink plugin config when the job
have complex dependencies");
}
- String sourceTableName =
config.getString(Plugin.SOURCE_TABLE_NAME);
+ String sourceTableName =
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
List<Config> transformConfigList =
transformResultTableNameMap.get(sourceTableName);
SeaTunnelDataType<?> dataType;
if (CollectionUtils.isEmpty(transformConfigList)) {
@@ -235,8 +247,9 @@ public class JobConfigParser {
transformListImmutablePair.getRight());
action.addUpstream(transformAction);
- SeaTunnelDataType dataType =
transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME),
- transformAction);
+ SeaTunnelDataType dataType =
+
transformAnalyze(config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key()),
+ transformAction);
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataTypeResult =
transformListImmutablePair.getLeft().getProducedType();
totalParallelism.set(totalParallelism.get() +
transformAction.getParallelism());
@@ -248,27 +261,27 @@ public class JobConfigParser {
private void initRelationMap(List<? extends Config> sourceConfigs, List<?
extends Config> transformConfigs) {
for (Config config : sourceConfigs) {
- if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
- throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
+ if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
+ " must be set in the source plugin config when the job
have complex dependencies");
}
- String resultTableName =
config.getString(Plugin.RESULT_TABLE_NAME);
+ String resultTableName =
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
sourceResultTableNameMap.computeIfAbsent(resultTableName, k -> new
ArrayList<>());
sourceResultTableNameMap.get(resultTableName).add(config);
}
for (Config config : transformConfigs) {
- if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
- throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
+ if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
+ " must be set in the transform plugin config when the
job have complex dependencies");
}
- if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
- throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
+ if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME.key()
+ " must be set in the transform plugin config when the
job have complex dependencies");
}
- String resultTableName =
config.getString(Plugin.RESULT_TABLE_NAME);
- String sourceTableName =
config.getString(Plugin.SOURCE_TABLE_NAME);
+ String resultTableName =
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
+ String sourceTableName =
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
if (Objects.equals(sourceTableName, resultTableName)) {
throw new JobDefineCheckException(String.format(
"Source{%s} and result{%s} table name cannot be equals",
sourceTableName, resultTableName));
@@ -295,7 +308,8 @@ public class JobConfigParser {
List<? extends Config> transformConfigs,
List<? extends Config> sinkConfigs) {
ImmutablePair<SeaTunnelSource, Set<URL>> pair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0),
jobConfig.getJobContext(), commonPluginJars);
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0),
jobConfig.getJobContext(),
+ commonPluginJars);
SourceAction sourceAction =
createSourceAction(idGenerator.getNextId(),
pair.getLeft().getPluginName(), pair.getLeft(),
pair.getRight());
@@ -306,7 +320,8 @@ public class JobConfigParser {
if (!CollectionUtils.isEmpty(transformConfigs)) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
transformListImmutablePair =
-
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0),
jobConfig.getJobContext(), commonPluginJars);
+
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0),
jobConfig.getJobContext(),
+ commonPluginJars);
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataType = transformListImmutablePair.getLeft().getProducedType();
@@ -341,10 +356,10 @@ public class JobConfigParser {
private void initTransformParallelism(List<? extends Config>
transformConfigs, Action upstreamAction,
SeaTunnelTransform
seaTunnelTransform, TransformAction transformAction) {
if (seaTunnelTransform instanceof PartitionSeaTunnelTransform
- &&
transformConfigs.get(0).hasPath(CollectionConstants.PARALLELISM)) {
+ &&
transformConfigs.get(0).hasPath(TransformCommonOptions.PARALLELISM.key())) {
transformAction.setParallelism(transformConfigs
.get(0)
- .getInt(CollectionConstants.PARALLELISM));
+ .getInt(TransformCommonOptions.PARALLELISM.key()));
} else {
// If transform type is not RePartitionTransform, Using the
parallelism of its upstream operators.
transformAction.setParallelism(upstreamAction.getParallelism());
@@ -352,13 +367,13 @@ public class JobConfigParser {
}
private int getSourceParallelism(Config sourceConfig) {
- if (sourceConfig.hasPath(CollectionConstants.PARALLELISM)) {
- int sourceParallelism =
sourceConfig.getInt(CollectionConstants.PARALLELISM);
+ if (sourceConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
+ int sourceParallelism =
sourceConfig.getInt(SourceCommonOptions.PARALLELISM.key());
return Math.max(sourceParallelism, 1);
}
int executionParallelism = 0;
- if (envConfigs.hasPath(CollectionConstants.PARALLELISM)) {
- executionParallelism =
envConfigs.getInt(CollectionConstants.PARALLELISM);
+ if (envConfigs.hasPath(EnvCommonOptions.PARALLELISM.key())) {
+ executionParallelism =
envConfigs.getInt(EnvCommonOptions.PARALLELISM.key());
}
return Math.max(executionParallelism, 1);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 5a9b49507..757e489fe 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.instance.impl.HazelcastInstanceProxy;
+import lombok.NonNull;
public class SeaTunnelServerStarter {
@@ -33,6 +34,10 @@ public class SeaTunnelServerStarter {
public static HazelcastInstanceImpl createHazelcastInstance(String
clusterName) {
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+ return createHazelcastInstance(seaTunnelConfig);
+ }
+
+ public static HazelcastInstanceImpl createHazelcastInstance(@NonNull
SeaTunnelConfig seaTunnelConfig) {
return ((HazelcastInstanceProxy)
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
@@ -41,9 +46,6 @@ public class SeaTunnelServerStarter {
public static HazelcastInstanceImpl createHazelcastInstance() {
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
- return ((HazelcastInstanceProxy)
HazelcastInstanceFactory.newHazelcastInstance(
- seaTunnelConfig.getHazelcastConfig(),
-
HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(seaTunnelConfig))).getOriginal();
+ return createHazelcastInstance(seaTunnelConfig);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 76cca50ad..3a2a8ca48 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.engine.server.master;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
-import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import
org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -163,8 +163,8 @@ public class JobMaster extends Thread {
// TODO replace it after ReadableConfig Support parse yaml format, then
use only one config to read engine and env config.
private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine,
Map<String, Object> env) {
CheckpointConfig checkpointConfig = new CheckpointConfig();
- if (env.containsKey(ServerConfigOptions.CHECKPOINT_INTERVAL.key())) {
- checkpointConfig.setCheckpointInterval((Integer)
env.get(ServerConfigOptions.CHECKPOINT_INTERVAL.key()));
+ if (env.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ checkpointConfig.setCheckpointInterval((Integer)
env.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
}
checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
checkpointConfig.setTolerableFailureCheckpoints(engine.getTolerableFailureCheckpoints());
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index df0c543f3..d0de07266 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -29,8 +31,8 @@ import java.util.Objects;
public abstract class AbstractSeaTunnelTransform implements
SeaTunnelTransform<SeaTunnelRow> {
- private static final String RESULT_TABLE_NAME = "result_table_name";
- private static final String SOURCE_TABLE_NAME = "source_table_name";
+ private static final String RESULT_TABLE_NAME =
SourceCommonOptions.RESULT_TABLE_NAME.key();
+ private static final String SOURCE_TABLE_NAME =
SinkCommonOptions.SOURCE_TABLE_NAME.key();
private String inputTableName;
private SeaTunnelRowType inputRowType;
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 69b22a625..4e945d671 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -18,9 +18,9 @@
package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
import
org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
@@ -60,14 +60,14 @@ public class SeaTunnelSourceSupport implements
DataSourceV2, ReadSupport, MicroB
@Override
public DataSourceReader createReader(DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
- int parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
+ int parallelism =
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
return new BatchSourceReader(seaTunnelSource, parallelism);
}
@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType>
rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
- Integer parallelism = options.getInt(CollectionConstants.PARALLELISM,
1);
+ Integer parallelism =
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
Integer checkpointInterval =
options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath = StringUtils.replacePattern(checkpointLocation,
"sources/\\d+", "sources-state");
Configuration configuration =
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();