This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e7de54e4e7 [Feature][Zeta] extended jobStateEvent event listening for
zeta (#9689)
e7de54e4e7 is described below
commit e7de54e4e7af8fc2349a81a0fa8369713471b60d
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Aug 20 10:14:01 2025 +0800
[Feature][Zeta] extended jobStateEvent event listening for zeta (#9689)
---
docs/en/concept/event-listener.md | 183 ++++++++++++++++++++-
docs/zh/concept/event-listener.md | 178 ++++++++++++++++++++
.../org/apache/seatunnel/api/event/EventType.java | 1 +
.../seatunnel/command/ClientExecuteCommand.java | 4 +-
.../engine/e2e/ClusterFaultToleranceIT.java | 2 +-
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 2 +-
.../org/apache/seatunnel/engine/e2e/ClusterIT.java | 4 +-
.../seatunnel/engine/e2e/JobExecutionIT.java | 4 +-
.../seatunnel/engine/e2e/MultiTableMetricsIT.java | 2 +-
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 2 +-
.../seatunnel/engine/e2e/SeaTunnelSlotIT.java | 2 +-
.../engine/e2e/SplitClusterFaultToleranceIT.java | 2 +-
.../apache/seatunnel/engine/e2e/TextHeaderIT.java | 2 +-
.../engine/client/job/ClientJobProxy.java | 4 +-
.../seatunnel/engine/client/job/JobClient.java | 4 +-
.../engine/client/job/JobStatusRunner.java | 2 +-
.../engine/client/util/ContentFormatUtil.java | 2 +-
.../engine/client/ConnectorPackageClientTest.java | 2 +-
.../engine/client/ContentFormatUtilTest.java | 4 +-
.../engine/client/SeaTunnelClientTest.java | 2 +-
.../client/SeaTunnelEngineClusterRoleTest.java | 4 +-
.../seatunnel/engine/common}/job/JobResult.java | 2 +-
.../engine/common/job/JobStateEvent.java} | 38 +++--
.../seatunnel/engine/common}/job/JobStatus.java | 2 +-
.../engine/common}/job/JobStatusData.java | 2 +-
.../org/apache/seatunnel/engine/core/job/Job.java | 2 +
.../engine/server/CoordinatorService.java | 4 +-
.../server/checkpoint/CheckpointManager.java | 2 +-
.../engine/server/dag/physical/PhysicalPlan.java | 32 ++--
.../engine/server/master/JobHistoryService.java | 4 +-
.../seatunnel/engine/server/master/JobMaster.java | 9 +-
.../server/operation/GetJobStatusOperation.java | 2 +-
.../opeartion/GetOverviewOperation.java | 2 +-
.../engine/server/rest/service/BaseService.java | 2 +-
.../engine/server/ConnectorPackageServiceTest.java | 2 +-
.../engine/server/CoordinatorServiceTest.java | 2 +-
...CoordinatorServiceWithCancelPendingJobTest.java | 2 +-
.../checkpoint/CheckpointErrorRestoreEndTest.java | 2 +-
.../server/checkpoint/CheckpointManagerTest.java | 2 +-
.../server/checkpoint/CheckpointStorageTest.java | 2 +-
.../server/checkpoint/CheckpointTimeOutTest.java | 2 +-
.../engine/server/checkpoint/SavePointTest.java | 2 +-
.../engine/server/event/JobStateEventTest.java | 105 ++++++++++++
.../server/master/JobHistoryServiceTest.java | 4 +-
.../engine/server/master/JobMasterTest.java | 4 +-
.../engine/server/master/JobMetricsTest.java | 2 +-
46 files changed, 570 insertions(+), 76 deletions(-)
diff --git a/docs/en/concept/event-listener.md
b/docs/en/concept/event-listener.md
index 7ba4550205..b567b8152c 100644
--- a/docs/en/concept/event-listener.md
+++ b/docs/en/concept/event-listener.md
@@ -21,9 +21,26 @@ The event API is defined in the
`org.apache.seatunnel.api.event` package.
- `org.apache.seatunnel.api.event.Event` - The interface for event data.
- `org.apache.seatunnel.api.event.EventType` - The enum for event type.
+#### EventType Enumeration Description
+The `EventType` enumeration defines all possible event types in the system,
mainly including:
+
+| Event Type | Description |
Associated Event Class |
+|---------------------------------|---------------------------------|---------------------------------|
+| `JOB_STATUS` | Job status change event |
`JobStateEvent` |
+| `SCHEMA_CHANGE_UPDATE_COLUMNS` | Table structure update event |
`AlterTableColumnsEvent` |
+| `SCHEMA_CHANGE_ADD_COLUMN` | Table column addition event |
`AlterTableAddColumnEvent` |
+| `SCHEMA_CHANGE_DROP_COLUMN` | Table column deletion event |
`AlterTableDropColumnEvent` |
+| `SCHEMA_CHANGE_MODIFY_COLUMN` | Table column modification event |
`AlterTableModifyColumnEvent` |
+| `READER_OPEN` | Reader open event |
`ReaderOpenEvent` |
+| `READER_CLOSE` | Reader close event |
`ReaderCloseEvent` |
+| `WRITER_OPEN` | Writer open event |
`WriterOpenEvent` |
+| `WRITER_CLOSE` | Writer close event |
`WriterCloseEvent` |
+
+> Note: Different event types correspond to different event data structures.
When customizing an event handler, you need to judge the type through
`event.getEventType()` to ensure type-safe conversion.
+
### Event Listener API
-You can customize event handler, such as sending events to external systems
+You can customize event handler, such as sending events to external systems.
- `org.apache.seatunnel.api.event.EventHandler` - The interface for event
handler, SPI will automatically load subclass from the classpath.
@@ -114,3 +131,167 @@ Example:
`org.apache.seatunnel.api.event.LoggingEventHandler`
### Spark Engine
You can define the implementation class of
`org.apache.seatunnel.api.event.EventHandler` interface and add to the
classpath to automatically load it through SPI.
+
+## Steps to Implement a Custom Event Handler
+
+The following takes `JobStateEvent` as an example to illustrate how to
implement a custom event handler. You can extend this method to handle other
types of events as needed.
+
+### 1. Add Dependencies
+
+Introduce the necessary dependencies in the project's `pom.xml`:
+```xml
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${seatunnel.version}</version>
+ <scope>provided</scope>
+</dependency>
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-common</artifactId>
+ <version>${seatunnel.version}</version>
+ <scope>provided</scope>
+</dependency>
+```
+> Note: Replace `${seatunnel.version}` with the actual SeaTunnel version used.
+
+
+### 2. Implement the Event Handler
+
+Create a custom class that implements the
`org.apache.seatunnel.api.event.EventHandler` interface, override the `handle`
method, and implement business logic for the event types to be processed.
+
+**Core Logic**: Filter event types through `event.getEventType()` — since the
SeaTunnel engine distributes various types of events, you need to explicitly
judge the event type to ensure only target events are processed.
+
+```java
+import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventHandler;
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.api.event.schema.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.event.source.ReaderOpenEvent;
+import org.apache.seatunnel.api.event.sink.WriterCloseEvent;
+
+/**
+ * Example of a custom multi-type event handler, including processing logic
for multiple events
+ */
+@Slf4j
+public class CustomMultiEventHandler implements EventHandler {
+
+ @Override
+ public void handle(Event event) {
+ // Process differently based on event type
+ EventType eventType = event.getEventType();
+
+ switch (eventType) {
+ case JOB_STATUS:
+ handleJobStateEvent((JobStateEvent) event);
+ break;
+ case SCHEMA_CHANGE_ADD_COLUMN:
+ handleAddColumnEvent((AlterTableAddColumnEvent) event);
+ break;
+ case READER_OPEN:
+ handleReaderOpenEvent((ReaderOpenEvent) event);
+ break;
+ case WRITER_CLOSE:
+ handleWriterCloseEvent((WriterCloseEvent) event);
+ break;
+ // Add processing for other event types as needed
+ default:
+ // Ignore unprocessed event types
+ log.debug("Ignoring unprocessed event type: {}", eventType);
+ }
+ }
+
+ /**
+ * Handle job state events
+ */
+ private void handleJobStateEvent(JobStateEvent jobEvent) {
+ String jobId = jobEvent.getJobId();
+ String jobName = jobEvent.getJobName();
+ JobStatus status = jobEvent.getJobStatus();
+ long eventTime = jobEvent.getCreatedTime();
+
+ switch (status) {
+ case FAILED:
+ log.error("Job failed | jobId: {}, jobName: {}, Time: {}",
+ jobId, jobName, eventTime);
+ // Add failure alert logic
+ sendAlert("Job Failure", "jobId: " + jobId);
+ break;
+ case FINISHED:
+ log.info("Job completed | jobId: {}, jobName: {}, Time: {}",
+ jobId, jobName, eventTime);
+ break;
+ // Handle other statuses...
+ default:
+ log.info("Job status changed | jobId: {}, Status: {}, Time:
{}",
+ jobId, status, eventTime);
+ }
+ }
+
+ /**
+ * Handle table column addition events
+ */
+ private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
+ log.info("Column added to table | Table Name: {}, Added Columns: {},
Time: {}",
+ event.getTableName(), event.getAddedColumns(),
event.getEventTime());
+ // Handle table structure change logic
+ }
+
+ /**
+ * Handle reader open events
+ */
+ private void handleReaderOpenEvent(ReaderOpenEvent event) {
+ log.info("Reader opened | Plugin ID: {}, Parallelism: {}, Time: {}",
+ event.getPluginId(), event.getParallelism(), event.getEventTime());
+ // Handle reader initialization logic
+ }
+
+ /**
+ * Handle writer close events
+ */
+ private void handleWriterCloseEvent(WriterCloseEvent event) {
+ log.info("Writer closed | Plugin ID: {}, Processed Record Count: {},
Time: {}",
+ event.getPluginId(), event.getRecordCount(), event.getEventTime());
+ // Handle writer resource cleanup logic
+ }
+
+ /**
+ * Send alert notifications
+ */
+ private void sendAlert(String title, String content) {
+ // Implement alert logic (e.g., calling HTTP APIs, sending emails,
etc.)
+ log.info("[Alert] {}: {}", title, content);
+ }
+}
+```
+
+
+### 3. Configure SPI Loading
+
+To enable the engine to automatically discover and load the custom handler,
add an SPI configuration file in the project's resource directory:
+
+1. Create the directory: `src/main/resources/META-INF/services/`
+2. Create a new file: `org.apache.seatunnel.api.event.EventHandler`
+3. Add the fully qualified class name of the custom handler to the file:
+ ```
+ com.example.CustomMultiEventHandler
+ ```
+
+
+### 4. Deployment and Verification
+- Place the JAR package containing the custom handler into the SeaTunnel
engine's classpath (e.g., the `lib/` directory)
+- After starting the task, when the corresponding event occurs, the handler
will be triggered automatically and execute the corresponding processing logic
+- Verify whether the handler works properly through log output
+
+
+### Notes
+- The handler logic should be as lightweight as possible to avoid blocking the
event processing thread
+- If network calls are required (e.g., sending alerts), it is recommended to
implement them in an asynchronous manner to prevent timeouts from affecting the
task itself
+- Different engines may have different levels of support for events; for
example, `JobStateEvent` currently only supports the Zeta engine
+- Event types and event classes are in a one-to-one correspondence; ensure
type matching during conversion to avoid `ClassCastException`
+- You can implement multiple event handlers to process different types of
events respectively, or handle multiple event types in a single handler
+
+Through the above steps, you can flexibly monitor and process various events
in SeaTunnel, and implement custom business logic such as status monitoring,
alert notifications, and data statistics.
\ No newline at end of file
diff --git a/docs/zh/concept/event-listener.md
b/docs/zh/concept/event-listener.md
index 69972cbfc5..7f33736ab0 100644
--- a/docs/zh/concept/event-listener.md
+++ b/docs/zh/concept/event-listener.md
@@ -19,6 +19,23 @@ SeaTunnel提供了丰富的事件监听器功能,用于管理数据同步时
- `org.apache.seatunnel.api.event.Event` - 事件数据的接口。
- `org.apache.seatunnel.api.event.EventType` - 事件数据的枚举值。
+#### EventType 枚举说明
+`EventType`枚举定义了系统中所有可能的事件类型,主要包括:
+
+| 事件类型 | 说明 | 关联事件类 |
+|--------------------------------|----------|-------------------------------|
+| `JOB_STATUS` | 作业状态变更事件 | `JobStateEvent` |
+| `SCHEMA_CHANGE_UPDATE_COLUMNS` | 表结构更新事件 | `AlterTableColumnsEvent` |
+| `SCHEMA_CHANGE_ADD_COLUMN` | 表添加列事件 | `AlterTableAddColumnEvent` |
+| `SCHEMA_CHANGE_DROP_COLUMN` | 表删除列事件 | `AlterTableDropColumnEvent` |
+| `SCHEMA_CHANGE_MODIFY_COLUMN` | 表修改列事件 | `AlterTableModifyColumnEvent` |
+| `READER_OPEN` | 读取器打开事件 | `ReaderOpenEvent` |
+| `READER_CLOSE` | 读取器关闭事件 | `ReaderCloseEvent` |
+| `WRITER_OPEN` | 写入器打开事件 | `WriterOpenEvent` |
+| `WRITER_CLOSE` | 写入器关闭事件 | `WriterCloseEvent` |
+
+> 注意:不同事件类型对应不同的事件数据结构,在自定义事件处理器时需通过`event.getEventType()`进行类型判断,以确保类型安全转换。
+
### Event Listener API
您可以自定义事件处理器,例如将事件发送到外部系统。
@@ -112,3 +129,164 @@ seatunnel:
### Spark 引擎
您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
+
+## 自定义事件处理器实现步骤
+
+下面以 `JobStateEvent` 为例,介绍如何实现一个自定义事件处理器,您可以根据需要扩展此方法以处理其他类型的事件。
+
+### 1. 添加依赖
+在项目 `pom.xml` 中引入必要依赖:
+```xml
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${seatunnel.version}</version>
+ <scope>provided</scope>
+</dependency>
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-common</artifactId>
+ <version>${seatunnel.version}</version>
+ <scope>provided</scope>
+</dependency>
+```
+> 注意:需将 `${seatunnel.version}` 替换为实际使用的 SeaTunnel 版本。
+
+
+### 2. 实现事件处理器
+自定义类实现 `org.apache.seatunnel.api.event.EventHandler` 接口,并重写 `handle`
方法,针对需要处理的事件类型进行业务逻辑处理。
+
+**核心逻辑**:通过 `event.getEventType()` 过滤事件类型——由于 SeaTunnel
引擎会分发多种类型的事件,需显式判断事件类型,以确保仅处理目标事件。
+
+```java
+import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventHandler;
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.api.event.schema.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.event.source.ReaderOpenEvent;
+import org.apache.seatunnel.api.event.sink.WriterCloseEvent;
+
+/**
+ * 自定义多类型事件处理器示例,包含多种事件的处理逻辑
+ */
+@Slf4j
+public class CustomMultiEventHandler implements EventHandler {
+
+ @Override
+ public void handle(Event event) {
+ // 根据事件类型进行不同处理
+ EventType eventType = event.getEventType();
+
+ switch (eventType) {
+ case JOB_STATUS:
+ handleJobStateEvent((JobStateEvent) event);
+ break;
+ case SCHEMA_CHANGE_ADD_COLUMN:
+ handleAddColumnEvent((AlterTableAddColumnEvent) event);
+ break;
+ case READER_OPEN:
+ handleReaderOpenEvent((ReaderOpenEvent) event);
+ break;
+ case WRITER_CLOSE:
+ handleWriterCloseEvent((WriterCloseEvent) event);
+ break;
+ // 可根据需要添加其他事件类型的处理
+ default:
+ // 忽略不处理的事件类型
+ log.debug("忽略未处理的事件类型: {}", eventType);
+ }
+ }
+
+ /**
+ * 处理作业状态事件
+ */
+ private void handleJobStateEvent(JobStateEvent jobEvent) {
+ String jobId = jobEvent.getJobId();
+ String jobName = jobEvent.getJobName();
+ JobStatus status = jobEvent.getJobStatus();
+ long eventTime = jobEvent.getCreatedTime();
+
+ switch (status) {
+ case FAILED:
+ log.error("任务失败 | jobId: {}, jobName: {}, 时间: {}",
+ jobId, jobName, eventTime);
+ // 添加失败告警逻辑
+ sendAlert("任务失败", "jobId: " + jobId);
+ break;
+ case FINISHED:
+ log.info("任务完成 | jobId: {}, jobName: {}, 时间: {}",
+ jobId, jobName, eventTime);
+ break;
+ // 处理其他状态...
+ default:
+ log.info("任务状态变更 | jobId: {}, 状态: {}, 时间: {}",
+ jobId, status, eventTime);
+ }
+ }
+
+ /**
+ * 处理表添加列事件
+ */
+ private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
+ log.info("表添加列 | 表名: {}, 新增列: {}, 时间: {}",
+ event.getTableName(), event.getAddedColumns(),
event.getEventTime());
+ // 处理表结构变更逻辑
+ }
+
+ /**
+ * 处理读取器打开事件
+ */
+ private void handleReaderOpenEvent(ReaderOpenEvent event) {
+ log.info("读取器打开 | 插件ID: {}, 并行度: {}, 时间: {}",
+ event.getPluginId(), event.getParallelism(), event.getEventTime());
+ // 处理读取器初始化逻辑
+ }
+
+ /**
+ * 处理写入器关闭事件
+ */
+ private void handleWriterCloseEvent(WriterCloseEvent event) {
+ log.info("写入器关闭 | 插件ID: {}, 处理记录数: {}, 时间: {}",
+ event.getPluginId(), event.getRecordCount(), event.getEventTime());
+ // 处理写入器资源清理逻辑
+ }
+
+ /**
+ * 发送告警通知
+ */
+ private void sendAlert(String title, String content) {
+ // 实现告警逻辑(如调用HTTP接口、发送邮件等)
+ log.info("[告警] {}: {}", title, content);
+ }
+}
+```
+
+
+### 3. 配置 SPI 加载
+为使引擎自动发现并加载自定义处理器,需在项目资源目录中添加 SPI 配置文件:
+
+1. 创建目录:`src/main/resources/META-INF/services/`
+2. 新建文件:`org.apache.seatunnel.api.event.EventHandler`
+3. 在文件中添加自定义处理器的全类名:
+ ```
+ com.example.CustomMultiEventHandler
+ ```
+
+
+### 4. 部署与验证
+- 将包含自定义处理器的 JAR 包放入 SeaTunnel 引擎的类路径(如 `lib/` 目录)
+- 启动任务后,当对应事件发生时,处理器会自动触发并执行相应的处理逻辑
+- 可通过日志输出验证处理器是否生效
+
+
+### 注意事项
+- 处理器逻辑应尽量轻量,避免阻塞事件处理线程
+- 若需网络调用(如发送告警),建议使用异步方式实现,防止超时影响任务本身
+- 不同引擎对事件的支持情况可能不同,例如 `JobStateEvent` 目前仅支持 Zeta 引擎
+- 事件类型与事件类是一一对应的,转换时需确保类型匹配,避免 `ClassCastException`
+- 可以根据业务需求,实现多个事件处理器分别处理不同类型的事件,也可以在一个处理器中处理多种事件类型
+
+通过上述步骤,您可以灵活地监听和处理 SeaTunnel 中的各种事件,实现自定义的业务逻辑,如状态监控、告警通知、数据统计等功能。
\ No newline at end of file
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
index edb1b72f36..26ae749f0b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
@@ -30,4 +30,5 @@ public enum EventType {
LIFECYCLE_READER_CLOSE,
LIFECYCLE_WRITER_CLOSE,
READER_MESSAGE_DELAYED,
+ JOB_STATUS,
}
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index bb7fe67a87..57a4ab9713 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -37,10 +37,10 @@ import
org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index f48ca3f181..1ac59209b2 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 67352f9d91..dde1feb918 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
index 76b4f6fc82..ab8d5e1663 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 1cb5333042..b01a045bcc 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -26,8 +26,8 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
index c53b3174b1..787c096745 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index b54911cd86..b03163c769 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
index 33a7be8914..57820b33d2 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
index 1d2421014c..cd9562e97f 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
index 5f8f64ecc5..e77c3a757e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index ea3940f5f9..e5d15d7346 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -22,12 +22,12 @@ import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 2421829ea9..035d3bc138 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -25,12 +25,12 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.client.util.ContentFormatUtil;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobCheckpointCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
index bed11b5dd7..1a05935765 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import lombok.extern.slf4j.Slf4j;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
index a03e77e466..0fcd072034 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.client.util;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
index 79a240e136..c411a41511 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
@@ -35,10 +35,10 @@ import
org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.ConnectorJarType;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
index 923bb31cd7..20c2be714a 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.client.util.ContentFormatUtil;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 0c8ecce14a..cc4b8c164e 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -31,10 +31,10 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import org.apache.commons.lang3.StringUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index 6607e3e458..a4f8ab4cc4 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -26,9 +26,9 @@ import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobResult.java
similarity index 95%
rename from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
rename to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobResult.java
index 5f946ffe45..34dc18830c 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobResult.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStateEvent.java
similarity index 55%
copy from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
copy to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStateEvent.java
index fbe9656ec6..fdb90b577f 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStateEvent.java
@@ -15,22 +15,34 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventType;
-import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
-@AllArgsConstructor
-@Data
-@NoArgsConstructor
-public final class JobStatusData implements Serializable {
- private Long jobId;
+@Getter
+@Setter
+@ToString
+public class JobStateEvent implements Event {
+
+ private String jobId;
private String jobName;
private JobStatus jobStatus;
- private long submitTime;
- private Long startTime;
- private Long finishTime;
+ private long createdTime;
+
+ public JobStateEvent(Long jobId, String jobName, JobStatus jobStatus) {
+ this.jobId = String.valueOf(jobId);
+ this.jobName = jobName;
+ this.jobStatus = jobStatus;
+ this.createdTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public EventType getEventType() {
+ return EventType.JOB_STATUS;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatus.java
similarity index 98%
rename from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
rename to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatus.java
index 53d1eb6aee..bd7d266a21 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatus.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
/** Possible states of a job once it has been accepted by the dispatcher. */
public enum JobStatus {
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatusData.java
similarity index 96%
rename from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
rename to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatusData.java
index fbe9656ec6..f88c4fb779 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatusData.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 52fba14205..264c4e95ce 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.core.job;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
/** Job interface define the Running job apis */
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 404db7a538..621c5fc91a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -36,12 +36,12 @@ import
org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 707c7c6f0d..db78e47eb1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -23,13 +23,13 @@ import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.job.Job;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 1b6490f31b..25aec1ea18 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -21,12 +21,13 @@ import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -191,11 +192,11 @@ public class PhysicalPlan {
}
public void cancelJob() {
- if (getJobStatus().isEndState()) {
+ JobStatus jobStatus = getJobStatus();
+ if (jobStatus.isEndState()) {
log.warn(
String.format(
- "%s is in end state %s, can not be cancel",
- jobFullName, getJobStatus()));
+ "%s is in end state %s, can not be cancel",
jobFullName, jobStatus));
return;
}
@@ -209,11 +210,11 @@ public class PhysicalPlan {
}
public void savepointJob() {
- if (getJobStatus().isEndState()) {
+ JobStatus jobStatus = getJobStatus();
+ if (jobStatus.isEndState()) {
log.warn(
String.format(
- "%s is in end state %s, can not do savepoint",
- jobFullName, getJobStatus()));
+ "%s is in end state %s, can not do savepoint",
jobFullName, jobStatus));
return;
}
updateJobState(JobStatus.DOING_SAVEPOINT);
@@ -318,7 +319,8 @@ public class PhysicalPlan {
log.warn(String.format("%s state process is stopped",
jobFullName));
return;
}
- switch (getJobStatus()) {
+ JobStatus jobStatus = getJobStatus();
+ switch (jobStatus) {
case CREATED:
updateJobState(JobStatus.SCHEDULED);
break;
@@ -347,10 +349,18 @@ public class PhysicalPlan {
case SAVEPOINT_DONE:
case FINISHED:
stopJobStateProcess();
- jobEndFuture.complete(new JobResult(getJobStatus(),
errorBySubPlan.get()));
+ jobEndFuture.complete(new JobResult(jobStatus,
errorBySubPlan.get()));
+ jobMaster
+ .getCoordinatorService()
+ .getEventProcessor()
+ .process(
+ new JobStateEvent(
+ jobImmutableInformation.getJobId(),
+
jobImmutableInformation.getJobConfig().getName(),
+ jobStatus));
return;
default:
- throw new IllegalArgumentException("Unknown Job State: " +
getJobStatus());
+ throw new IllegalArgumentException("Unknown Job State: " +
jobStatus);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index e158656712..c8610c538f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -24,11 +24,11 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 273749e5be..ee19e4dd85 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -38,6 +38,8 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
@@ -49,9 +51,8 @@ import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
@@ -1079,4 +1080,8 @@ public class JobMaster {
public EngineConfig getEngineConfig() {
return this.engineConfig;
}
+
+ public CoordinatorService getCoordinatorService() {
+ return this.seaTunnelServer.getCoordinatorService();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index c70ad6453a..ab724f4f4e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.engine.server.operation;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
index 8b2533ece5..280e0ccdfc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 05e55a9378..eedcb2da2b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -35,7 +36,6 @@ import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.DAGUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
index 6d9bf2f447..3b5abb8b70 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
@@ -32,6 +32,7 @@ import
org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.MDUtil;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -42,7 +43,6 @@ import org.apache.seatunnel.engine.core.job.ConnectorJar;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.ConnectorJarType;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 941c6e31fe..d7913b56ee 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.ReturnRetryTimesOperation;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
index 8dd46b9c77..b2e57bedf0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
@@ -20,12 +20,12 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
index 4893bd2c2b..643c486962 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.master.JobMaster;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 664bc04395..7a499b2046 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -23,9 +23,9 @@ import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFacto
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 3b054ffb6d..f0f4ca22c8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -23,8 +23,8 @@ import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.CheckpointService;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
index 99fcfa4ab9..af118329ae 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index 038127dbcb..51bf4960be 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.junit.jupiter.api.Assertions;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
new file mode 100644
index 0000000000..f072cb78b3
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.event;
+
+import org.apache.seatunnel.api.event.EventHandler;
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.seatunnel.engine.server.checkpoint.CheckpointErrorRestoreEndTest.STREAM_CONF_WITH_ERROR_PATH;
+import static org.awaitility.Awaitility.await;
+
+public class JobStateEventTest extends AbstractSeaTunnelServerTest {
+
+ @Test
+ public void testJobStateEvent() {
+
+ JobEventProcessor eventProcessor =
+ (JobEventProcessor)
server.getCoordinatorService().getEventProcessor();
+
+ AtomicInteger accessCounter = new AtomicInteger(0);
+ AtomicReference<JobStateEvent> jobStateEventReference = new
AtomicReference<>();
+ EventHandler eventHandler =
+ event -> {
+ if (event.getEventType() != EventType.JOB_STATUS) {
+ return;
+ }
+ JobStateEvent jobStateEvent = (JobStateEvent) event;
+ JobStatus status = jobStateEvent.getJobStatus();
+ switch (status) {
+ case FAILED:
+ case CANCELED:
+ case SAVEPOINT_DONE:
+ case FINISHED:
+ accessCounter.incrementAndGet();
+ jobStateEventReference.lazySet(jobStateEvent);
+ break;
+ default:
+ break;
+ }
+ };
+ // register the event handler
+ List<EventHandler> handlers =
+ (List<EventHandler>) ReflectionUtils.getField(eventProcessor,
"handlers").get();
+ handlers.add(eventHandler);
+ long jobId_finished = System.currentTimeMillis();
+ long currentTimeMillis = System.currentTimeMillis();
+ startJob(jobId_finished, "fake_to_console.conf", false);
+ await().atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+ server.getCoordinatorService()
+
.getJobStatus(jobId_finished)));
+ // check whether the event handler is executed
+ Assertions.assertEquals(1, accessCounter.get());
+ JobStateEvent jobStateEventFinished = jobStateEventReference.get();
+ Assertions.assertEquals(String.valueOf(jobId_finished),
jobStateEventFinished.getJobId());
+ Assertions.assertEquals(JobStatus.FINISHED,
jobStateEventFinished.getJobStatus());
+ Assertions.assertTrue(jobStateEventFinished.getCreatedTime() >
currentTimeMillis);
+ Assertions.assertEquals(String.valueOf(jobId_finished),
jobStateEventFinished.getJobName());
+
+ long jobId_failed = System.currentTimeMillis();
+ startJob(jobId_failed, STREAM_CONF_WITH_ERROR_PATH, false);
+ await().atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FAILED,
+
server.getCoordinatorService().getJobStatus(jobId_failed)));
+
+ Assertions.assertEquals(2, accessCounter.get());
+ JobStateEvent jobStateEventFailed = jobStateEventReference.get();
+ Assertions.assertEquals(String.valueOf(jobId_failed),
jobStateEventFailed.getJobId());
+ Assertions.assertEquals(JobStatus.FAILED,
jobStateEventFailed.getJobStatus());
+ Assertions.assertTrue(jobStateEventFailed.getCreatedTime() >
currentTimeMillis);
+ Assertions.assertEquals(String.valueOf(jobId_failed),
jobStateEventFailed.getJobName());
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
index 4744eb59b5..50e5fcb317 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
@@ -18,11 +18,11 @@
package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index b13bc3150e..53d2f377c1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -18,12 +18,12 @@
package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 8fcd93eb0c..fa818f6ff3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.CoordinatorService;