This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new e8929ab60 [DEV][Api] Replace SeaTunnelContext with JobContext and
remove singleton pattern (#2706) (#2731)
e8929ab60 is described below
commit e8929ab605abb2eb0194321bfbfcbdf7e881e423
Author: ic4y <[email protected]>
AuthorDate: Thu Sep 15 09:55:04 2022 +0800
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton
pattern (#2706) (#2731)
---
.../{SeaTunnelContext.java => JobContext.java} | 16 +++++-----------
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 4 ++--
...nelContextAware.java => SeaTunnelJobAware.java} | 8 ++++----
.../seatunnel/api/source/SeaTunnelSource.java | 2 +-
.../api/transform/SeaTunnelTransform.java | 4 ++--
.../seatunnel/assertion/sink/AssertSink.java | 7 -------
.../clickhouse/sink/client/ClickhouseSink.java | 6 ------
.../clickhouse/sink/file/ClickhouseFileSink.java | 7 -------
.../seatunnel/fake/source/FakeSource.java | 10 +++++-----
.../seatunnel/file/sink/AbstractFileSink.java | 12 ++++++------
.../connectors/seatunnel/hive/sink/HiveSink.java | 12 ++++++------
.../seatunnel/hive/source/HiveSource.java | 8 --------
.../seatunnel/http/source/HttpSource.java | 10 +++++-----
.../seatunnel/hudi/source/HudiSource.java | 8 --------
.../jdbc/internal/xa/SemanticXidGenerator.java | 6 +++---
.../seatunnel/jdbc/internal/xa/XaGroupOps.java | 4 ++--
.../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java | 4 ++--
.../seatunnel/jdbc/internal/xa/XidGenerator.java | 6 +++---
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 6 +++---
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 12 ++++++------
.../seatunnel/jdbc/source/JdbcSource.java | 7 -------
.../seatunnel/kafka/source/KafkaSource.java | 10 +++++-----
.../seatunnel/socket/source/SocketSource.java | 10 +++++-----
.../execution/AbstractPluginExecuteProcessor.java | 4 ++++
.../starter/flink/execution/FlinkExecution.java | 11 ++++++-----
.../flink/execution/SinkExecuteProcessor.java | 7 ++++---
.../flink/execution/SourceExecuteProcessor.java | 9 +++++----
.../flink/execution/TransformExecuteProcessor.java | 4 +++-
.../execution/AbstractPluginExecuteProcessor.java | 4 ++++
.../spark/execution/SinkExecuteProcessor.java | 7 ++++---
.../spark/execution/SourceExecuteProcessor.java | 7 ++++---
.../starter/spark/execution/SparkExecution.java | 11 ++++++-----
.../spark/execution/TransformExecuteProcessor.java | 4 +++-
.../engine/client/job/ConnectorInstanceLoader.java | 16 ++++++++--------
.../engine/client/job/JobConfigParser.java | 22 +++++++++++-----------
.../seatunnel/engine/common/config/JobConfig.java | 8 ++++----
.../apache/seatunnel/engine/server/TestUtils.java | 8 ++++----
.../server/checkpoint/CheckpointPlanTest.java | 9 +++++----
.../seatunnel/engine/server/dag/TaskTest.java | 7 ++++---
.../engine/server/master/JobMasterTest.java | 7 ++++---
40 files changed, 148 insertions(+), 176 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
similarity index 84%
rename from
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index b6152e44f..9e56de36a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -28,14 +28,12 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
- * This class is used to store the context of the application. e.g. the table
schema, catalog...etc.
+ * This class is used to store the context of the job. e.g. the table schema,
catalog...etc.
*/
-public final class SeaTunnelContext implements Serializable {
+public final class JobContext implements Serializable {
private static final long serialVersionUID = -1L;
- private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
-
// tableName -> tableSchema
private final Map<String, TableSchema> tableSchemaMap = new
ConcurrentHashMap<>(Common.COLLECTION_SIZE);
@@ -43,8 +41,8 @@ public final class SeaTunnelContext implements Serializable {
private final String jobId;
- public static SeaTunnelContext getContext() {
- return INSTANCE;
+ public JobContext() {
+ this.jobId = UUID.randomUUID().toString().replace("-", "");
}
/**
@@ -67,7 +65,7 @@ public final class SeaTunnelContext implements Serializable {
return Optional.ofNullable(tableSchemaMap.get(tableName));
}
- public SeaTunnelContext setJobMode(JobMode jobMode) {
+ public JobContext setJobMode(JobMode jobMode) {
this.jobMode = jobMode;
return this;
}
@@ -80,8 +78,4 @@ public final class SeaTunnelContext implements Serializable {
return this.jobId;
}
- private SeaTunnelContext() {
- this.jobId = UUID.randomUUID().toString().replace("-", "");
- }
-
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 59517e409..e09d8110a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.sink;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.SeaTunnelContextAware;
+import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -43,7 +43,7 @@ import java.util.Optional;
* {@link SinkAggregatedCommitter} handle it,
this class should implement interface {@link Serializable}.
*/
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelContextAware {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {
/**
* Set the row type info of sink row data. This method will be
automatically called by translation.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
similarity index 83%
rename from
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
index 429f05155..8eb90b04f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java
@@ -17,14 +17,14 @@
package org.apache.seatunnel.api.source;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
/**
- * This interface defines the runtime environment of the SeaTunnel application.
+ * This interface defines the runtime environment of the SeaTunnel job.
*/
-public interface SeaTunnelContextAware {
+public interface SeaTunnelJobAware {
- default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
+ default void setJobContext(JobContext jobContext){
// nothing
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index f93f4d3bf..6edac04a8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
* @param <StateT> The type of checkpoint states.
*/
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends
Serializable>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelContextAware {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {
/**
* Get the boundedness of this source.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index 0a9fa44d7..8842c7595 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.api.transform;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
-import org.apache.seatunnel.api.source.SeaTunnelContextAware;
+import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import java.io.Serializable;
public interface SeaTunnelTransform<T> extends Serializable,
PluginIdentifierInterface,
- SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
+ SeaTunnelPluginLifeCycle, SeaTunnelJobAware {
T map(T row);
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index dc0c397d6..11c17a1a4 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,7 +40,6 @@ import java.util.List;
@AutoService(SeaTunnelSink.class)
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private static final String RULES = "rules";
- private SeaTunnelContext seaTunnelContext;
private SeaTunnelRowType seaTunnelRowType;
private List<AssertFieldRule> assertFieldRules;
@@ -73,11 +71,6 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
assertFieldRules = new AssertRuleParser().parseRules(configList);
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public String getPluginName() {
return "Assert";
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 295547c74..b3218aaf7 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -29,7 +29,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -67,7 +66,6 @@ import java.util.Properties;
@AutoService(SeaTunnelSink.class)
public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow,
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
- private SeaTunnelContext seaTunnelContext;
private ReaderOption option;
@Override
@@ -165,8 +163,4 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
return this.option.getSeaTunnelRowType();
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 05c511292..6845af1a8 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -30,7 +30,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -65,7 +64,6 @@ import java.util.stream.Collectors;
@AutoService(SeaTunnelSink.class)
public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow,
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
- private SeaTunnelContext seaTunnelContext;
private FileReaderOption readerOption;
@Override
@@ -137,9 +135,4 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState>
createWriter(SinkWriter.Context context) throws IOException {
return new ClickhouseFileSinkWriter(readerOption, context);
}
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 65e6587f4..dd1167b9b 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -37,11 +37,11 @@ import com.google.auto.service.AutoService;
public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private Config pluginConfig;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -67,7 +67,7 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
index 77b72f004..84827b717 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -47,7 +47,7 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
private String jobId;
private Long checkpointId;
private SeaTunnelRowType seaTunnelRowTypeInfo;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private TextFileSinkConfig textFileSinkConfig;
private SinkFileSystemPlugin sinkFileSystemPlugin;
@@ -72,7 +72,7 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(SinkWriter.Context context) throws IOException {
- if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) &&
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
+ if (!jobContext.getJobMode().equals(JobMode.BATCH) &&
this.getSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
throw new RuntimeException("only batch job can overwrite mode");
}
@@ -104,9 +104,9 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- this.jobId = seaTunnelContext.getJobId();
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ this.jobId = jobContext.getJobId();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 4df91b1a5..20ee10423 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -49,7 +49,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow,
HiveSinkState, Hive
private String jobId;
private Long checkpointId;
private SeaTunnelRowType seaTunnelRowTypeInfo;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private HiveSinkConfig hiveSinkConfig;
@Override
@@ -76,7 +76,7 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow,
HiveSinkState, Hive
@Override
public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
createWriter(SinkWriter.Context context) throws IOException {
- if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) &&
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
{
+ if (!jobContext.getJobMode().equals(JobMode.BATCH) &&
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
{
throw new RuntimeException("only batch job can overwrite hive
table");
}
@@ -96,9 +96,9 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow,
HiveSinkState, Hive
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- this.jobId = seaTunnelContext.getJobId();
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ this.jobId = jobContext.getJobId();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index ebaf2c51b..27108620a 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -21,7 +21,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -48,8 +47,6 @@ import java.util.List;
@AutoService(SeaTunnelSource.class)
public class HiveSource implements SeaTunnelSource<SeaTunnelRow,
HiveSourceSplit, HiveSourceState> {
- private SeaTunnelContext seaTunnelContext;
-
private SeaTunnelRowType typeInfo;
private ReadStrategy readStrategy;
@@ -90,11 +87,6 @@ public class HiveSource implements
SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
}
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.typeInfo;
diff --git
a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 0b906144b..02ea5b264 100644
---
a/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -24,8 +24,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHO
import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.PARAMS;
import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -51,7 +51,7 @@ import java.util.stream.Collectors;
public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private final HttpSourceParameter parameter = new HttpSourceParameter();
private SeaTunnelRowType rowType;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public String getPluginName() {
@@ -60,7 +60,7 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -93,8 +93,8 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index 2ca69d784..f1cbb619a 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -25,7 +25,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceCo
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -48,8 +47,6 @@ import java.io.IOException;
@AutoService(SeaTunnelSource.class)
public class HudiSource implements SeaTunnelSource<SeaTunnelRow,
HudiSourceSplit, HudiSourceState> {
- private SeaTunnelContext seaTunnelContext;
-
private SeaTunnelRowType typeInfo;
private String filePath;
@@ -103,11 +100,6 @@ public class HudiSource implements
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.typeInfo;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
index 3d2a82b3d..4f51c31a5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import javax.transaction.xa.Xid;
@@ -63,7 +63,7 @@ class SemanticXidGenerator
}
@Override
- public Xid generateXid(SeaTunnelContext context, SinkWriter.Context
sinkContext, long checkpointId) {
+ public Xid generateXid(JobContext context, SinkWriter.Context sinkContext,
long checkpointId) {
byte[] jobIdBytes = context.getJobId().getBytes();
checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES);
@@ -75,7 +75,7 @@ class SemanticXidGenerator
}
@Override
- public boolean belongsToSubtask(Xid xid, SeaTunnelContext context,
SinkWriter.Context sinkContext) {
+ public boolean belongsToSubtask(Xid xid, JobContext context,
SinkWriter.Context sinkContext) {
if (xid.getFormatId() != FORMAT_ID) {
return false;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
index e37e6b05a..11ebf0633 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -38,6 +38,6 @@ public interface XaGroupOps
GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
- void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
+ void recoverAndRollback(JobContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
index 05ecce160..ff2012fd7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -112,7 +112,7 @@ public class XaGroupOpsImpl
}
@Override
- public void recoverAndRollback(SeaTunnelContext context,
SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+ public void recoverAndRollback(JobContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
Collection<Xid> recovered = xaFacade.recover();
recovered.remove(excludeXid);
if (recovered.isEmpty()) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
index a80175054..de3ef0059 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import javax.transaction.xa.Xid;
@@ -31,14 +31,14 @@ import java.security.SecureRandom;
public interface XidGenerator
extends Serializable, AutoCloseable {
- Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext,
long checkpointId);
+ Xid generateXid(JobContext context, SinkWriter.Context sinkContext, long
checkpointId);
default void open() {}
/**
* @return true if the provided transaction belongs to this subtask
*/
- boolean belongsToSubtask(Xid xid, SeaTunnelContext context,
SinkWriter.Context sinkContext);
+ boolean belongsToSubtask(Xid xid, JobContext context, SinkWriter.Context
sinkContext);
@Override
default void close() {}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index b4527a9ff..20461db9f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
@@ -53,7 +53,7 @@ public class JdbcExactlyOnceSinkWriter
private final SinkWriter.Context sinkcontext;
- private final SeaTunnelContext context;
+ private final JobContext context;
private final List<JdbcSinkState> recoverStates;
@@ -72,7 +72,7 @@ public class JdbcExactlyOnceSinkWriter
public JdbcExactlyOnceSinkWriter(
SinkWriter.Context sinkcontext,
- SeaTunnelContext context,
+ JobContext context,
JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
JdbcSinkOptions jdbcSinkOptions,
List<JdbcSinkState> states) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 672303f8c..97a63b980 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -51,7 +51,7 @@ public class JdbcSink
private SeaTunnelRowType seaTunnelRowType;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
private JdbcSinkOptions jdbcSinkOptions;
@@ -76,7 +76,7 @@ public class JdbcSink
if (jdbcSinkOptions.isExactlyOnce()) {
sinkWriter = new JdbcExactlyOnceSinkWriter(
context,
- seaTunnelContext,
+ jobContext,
statementBuilder,
jdbcSinkOptions,
new ArrayList<>()
@@ -98,7 +98,7 @@ public class JdbcSink
JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) ->
JdbcUtils.setRecordToStatement(st, null, row);
return new JdbcExactlyOnceSinkWriter(
context,
- seaTunnelContext,
+ jobContext,
statementBuilder,
jdbcSinkOptions,
states
@@ -132,8 +132,8 @@ public class JdbcSink
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 2717436f1..a8d0766f3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -57,7 +56,6 @@ import java.util.Map;
public class JdbcSource implements SeaTunnelSource<SeaTunnelRow,
JdbcSourceSplit, JdbcSourceState> {
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
- private SeaTunnelContext seaTunnelContext;
private JdbcSourceOptions jdbcSourceOptions;
private SeaTunnelRowType typeInfo;
@@ -96,11 +94,6 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
);
}
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a4d534e67..62314b87a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -23,8 +23,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONS
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -53,11 +53,11 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private final ConsumerMetadata metadata = new ConsumerMetadata();
private SeaTunnelRowType typeInfo;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -119,7 +119,7 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index a237679d4..f97e00e93 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.socket.source;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -38,11 +38,11 @@ import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private SocketSourceParameter parameter;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode()) ?
Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}
@Override
@@ -56,8 +56,8 @@ public class SocketSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
}
@Override
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 5e3178bfb..1b680b9e8 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
@@ -41,6 +42,7 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
protected final FlinkEnvironment flinkEnvironment;
protected final List<? extends Config> pluginConfigs;
+ protected final JobContext jobContext;
protected final List<T> plugins;
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
@@ -57,8 +59,10 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
};
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config>
pluginConfigs) {
this.flinkEnvironment = flinkEnvironment;
+ this.jobContext = jobContext;
this.pluginConfigs = pluginConfigs;
this.plugins = initializePlugins(pluginConfigs);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index c0cc1a67e..ec7d2a199 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -50,10 +50,11 @@ public class FlinkExecution implements TaskExecution {
public FlinkExecution(Config config) {
this.config = config;
this.flinkEnvironment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
-
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
- this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
- this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
- this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(flinkEnvironment.getJobMode());
+ this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList("source"));
+ this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList("transform"));
+ this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList("sink"));
}
@Override
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 8de5d422d..6a06516d5 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,8 +46,9 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
private static final String PLUGIN_TYPE = "sink";
protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(flinkEnvironment, pluginConfigs);
+ super(flinkEnvironment, jobContext, pluginConfigs);
}
@Override
@@ -60,7 +61,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSink.setJobContext(jobContext);
return seaTunnelSink;
}).distinct().collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index a1b31836f..1dc378a83 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.common.constants.JobMode;
@@ -53,8 +53,9 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
private static final String PLUGIN_TYPE = "source";
public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config> sourceConfigs) {
- super(flinkEnvironment, sourceConfigs);
+ super(flinkEnvironment, jobContext, sourceConfigs);
}
@Override
@@ -110,8 +111,8 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSource seaTunnelSource =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
- if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+ seaTunnelSource.setJobContext(jobContext);
+ if (jobContext.getJobMode() == JobMode.BATCH
&& seaTunnelSource.getBoundedness() ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
throw new UnsupportedOperationException(String.format("'%s'
source don't support off-line job.", seaTunnelSource.getPluginName()));
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index d077f6d3a..62d65fbcd 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Fl
private static final String PLUGIN_TYPE = "transform";
protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(flinkEnvironment, pluginConfigs);
+ super(flinkEnvironment, jobContext, pluginConfigs);
}
@Override
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
index 0dad6f680..af087c0c4 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,13 +35,16 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
protected final SparkEnvironment sparkEnvironment;
protected final List<? extends Config> pluginConfigs;
+ protected final JobContext jobContext;
protected final List<T> plugins;
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
protected AbstractPluginExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config>
pluginConfigs) {
this.sparkEnvironment = sparkEnvironment;
+ this.jobContext = jobContext;
this.pluginConfigs = pluginConfigs;
this.plugins = initializePlugins(pluginConfigs);
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 974d0fcf0..b59bbc621 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -43,8 +43,9 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
private static final String PLUGIN_TYPE = "sink";
protected SinkExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(sparkEnvironment, pluginConfigs);
+ super(sparkEnvironment, jobContext, pluginConfigs);
}
@Override
@@ -56,7 +57,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSink.setJobContext(jobContext);
return seaTunnelSink;
}).distinct().collect(Collectors.toList());
sparkEnvironment.registerPlugin(pluginJars);
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f8f8a59b8..b76ea31f2 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -43,8 +43,9 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
private static final String PLUGIN_TYPE = "source";
public SourceExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config> sourceConfigs) {
- super(sparkEnvironment, sourceConfigs);
+ super(sparkEnvironment, jobContext, sourceConfigs);
}
@Override
@@ -74,7 +75,7 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSource<?, ?, ?> seaTunnelSource =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSource.setJobContext(jobContext);
sources.add(seaTunnelSource);
}
sparkEnvironment.registerPlugin(new ArrayList<>(jars));
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 7b90bd0dd..0bb57268c 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -46,10 +46,11 @@ public class SparkExecution {
public SparkExecution(Config config) {
this.config = config;
this.sparkEnvironment = (SparkEnvironment) new
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
- this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
- this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(sparkEnvironment, config.getConfigList("transform"));
- this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(sparkEnvironment.getJobMode());
+ this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(sparkEnvironment, jobContext,
config.getConfigList("source"));
+ this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(sparkEnvironment, jobContext,
config.getConfigList("transform"));
+ this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(sparkEnvironment, jobContext,
config.getConfigList("sink"));
}
public void execute() throws TaskExecuteException {
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 5668457b7..66d5e1ee6 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
@@ -39,8 +40,9 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Ba
private static final String PLUGIN_TYPE = "transform";
protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
+ JobContext jobContext,
List<? extends Config> pluginConfigs) {
- super(sparkEnvironment, pluginConfigs);
+ super(sparkEnvironment, jobContext, pluginConfigs);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
index c62ca2871..44f6aa4f5 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.client.job;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -47,7 +47,7 @@ public class ConnectorInstanceLoader {
}
public static ImmutablePair<SeaTunnelSource, Set<URL>>
loadSourceInstance(Config sourceConfig,
-
SeaTunnelContext seaTunnelContext) {
+
JobContext jobContext) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -58,8 +58,8 @@ public class ConnectorInstanceLoader {
SeaTunnelSource seaTunnelSource =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setSeaTunnelContext(seaTunnelContext);
- if (seaTunnelContext.getJobMode() == JobMode.BATCH
+ seaTunnelSource.setJobContext(jobContext);
+ if (jobContext.getJobMode() == JobMode.BATCH
&& seaTunnelSource.getBoundedness() ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
throw new UnsupportedOperationException(
String.format("'%s' source don't support off-line job.",
seaTunnelSource.getPluginName()));
@@ -68,7 +68,7 @@ public class ConnectorInstanceLoader {
}
public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>, Set<URL>> loadSinkInstance(
- Config sinkConfig, SeaTunnelContext seaTunnelContext) {
+ Config sinkConfig, JobContext jobContext) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -79,12 +79,12 @@ public class ConnectorInstanceLoader {
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setTypeInfo(null);
- seaTunnelSink.setSeaTunnelContext(seaTunnelContext);
+ seaTunnelSink.setJobContext(jobContext);
return new ImmutablePair<>(seaTunnelSink, new
HashSet<>(pluginJarPaths));
}
public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
loadTransformInstance(Config transformConfig,
-
SeaTunnelContext seaTunnelContext) {
+
JobContext jobContext) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new
SeaTunnelTransformPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -95,7 +95,7 @@ public class ConnectorInstanceLoader {
SeaTunnelTransform<?> seaTunnelTransform =
transformPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelTransform.prepare(transformConfig);
- seaTunnelTransform.setSeaTunnelContext(seaTunnelContext);
+ seaTunnelTransform.setJobContext(jobContext);
return new ImmutablePair<>(seaTunnelTransform, new
HashSet<>(pluginJarPaths));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 964bc5593..608838e9b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.client.job;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -109,13 +109,13 @@ public class JobConfigParser {
}
private void jobConfigAnalyze(@NonNull Config envConfigs) {
- SeaTunnelContext context = SeaTunnelContext.getContext();
+ JobContext jobContext = new JobContext();
if (envConfigs.hasPath("job.mode")) {
- context.setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
+ jobContext.setJobMode(envConfigs.getEnum(JobMode.class,
"job.mode"));
} else {
- context.setJobMode(JobMode.BATCH);
+ jobContext.setJobMode(JobMode.BATCH);
}
- jobConfig.setSeaTunnelContext(context);
+ jobConfig.setJobContext(jobContext);
}
/**
@@ -133,7 +133,7 @@ public class JobConfigParser {
for (Config config : sinkConfigs) {
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>, Set<URL>>
sinkListImmutablePair =
- ConnectorInstanceLoader.loadSinkInstance(config,
jobConfig.getSeaTunnelContext());
+ ConnectorInstanceLoader.loadSinkInstance(config,
jobConfig.getJobContext());
SinkAction sinkAction =
createSinkAction(idGenerator.getNextId(),
sinkListImmutablePair.getLeft().getPluginName(),
@@ -173,7 +173,7 @@ public class JobConfigParser {
AtomicInteger totalParallelism = new AtomicInteger();
for (Config sourceConfig : sourceConfigList) {
ImmutablePair<SeaTunnelSource, Set<URL>>
seaTunnelSourceListImmutablePair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfig,
jobConfig.getSeaTunnelContext());
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfig,
jobConfig.getJobContext());
dataType =
seaTunnelSourceListImmutablePair.getLeft().getProducedType();
SourceAction sourceAction = createSourceAction(
idGenerator.getNextId(),
@@ -200,7 +200,7 @@ public class JobConfigParser {
SeaTunnelDataType<?> dataTypeResult = null;
for (Config config : transformConfigList) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(config,
jobConfig.getSeaTunnelContext());
+ ConnectorInstanceLoader.loadTransformInstance(config,
jobConfig.getJobContext());
TransformAction transformAction = createTransformAction(
idGenerator.getNextId(),
transformListImmutablePair.getLeft().getPluginName(),
@@ -264,20 +264,20 @@ public class JobConfigParser {
List<? extends Config> transformConfigs,
List<? extends Config> sinkConfigs) {
ImmutablePair<SeaTunnelSource, Set<URL>> pair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0),
jobConfig.getSeaTunnelContext());
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0),
jobConfig.getJobContext());
SourceAction sourceAction =
createSourceAction(idGenerator.getNextId(),
pair.getLeft().getPluginName(), pair.getLeft(),
pair.getRight());
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
SeaTunnelDataType dataType =
sourceAction.getSource().getProducedType();
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>, Set<URL>>
- sinkListImmutablePair =
ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0),
jobConfig.getSeaTunnelContext());
+ sinkListImmutablePair =
ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0),
jobConfig.getJobContext());
Action sinkUpstreamAction = sourceAction;
if (!CollectionUtils.isEmpty(transformConfigs)) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
transformListImmutablePair =
-
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0),
jobConfig.getSeaTunnelContext());
+
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0),
jobConfig.getJobContext());
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataType = transformListImmutablePair.getLeft().getProducedType();
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 2b31a1115..2fb28b14d 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.common.config;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import
org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -30,7 +30,7 @@ import java.io.IOException;
@Data
public class JobConfig implements IdentifiedDataSerializable {
private String name;
- private SeaTunnelContext seaTunnelContext;
+ private JobContext jobContext;
@Override
public int getFactoryId() {
@@ -45,12 +45,12 @@ public class JobConfig implements
IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeString(name);
- out.writeObject(seaTunnelContext);
+ out.writeObject(jobContext);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
this.name = in.readString();
- this.seaTunnelContext = in.readObject();
+ this.jobContext = in.readObject();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 518f409c3..dbd2ef76d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -36,10 +36,10 @@ import java.net.URL;
public class TestUtils {
@SuppressWarnings("checkstyle:MagicNumber")
- public static LogicalDag getTestLogicalDag() throws MalformedURLException {
+ public static LogicalDag getTestLogicalDag(JobContext jobContext) throws
MalformedURLException {
IdGenerator idGenerator = new IdGenerator();
FakeSource fakeSource = new FakeSource();
- fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+ fakeSource.setJobContext(jobContext);
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake",
fakeSource,
Sets.newHashSet(new URL("file:///fake.jar")));
@@ -47,7 +47,7 @@ public class TestUtils {
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
ConsoleSink consoleSink = new ConsoleSink();
- consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ consoleSink.setJobContext(jobContext);
Action console = new SinkAction<>(idGenerator.getNextId(), "console",
consoleSink,
Sets.newHashSet(new URL("file:///console.jar")));
console.setParallelism(3);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 5fb54aaad..1913f0c84 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -77,16 +77,17 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
}
private static void fillVirtualVertex(IdGenerator idGenerator, LogicalDag
logicalDag, int parallelism) {
- SeaTunnelContext.getContext().setJobMode(JobMode.BATCH);
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(JobMode.BATCH);
FakeSource fakeSource = new FakeSource();
- fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+ fakeSource.setJobContext(jobContext);
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake",
fakeSource, Collections.emptySet());
fake.setParallelism(parallelism);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake,
parallelism);
ConsoleSink consoleSink = new ConsoleSink();
- consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ consoleSink.setJobContext(jobContext);
Action console = new SinkAction<>(idGenerator.getNextId(), "console",
consoleSink, Collections.emptySet());
console.setParallelism(parallelism);
LogicalVertex consoleVertex = new LogicalVertex(console.getId(),
console, parallelism);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 125776a29..d9bb880da 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.dag;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -50,8 +50,9 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
@Test
public void testTask() throws MalformedURLException {
- SeaTunnelContext.getContext().setJobMode(JobMode.BATCH);
- LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(JobMode.BATCH);
+ LogicalDag testLogicalDag = TestUtils.getTestLogicalDag(jobContext);
JobConfig config = new JobConfig();
config.setName("test");
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 227c347e7..5c98b4bc2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.master;
import static org.awaitility.Awaitility.await;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -52,8 +52,9 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
@Test
public void testHandleCheckpointTimeout() throws Exception {
- SeaTunnelContext.getContext().setJobMode(JobMode.STREAMING);
- LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(JobMode.STREAMING);
+ LogicalDag testLogicalDag = TestUtils.getTestLogicalDag(jobContext);
JobConfig config = new JobConfig();
config.setName("test_checkpoint_timeout");