This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e7de54e4e7 [Feature][Zeta] extended jobStateEvent event listening for 
zeta (#9689)
e7de54e4e7 is described below

commit e7de54e4e7af8fc2349a81a0fa8369713471b60d
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Aug 20 10:14:01 2025 +0800

    [Feature][Zeta] extended jobStateEvent event listening for zeta (#9689)
---
 docs/en/concept/event-listener.md                  | 183 ++++++++++++++++++++-
 docs/zh/concept/event-listener.md                  | 178 ++++++++++++++++++++
 .../org/apache/seatunnel/api/event/EventType.java  |   1 +
 .../seatunnel/command/ClientExecuteCommand.java    |   4 +-
 .../engine/e2e/ClusterFaultToleranceIT.java        |   2 +-
 .../e2e/ClusterFaultToleranceTwoPipelineIT.java    |   2 +-
 .../org/apache/seatunnel/engine/e2e/ClusterIT.java |   4 +-
 .../seatunnel/engine/e2e/JobExecutionIT.java       |   4 +-
 .../seatunnel/engine/e2e/MultiTableMetricsIT.java  |   2 +-
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java |   2 +-
 .../seatunnel/engine/e2e/SeaTunnelSlotIT.java      |   2 +-
 .../engine/e2e/SplitClusterFaultToleranceIT.java   |   2 +-
 .../apache/seatunnel/engine/e2e/TextHeaderIT.java  |   2 +-
 .../engine/client/job/ClientJobProxy.java          |   4 +-
 .../seatunnel/engine/client/job/JobClient.java     |   4 +-
 .../engine/client/job/JobStatusRunner.java         |   2 +-
 .../engine/client/util/ContentFormatUtil.java      |   2 +-
 .../engine/client/ConnectorPackageClientTest.java  |   2 +-
 .../engine/client/ContentFormatUtilTest.java       |   4 +-
 .../engine/client/SeaTunnelClientTest.java         |   2 +-
 .../client/SeaTunnelEngineClusterRoleTest.java     |   4 +-
 .../seatunnel/engine/common}/job/JobResult.java    |   2 +-
 .../engine/common/job/JobStateEvent.java}          |  38 +++--
 .../seatunnel/engine/common}/job/JobStatus.java    |   2 +-
 .../engine/common}/job/JobStatusData.java          |   2 +-
 .../org/apache/seatunnel/engine/core/job/Job.java  |   2 +
 .../engine/server/CoordinatorService.java          |   4 +-
 .../server/checkpoint/CheckpointManager.java       |   2 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  32 ++--
 .../engine/server/master/JobHistoryService.java    |   4 +-
 .../seatunnel/engine/server/master/JobMaster.java  |   9 +-
 .../server/operation/GetJobStatusOperation.java    |   2 +-
 .../opeartion/GetOverviewOperation.java            |   2 +-
 .../engine/server/rest/service/BaseService.java    |   2 +-
 .../engine/server/ConnectorPackageServiceTest.java |   2 +-
 .../engine/server/CoordinatorServiceTest.java      |   2 +-
 ...CoordinatorServiceWithCancelPendingJobTest.java |   2 +-
 .../checkpoint/CheckpointErrorRestoreEndTest.java  |   2 +-
 .../server/checkpoint/CheckpointManagerTest.java   |   2 +-
 .../server/checkpoint/CheckpointStorageTest.java   |   2 +-
 .../server/checkpoint/CheckpointTimeOutTest.java   |   2 +-
 .../engine/server/checkpoint/SavePointTest.java    |   2 +-
 .../engine/server/event/JobStateEventTest.java     | 105 ++++++++++++
 .../server/master/JobHistoryServiceTest.java       |   4 +-
 .../engine/server/master/JobMasterTest.java        |   4 +-
 .../engine/server/master/JobMetricsTest.java       |   2 +-
 46 files changed, 570 insertions(+), 76 deletions(-)

diff --git a/docs/en/concept/event-listener.md 
b/docs/en/concept/event-listener.md
index 7ba4550205..b567b8152c 100644
--- a/docs/en/concept/event-listener.md
+++ b/docs/en/concept/event-listener.md
@@ -21,9 +21,26 @@ The event API is defined in the 
`org.apache.seatunnel.api.event` package.
 - `org.apache.seatunnel.api.event.Event` - The interface for event data.
 - `org.apache.seatunnel.api.event.EventType` - The enum for event type.
 
+#### EventType Enumeration Description
+The `EventType` enumeration defines all possible event types in the system, 
mainly including:
+
+| Event Type                      | Description                     | 
Associated Event Class          |
+|---------------------------------|---------------------------------|---------------------------------|
+| `JOB_STATUS`                    | Job status change event         | 
`JobStateEvent`                 |
+| `SCHEMA_CHANGE_UPDATE_COLUMNS`  | Table structure update event    | 
`AlterTableColumnsEvent`        |
+| `SCHEMA_CHANGE_ADD_COLUMN`      | Table column addition event     | 
`AlterTableAddColumnEvent`      |
+| `SCHEMA_CHANGE_DROP_COLUMN`     | Table column deletion event     | 
`AlterTableDropColumnEvent`     |
+| `SCHEMA_CHANGE_MODIFY_COLUMN`   | Table column modification event | 
`AlterTableModifyColumnEvent`   |
+| `LIFECYCLE_READER_OPEN`         | Reader open event               | 
`ReaderOpenEvent`               |
+| `LIFECYCLE_READER_CLOSE`        | Reader close event              | 
`ReaderCloseEvent`              |
+| `WRITER_OPEN`                   | Writer open event               | 
`WriterOpenEvent`               |
+| `LIFECYCLE_WRITER_CLOSE`        | Writer close event              | 
`WriterCloseEvent`              |
+
+> Note: Different event types correspond to different event data structures. 
When writing a custom event handler, check the type through 
`event.getEventType()` before casting to ensure a type-safe conversion.
+
 ### Event Listener API
 
-You can customize event handler, such as sending events to external systems
+You can implement a custom event handler, for example to send events to external systems.
 
 - `org.apache.seatunnel.api.event.EventHandler` - The interface for event 
handler, SPI will automatically load subclass from the classpath.
 
@@ -114,3 +131,167 @@ Example: 
`org.apache.seatunnel.api.event.LoggingEventHandler`
 ### Spark Engine
 
 You can define the implementation class of 
`org.apache.seatunnel.api.event.EventHandler` interface and add to the 
classpath to automatically load it through SPI.
+
+## Steps to Implement a Custom Event Handler
+
+The following takes `JobStateEvent` as an example to illustrate how to 
implement a custom event handler. You can extend this approach to handle other 
types of events as needed.
+
+### 1. Add Dependencies
+
+Introduce the necessary dependencies in the project's `pom.xml`:
+```xml
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-api</artifactId>
+    <version>${seatunnel.version}</version>
+    <scope>provided</scope>
+</dependency>
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-engine-common</artifactId>
+    <version>${seatunnel.version}</version>
+    <scope>provided</scope>
+</dependency>
+```
+> Note: Replace `${seatunnel.version}` with the actual SeaTunnel version used.
+
+
+### 2. Implement the Event Handler
+
+Create a custom class that implements the 
`org.apache.seatunnel.api.event.EventHandler` interface, override the `handle` 
method, and implement business logic for the event types to be processed.
+
+**Core Logic**: Filter event types through `event.getEventType()` — since the 
SeaTunnel engine distributes various types of events, you need to explicitly 
check the event type to ensure only target events are processed.
+
+```java
+import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventHandler;
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.api.event.schema.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.event.source.ReaderOpenEvent;
+import org.apache.seatunnel.api.event.sink.WriterCloseEvent;
+
+/**
+ * Example of a custom multi-type event handler, including processing logic 
for multiple events
+ */
+@Slf4j
+public class CustomMultiEventHandler implements EventHandler {
+
+    @Override
+    public void handle(Event event) {
+        // Process differently based on event type
+        EventType eventType = event.getEventType();
+        
+        switch (eventType) {
+            case JOB_STATUS:
+                handleJobStateEvent((JobStateEvent) event);
+                break;
+            case SCHEMA_CHANGE_ADD_COLUMN:
+                handleAddColumnEvent((AlterTableAddColumnEvent) event);
+                break;
+            case LIFECYCLE_READER_OPEN:
+                handleReaderOpenEvent((ReaderOpenEvent) event);
+                break;
+            case LIFECYCLE_WRITER_CLOSE:
+                handleWriterCloseEvent((WriterCloseEvent) event);
+                break;
+            // Add processing for other event types as needed
+            default:
+                // Ignore unprocessed event types
+                log.debug("Ignoring unprocessed event type: {}", eventType);
+        }
+    }
+
+    /**
+     * Handle job state events
+     */
+    private void handleJobStateEvent(JobStateEvent jobEvent) {
+        String jobId = jobEvent.getJobId();
+        String jobName = jobEvent.getJobName();
+        JobStatus status = jobEvent.getJobStatus();
+        long eventTime = jobEvent.getCreatedTime();
+
+        switch (status) {
+            case FAILED:
+                log.error("Job failed | jobId: {}, jobName: {}, Time: {}", 
+                    jobId, jobName, eventTime);
+                // Add failure alert logic
+                sendAlert("Job Failure", "jobId: " + jobId);
+                break;
+            case FINISHED:
+                log.info("Job completed | jobId: {}, jobName: {}, Time: {}", 
+                    jobId, jobName, eventTime);
+                break;
+            // Handle other statuses...
+            default:
+                log.info("Job status changed | jobId: {}, Status: {}, Time: 
{}", 
+                    jobId, status, eventTime);
+        }
+    }
+
+    /**
+     * Handle table column addition events
+     */
+    private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
+        log.info("Column added to table | Table Name: {}, Added Columns: {}, 
Time: {}",
+            event.getTableName(), event.getAddedColumns(), 
event.getEventTime());
+        // Handle table structure change logic
+    }
+
+    /**
+     * Handle reader open events
+     */
+    private void handleReaderOpenEvent(ReaderOpenEvent event) {
+        log.info("Reader opened | Plugin ID: {}, Parallelism: {}, Time: {}",
+            event.getPluginId(), event.getParallelism(), event.getEventTime());
+        // Handle reader initialization logic
+    }
+
+    /**
+     * Handle writer close events
+     */
+    private void handleWriterCloseEvent(WriterCloseEvent event) {
+        log.info("Writer closed | Plugin ID: {}, Processed Record Count: {}, 
Time: {}",
+            event.getPluginId(), event.getRecordCount(), event.getEventTime());
+        // Handle writer resource cleanup logic
+    }
+
+    /**
+     * Send alert notifications
+     */
+    private void sendAlert(String title, String content) {
+        // Implement alert logic (e.g., calling HTTP APIs, sending emails, 
etc.)
+        log.info("[Alert] {}: {}", title, content);
+    }
+}
+```
+
+
+### 3. Configure SPI Loading
+
+To enable the engine to automatically discover and load the custom handler, 
add an SPI configuration file in the project's resource directory:
+
+1. Create the directory: `src/main/resources/META-INF/services/`
+2. Create a new file: `org.apache.seatunnel.api.event.EventHandler`
+3. Add the fully qualified class name of the custom handler to the file:
+   ```
+   com.example.CustomMultiEventHandler
+   ```
+
+
+### 4. Deployment and Verification
+- Place the JAR package containing the custom handler into the SeaTunnel 
engine's classpath (e.g., the `lib/` directory)
+- After starting the task, when the corresponding event occurs, the handler 
will be triggered automatically and execute the corresponding processing logic
+- Verify whether the handler works properly through log output
+
+
+### Notes
+- The handler logic should be as lightweight as possible to avoid blocking the 
event processing thread
+- If network calls are required (e.g., sending alerts), it is recommended to 
implement them in an asynchronous manner to prevent timeouts from affecting the 
task itself
+- Different engines may have different levels of support for events; for 
example, `JobStateEvent` is currently only supported by the Zeta engine
+- Event types and event classes are in a one-to-one correspondence; ensure 
type matching during conversion to avoid `ClassCastException`
+- You can implement multiple event handlers to process different types of 
events respectively, or handle multiple event types in a single handler
+
+Through the above steps, you can flexibly monitor and process various events 
in SeaTunnel, and implement custom business logic such as status monitoring, 
alert notifications, and data statistics.
\ No newline at end of file
diff --git a/docs/zh/concept/event-listener.md 
b/docs/zh/concept/event-listener.md
index 69972cbfc5..7f33736ab0 100644
--- a/docs/zh/concept/event-listener.md
+++ b/docs/zh/concept/event-listener.md
@@ -19,6 +19,23 @@ SeaTunnel提供了丰富的事件监听器功能,用于管理数据同步时
 - `org.apache.seatunnel.api.event.Event` - 事件数据的接口。
 - `org.apache.seatunnel.api.event.EventType` - 事件数据的枚举值。
 
+#### EventType 枚举说明
+`EventType`枚举定义了系统中所有可能的事件类型,主要包括:
+
+| 事件类型                           | 说明       | 关联事件类                         |
+|--------------------------------|----------|-------------------------------|
+| `JOB_STATUS`                   | 作业状态变更事件 | `JobStateEvent`               |
+| `SCHEMA_CHANGE_UPDATE_COLUMNS` | 表结构更新事件  | `AlterTableColumnsEvent`      |
+| `SCHEMA_CHANGE_ADD_COLUMN`     | 表添加列事件   | `AlterTableAddColumnEvent`    |
+| `SCHEMA_CHANGE_DROP_COLUMN`    | 表删除列事件   | `AlterTableDropColumnEvent`   |
+| `SCHEMA_CHANGE_MODIFY_COLUMN`  | 表修改列事件   | `AlterTableModifyColumnEvent` |
+| `LIFECYCLE_READER_OPEN`        | 读取器打开事件  | `ReaderOpenEvent`             |
+| `LIFECYCLE_READER_CLOSE`       | 读取器关闭事件  | `ReaderCloseEvent`            |
+| `WRITER_OPEN`                  | 写入器打开事件  | `WriterOpenEvent`             |
+| `LIFECYCLE_WRITER_CLOSE`       | 写入器关闭事件  | `WriterCloseEvent`            |
+
+> 注意:不同事件类型对应不同的事件数据结构,在自定义事件处理器时需通过`event.getEventType()`进行类型判断,以确保类型安全转换。
+
 ### Event Listener API
 
 您可以自定义事件处理器,例如将事件发送到外部系统。
@@ -112,3 +129,164 @@ seatunnel:
 ### Spark 引擎
 
 您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
+
+## 自定义事件处理器实现步骤
+
+下面以 `JobStateEvent` 为例,介绍如何实现一个自定义事件处理器,您可以根据需要扩展此方法以处理其他类型的事件。
+
+### 1. 添加依赖
+在项目 `pom.xml` 中引入必要依赖:
+```xml
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-api</artifactId>
+    <version>${seatunnel.version}</version>
+    <scope>provided</scope>
+</dependency>
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-engine-common</artifactId>
+    <version>${seatunnel.version}</version>
+    <scope>provided</scope>
+</dependency>
+```
+> 注意:需将 `${seatunnel.version}` 替换为实际使用的 SeaTunnel 版本。
+
+
+### 2. 实现事件处理器
+自定义类实现 `org.apache.seatunnel.api.event.EventHandler` 接口,并重写 `handle` 
方法,针对需要处理的事件类型进行业务逻辑处理。
+
+**核心逻辑**:通过 `event.getEventType()` 过滤事件类型——由于 SeaTunnel 
引擎会分发多种类型的事件,需显式判断事件类型,以确保仅处理目标事件。
+
+```java
+import lombok.extern.slf4j.Slf4j;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventHandler;
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.api.event.schema.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.event.source.ReaderOpenEvent;
+import org.apache.seatunnel.api.event.sink.WriterCloseEvent;
+
+/**
+ * 自定义多类型事件处理器示例,包含多种事件的处理逻辑
+ */
+@Slf4j
+public class CustomMultiEventHandler implements EventHandler {
+
+    @Override
+    public void handle(Event event) {
+        // 根据事件类型进行不同处理
+        EventType eventType = event.getEventType();
+        
+        switch (eventType) {
+            case JOB_STATUS:
+                handleJobStateEvent((JobStateEvent) event);
+                break;
+            case SCHEMA_CHANGE_ADD_COLUMN:
+                handleAddColumnEvent((AlterTableAddColumnEvent) event);
+                break;
+            case LIFECYCLE_READER_OPEN:
+                handleReaderOpenEvent((ReaderOpenEvent) event);
+                break;
+            case LIFECYCLE_WRITER_CLOSE:
+                handleWriterCloseEvent((WriterCloseEvent) event);
+                break;
+            // 可根据需要添加其他事件类型的处理
+            default:
+                // 忽略不处理的事件类型
+                log.debug("忽略未处理的事件类型: {}", eventType);
+        }
+    }
+
+    /**
+     * 处理作业状态事件
+     */
+    private void handleJobStateEvent(JobStateEvent jobEvent) {
+        String jobId = jobEvent.getJobId();
+        String jobName = jobEvent.getJobName();
+        JobStatus status = jobEvent.getJobStatus();
+        long eventTime = jobEvent.getCreatedTime();
+
+        switch (status) {
+            case FAILED:
+                log.error("任务失败 | jobId: {}, jobName: {}, 时间: {}", 
+                    jobId, jobName, eventTime);
+                // 添加失败告警逻辑
+                sendAlert("任务失败", "jobId: " + jobId);
+                break;
+            case FINISHED:
+                log.info("任务完成 | jobId: {}, jobName: {}, 时间: {}", 
+                    jobId, jobName, eventTime);
+                break;
+            // 处理其他状态...
+            default:
+                log.info("任务状态变更 | jobId: {}, 状态: {}, 时间: {}", 
+                    jobId, status, eventTime);
+        }
+    }
+
+    /**
+     * 处理表添加列事件
+     */
+    private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
+        log.info("表添加列 | 表名: {}, 新增列: {}, 时间: {}",
+            event.getTableName(), event.getAddedColumns(), 
event.getEventTime());
+        // 处理表结构变更逻辑
+    }
+
+    /**
+     * 处理读取器打开事件
+     */
+    private void handleReaderOpenEvent(ReaderOpenEvent event) {
+        log.info("读取器打开 | 插件ID: {}, 并行度: {}, 时间: {}",
+            event.getPluginId(), event.getParallelism(), event.getEventTime());
+        // 处理读取器初始化逻辑
+    }
+
+    /**
+     * 处理写入器关闭事件
+     */
+    private void handleWriterCloseEvent(WriterCloseEvent event) {
+        log.info("写入器关闭 | 插件ID: {}, 处理记录数: {}, 时间: {}",
+            event.getPluginId(), event.getRecordCount(), event.getEventTime());
+        // 处理写入器资源清理逻辑
+    }
+
+    /**
+     * 发送告警通知
+     */
+    private void sendAlert(String title, String content) {
+        // 实现告警逻辑(如调用HTTP接口、发送邮件等)
+        log.info("[告警] {}: {}", title, content);
+    }
+}
+```
+
+
+### 3. 配置 SPI 加载
+为使引擎自动发现并加载自定义处理器,需在项目资源目录中添加 SPI 配置文件:
+
+1. 创建目录:`src/main/resources/META-INF/services/`
+2. 新建文件:`org.apache.seatunnel.api.event.EventHandler`
+3. 在文件中添加自定义处理器的全类名:
+   ```
+   com.example.CustomMultiEventHandler
+   ```
+
+
+### 4. 部署与验证
+- 将包含自定义处理器的 JAR 包放入 SeaTunnel 引擎的类路径(如 `lib/` 目录)
+- 启动任务后,当对应事件发生时,处理器会自动触发并执行相应的处理逻辑
+- 可通过日志输出验证处理器是否生效
+
+
+### 注意事项
+- 处理器逻辑应尽量轻量,避免阻塞事件处理线程
+- 若需网络调用(如发送告警),建议使用异步方式实现,防止超时影响任务本身
+- 不同引擎对事件的支持情况可能不同,例如 `JobStateEvent` 目前仅支持 Zeta 引擎
+- 事件类型与事件类是一一对应的,转换时需确保类型匹配,避免 `ClassCastException`
+- 可以根据业务需求,实现多个事件处理器分别处理不同类型的事件,也可以在一个处理器中处理多种事件类型
+
+通过上述步骤,您可以灵活地监听和处理 SeaTunnel 中的各种事件,实现自定义的业务逻辑,如状态监控、告警通知、数据统计等功能。
\ No newline at end of file
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
index edb1b72f36..26ae749f0b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
@@ -30,4 +30,5 @@ public enum EventType {
     LIFECYCLE_READER_CLOSE,
     LIFECYCLE_WRITER_CLOSE,
     READER_MESSAGE_DELAYED,
+    JOB_STATUS,
 }
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index bb7fe67a87..57a4ab9713 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -37,10 +37,10 @@ import 
org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
 import org.apache.commons.lang3.StringUtils;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index f48ca3f181..1ac59209b2 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 67352f9d91..dde1feb918 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
index 76b4f6fc82..ab8d5e1663 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 1cb5333042..b01a045bcc 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -26,8 +26,8 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
index c53b3174b1..787c096745 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index b54911cd86..b03163c769 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
index 33a7be8914..57820b33d2 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
index 1d2421014c..cd9562e97f 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
index 5f8f64ecc5..e77c3a757e 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index ea3940f5f9..e5d15d7346 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -22,12 +22,12 @@ import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.Job;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 2421829ea9..035d3bc138 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -25,12 +25,12 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.client.util.ContentFormatUtil;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobCheckpointCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
index bed11b5dd7..1a05935765 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobStatusRunner.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 
 import lombok.extern.slf4j.Slf4j;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
index a03e77e466..0fcd072034 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/util/ContentFormatUtil.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.client.util;
 
-import org.apache.seatunnel.engine.core.job.JobStatusData;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
index 79a240e136..c411a41511 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
@@ -35,10 +35,10 @@ import 
org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.ConnectorJarType;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.apache.commons.lang3.StringUtils;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
index 923bb31cd7..20c2be714a 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ContentFormatUtilTest.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.client.util.ContentFormatUtil;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 0c8ecce14a..cc4b8c164e 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -31,10 +31,10 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
 import org.apache.commons.lang3.StringUtils;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index 6607e3e458..a4f8ab4cc4 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -26,9 +26,9 @@ import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobResult.java
similarity index 95%
rename from 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
rename to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobResult.java
index 5f946ffe45..34dc18830c 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobResult.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStateEvent.java
similarity index 55%
copy from 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
copy to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStateEvent.java
index fbe9656ec6..fdb90b577f 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStateEvent.java
@@ -15,22 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventType;
 
-import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
-@AllArgsConstructor
-@Data
-@NoArgsConstructor
-public final class JobStatusData implements Serializable {
-    private Long jobId;
+@Getter
+@Setter
+@ToString
+public class JobStateEvent implements Event {
+
+    private String jobId;
     private String jobName;
     private JobStatus jobStatus;
-    private long submitTime;
-    private Long startTime;
-    private Long finishTime;
+    private long createdTime;
+
+    public JobStateEvent(Long jobId, String jobName, JobStatus jobStatus) {
+        this.jobId = String.valueOf(jobId);
+        this.jobName = jobName;
+        this.jobStatus = jobStatus;
+        this.createdTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public EventType getEventType() {
+        return EventType.JOB_STATUS;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatus.java
similarity index 98%
rename from 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
rename to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatus.java
index 53d1eb6aee..bd7d266a21 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatus.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
 
 /** Possible states of a job once it has been accepted by the dispatcher. */
 public enum JobStatus {
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatusData.java
similarity index 96%
rename from 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
rename to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatusData.java
index fbe9656ec6..f88c4fb779 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatusData.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/job/JobStatusData.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.engine.common.job;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 52fba14205..264c4e95ce 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.core.job;
 
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 
 /** Job interface define the Running job apis */
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 404db7a538..621c5fc91a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -36,12 +36,12 @@ import 
org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
 import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 707c7c6f0d..db78e47eb1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -23,13 +23,13 @@ import org.apache.seatunnel.api.tracing.MDCTracer;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.job.Job;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 1b6490f31b..25aec1ea18 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -21,12 +21,13 @@ import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -191,11 +192,11 @@ public class PhysicalPlan {
     }
 
     public void cancelJob() {
-        if (getJobStatus().isEndState()) {
+        JobStatus jobStatus = getJobStatus();
+        if (jobStatus.isEndState()) {
             log.warn(
                     String.format(
-                            "%s is in end state %s, can not be cancel",
-                            jobFullName, getJobStatus()));
+                            "%s is in end state %s, can not be cancel", jobFullName, jobStatus));
             return;
         }
 
@@ -209,11 +210,11 @@ public class PhysicalPlan {
     }
 
     public void savepointJob() {
-        if (getJobStatus().isEndState()) {
+        JobStatus jobStatus = getJobStatus();
+        if (jobStatus.isEndState()) {
             log.warn(
                     String.format(
-                            "%s is in end state %s, can not do savepoint",
-                            jobFullName, getJobStatus()));
+                            "%s is in end state %s, can not do savepoint", jobFullName, jobStatus));
             return;
         }
         updateJobState(JobStatus.DOING_SAVEPOINT);
@@ -318,7 +319,8 @@ public class PhysicalPlan {
             log.warn(String.format("%s state process is stopped", jobFullName));
             return;
         }
-        switch (getJobStatus()) {
+        JobStatus jobStatus = getJobStatus();
+        switch (jobStatus) {
             case CREATED:
                 updateJobState(JobStatus.SCHEDULED);
                 break;
@@ -347,10 +349,18 @@ public class PhysicalPlan {
             case SAVEPOINT_DONE:
             case FINISHED:
                 stopJobStateProcess();
-                jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
+                jobEndFuture.complete(new JobResult(jobStatus, errorBySubPlan.get()));
+                jobMaster
+                        .getCoordinatorService()
+                        .getEventProcessor()
+                        .process(
+                                new JobStateEvent(
+                                        jobImmutableInformation.getJobId(),
+                                        jobImmutableInformation.getJobConfig().getName(),
+                                        jobStatus));
                 return;
             default:
-                throw new IllegalArgumentException("Unknown Job State: " + getJobStatus());
+                throw new IllegalArgumentException("Unknown Job State: " + jobStatus);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index e158656712..c8610c538f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -24,11 +24,11 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
 import org.apache.seatunnel.engine.core.job.ExecutionAddress;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 273749e5be..ee19e4dd85 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -38,6 +38,8 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
@@ -49,9 +51,8 @@ import org.apache.seatunnel.engine.core.job.ExecutionAddress;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.CoordinatorService;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
@@ -1079,4 +1080,8 @@ public class JobMaster {
     public EngineConfig getEngineConfig() {
         return this.engineConfig;
     }
+
+    public CoordinatorService getCoordinatorService() {
+        return this.seaTunnelServer.getCoordinatorService();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index c70ad6453a..ab724f4f4e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
index 8b2533ece5..280e0ccdfc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetOverviewOperation.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
 
 import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 05e55a9378..eedcb2da2b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -35,7 +36,6 @@ import org.apache.seatunnel.engine.core.job.ExecutionAddress;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.CoordinatorService;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.dag.DAGUtils;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
index 6d9bf2f447..3b5abb8b70 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
@@ -32,6 +32,7 @@ import 
org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.MDUtil;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -42,7 +43,6 @@ import org.apache.seatunnel.engine.core.job.ConnectorJar;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.ConnectorJarType;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 941c6e31fe..d7913b56ee 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.ReturnRetryTimesOperation;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
index 8dd46b9c77..b2e57bedf0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java
@@ -20,12 +20,12 @@ package org.apache.seatunnel.engine.server;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
index 4893bd2c2b..643c486962 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 664bc04395..7a499b2046 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -23,9 +23,9 @@ import 
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFacto
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.FactoryUtil;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 3b054ffb6d..f0f4ca22c8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -23,8 +23,8 @@ import 
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.CheckpointService;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
index 99fcfa4ab9..af118329ae 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index 038127dbcb..51bf4960be 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 
 import org.junit.jupiter.api.Assertions;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
new file mode 100644
index 0000000000..f072cb78b3
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.event;
+
+import org.apache.seatunnel.api.event.EventHandler;
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.engine.common.job.JobStateEvent;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.seatunnel.engine.server.checkpoint.CheckpointErrorRestoreEndTest.STREAM_CONF_WITH_ERROR_PATH;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies that an {@link EventHandler} attached to the coordinator's {@link JobEventProcessor}
+ * receives a {@link JobStateEvent} when a job reaches a terminal status.
+ */
+public class JobStateEventTest extends AbstractSeaTunnelServerTest {
+
+    /** Upper bound, in seconds, for a job to reach its terminal status and its event to arrive. */
+    private static final long WAIT_SECONDS = 60;
+
+    /**
+     * Runs one job that is expected to finish and one that is expected to fail, and checks that
+     * each produces exactly one terminal-status event carrying the matching job id, job name and
+     * status.
+     */
+    @Test
+    public void testJobStateEvent() {
+        JobEventProcessor eventProcessor =
+                (JobEventProcessor) server.getCoordinatorService().getEventProcessor();
+
+        AtomicInteger terminalEventCount = new AtomicInteger(0);
+        AtomicReference<JobStateEvent> lastTerminalEvent = new AtomicReference<>();
+        EventHandler eventHandler =
+                event -> {
+                    if (event.getEventType() != EventType.JOB_STATUS) {
+                        return;
+                    }
+                    JobStateEvent jobStateEvent = (JobStateEvent) event;
+                    switch (jobStateEvent.getJobStatus()) {
+                        case FAILED:
+                        case CANCELED:
+                        case SAVEPOINT_DONE:
+                        case FINISHED:
+                            // Publish the event BEFORE bumping the counter (and with a full
+                            // volatile write rather than lazySet) so that any thread observing
+                            // the new count is guaranteed to also observe this event.
+                            lastTerminalEvent.set(jobStateEvent);
+                            terminalEventCount.incrementAndGet();
+                            break;
+                        default:
+                            break;
+                    }
+                };
+
+        // Register the handler by appending it to the processor's "handlers" field, which is
+        // read via reflection. The field is expected to hold a List<EventHandler>; the cast
+        // cannot be checked at runtime because of erasure.
+        @SuppressWarnings("unchecked")
+        List<EventHandler> handlers =
+                (List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
+        handlers.add(eventHandler);
+
+        long testStartMillis = System.currentTimeMillis();
+
+        long finishedJobId = System.currentTimeMillis();
+        startJob(finishedJobId, "fake_to_console.conf", false);
+        awaitStatusAndEventCount(finishedJobId, JobStatus.FINISHED, terminalEventCount, 1);
+        assertTerminalEvent(
+                lastTerminalEvent.get(), finishedJobId, JobStatus.FINISHED, testStartMillis);
+
+        long failedJobId = System.currentTimeMillis();
+        startJob(failedJobId, STREAM_CONF_WITH_ERROR_PATH, false);
+        awaitStatusAndEventCount(failedJobId, JobStatus.FAILED, terminalEventCount, 2);
+        assertTerminalEvent(
+                lastTerminalEvent.get(), failedJobId, JobStatus.FAILED, testStartMillis);
+    }
+
+    /**
+     * Waits until the coordinator reports {@code expectedStatus} for the job and the handler has
+     * recorded {@code expectedEventCount} terminal events in total.
+     *
+     * <p>Both conditions are polled together so the test does not rely on the handler having
+     * already run at the moment the job status becomes observable.
+     *
+     * @param jobId id of the job to watch
+     * @param expectedStatus terminal status the job must reach
+     * @param eventCounter counter incremented by the registered handler
+     * @param expectedEventCount cumulative number of terminal events expected so far
+     */
+    private void awaitStatusAndEventCount(
+            long jobId, JobStatus expectedStatus, AtomicInteger eventCounter, int expectedEventCount) {
+        await().atMost(WAIT_SECONDS, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    expectedStatus,
+                                    server.getCoordinatorService().getJobStatus(jobId));
+                            Assertions.assertEquals(expectedEventCount, eventCounter.get());
+                        });
+    }
+
+    /**
+     * Asserts the payload of a captured terminal event.
+     *
+     * @param event the event captured by the handler; must not be {@code null}
+     * @param jobId id the job was started with; the test expects it to be reported, as a string,
+     *     for both the job id and the job name
+     * @param expectedStatus status the event must carry
+     * @param createdAfterMillis timestamp the event's created time must be strictly greater than
+     */
+    private static void assertTerminalEvent(
+            JobStateEvent event, long jobId, JobStatus expectedStatus, long createdAfterMillis) {
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals(String.valueOf(jobId), event.getJobId());
+        Assertions.assertEquals(expectedStatus, event.getJobStatus());
+        Assertions.assertTrue(event.getCreatedTime() > createdAfterMillis);
+        Assertions.assertEquals(String.valueOf(jobId), event.getJobName());
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
index 4744eb59b5..50e5fcb317 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
@@ -18,11 +18,11 @@
 package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatusData;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.JobStatusData;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index b13bc3150e..53d2f377c1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -18,12 +18,12 @@
 package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
-import org.apache.seatunnel.engine.core.job.JobResult;
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 8fcd93eb0c..fa818f6ff3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.common.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.CoordinatorService;
 

Reply via email to