This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2475f1d568 [Feature][Core] Add REST API for viewing Pending queue 
details (#10078)
2475f1d568 is described below

commit 2475f1d568f6e6caef20f2119b4f3754e4c47b4f
Author: Jast <[email protected]>
AuthorDate: Tue Nov 25 23:09:25 2025 +0800

    [Feature][Core] Add REST API for viewing Pending queue details (#10078)
---
 docs/en/seatunnel-engine/rest-api-v2.md            | 155 +++++++++-
 docs/zh/seatunnel-engine/rest-api-v2.md            | 153 +++++++++-
 .../seatunnel/engine/e2e/PendingJobsRestIT.java    | 180 +++++++++++
 .../src/test/resources/pending_jobs_streaming.conf |  42 +++
 .../engine/server/CoordinatorService.java          |  83 ++++++
 .../seatunnel/engine/server/JettyService.java      |   4 +
 .../server/diagnostic/PendingClusterSnapshot.java  |  37 +++
 .../diagnostic/PendingDiagnosticsCollector.java    | 329 +++++++++++++++++++++
 .../server/diagnostic/PendingJobDiagnostic.java    |  53 ++++
 .../server/diagnostic/PendingJobsResponse.java     |  35 +++
 .../diagnostic/PendingPipelineDiagnostic.java      |  38 +++
 .../server/diagnostic/PendingQueueSummary.java     |  35 +++
 .../diagnostic/PendingTaskGroupDiagnostic.java     |  38 +++
 .../diagnostic/WorkerResourceDiagnostic.java       |  40 +++
 .../engine/server/execution/PendingJobInfo.java    |  35 +++
 .../opeartion/GetPendingJobsOperation.java         |  88 ++++++
 .../seatunnel/engine/server/rest/RestConstant.java |   4 +
 .../server/rest/service/PendingJobsService.java    |  45 +++
 .../server/rest/servlet/PendingJobsServlet.java    | 151 ++++++++++
 .../serializable/ResourceDataSerializerHook.java   |   5 +
 .../PendingDiagnosticsCollectorTest.java           | 114 +++++++
 21 files changed, 1662 insertions(+), 2 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index edaa8721ae..2ba5504af4 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -123,6 +123,159 @@ Please refer [security](security.md)
 
 
------------------------------------------------------------------------------------------
 
+### Returns Diagnostic Information For Pending Jobs
+
+<details>
+ <summary><code>GET</code> 
<code><b>/pending-jobs?jobId=123&limit=10</b></code> <code>(Inspect the pending 
queue, slot usage and blocking reasons.)</code></summary>
+
+#### Parameters
+
+> |   name   |   type   | data type | description                              
                                   |
+> 
|----------|----------|-----------|-----------------------------------------------------------------------------|
+> | jobId    | optional | long      | If set, only returns the diagnostics for 
the specified job. When both `jobId` and `limit` are provided, `jobId` takes 
precedence and `limit` is ignored. |
+> | limit    | optional | integer   | Limits the number of jobs returned. This 
parameter is ignored when `jobId` is provided. |
+> | pretty   | optional | boolean   | When `true`, pretty-print JSON and 
format timestamp fields.                 |
+
+#### Responses
+
+```json
+{
+  "queueSummary": {
+    "size": 2,
+    "scheduleStrategy": "WAIT",
+    "oldestEnqueueTimestamp": 1717500000000,
+    "newestEnqueueTimestamp": 1717500005000,
+    "lackingTaskGroups": 6
+  },
+  "clusterSnapshot": {
+    "totalSlots": 8,
+    "freeSlots": 1,
+    "assignedSlots": 7,
+    "workerCount": 2,
+    "workers": [
+      {
+        "address": "10.0.0.8:5801",
+        "tags": {
+          "zone": "az1"
+        },
+        "totalSlots": 4,
+        "freeSlots": 0,
+        "dynamicSlot": false,
+        "cpuUsage": 0.83,
+        "memUsage": 0.64,
+        "runningJobIds": [
+          1001,
+          1002
+        ]
+      }
+    ]
+  },
+  "pendingJobs": [
+    {
+      "jobId": 1003,
+      "jobName": "cdc_mysql_to_es",
+      "pendingSourceState": "SUBMIT",
+      "jobStatus": "PENDING",
+      "enqueueTimestamp": 1717500000000,
+      "checkTime": 1717500005000,
+      "waitDurationMs": 5000,
+      "checkCount": 3,
+      "totalTaskGroups": 16,
+      "allocatedTaskGroups": 10,
+      "lackingTaskGroups": 6,
+      "failureReason": "REQUEST_FAILED",
+      "failureMessage": "NoEnoughResourceException: can't apply resource 
request",
+      "tagFilter": {},
+      "blockingJobIds": [
+        1001
+      ],
+      "pipelines": [
+        {
+          "pipelineId": 1,
+          "pipelineName": "Job job-name, Pipeline: [(1/2)]",
+          "totalTaskGroups": 8,
+          "allocatedTaskGroups": 5,
+          "lackingTaskGroups": 3,
+          "taskGroupDiagnostics": [
+            {
+              "taskGroupLocation": {
+                "jobId": 1003,
+                "pipelineId": 1,
+                "taskGroupId": 1
+              },
+              "taskFullName": "Source[0]",
+              "allocated": false,
+              "failureReason": "REQUEST_FAILED",
+              "failureMessage": "NoEnoughResourceException: slot not enough"
+            }
+          ]
+        }
+      ],
+      "lackingTaskGroupDiagnostics": [
+        {
+          "taskGroupLocation": {
+            "jobId": 1003,
+            "pipelineId": 1,
+            "taskGroupId": 1
+          },
+          "taskFullName": "Source[0]",
+          "allocated": false,
+          "failureReason": "REQUEST_FAILED",
+          "failureMessage": "NoEnoughResourceException: slot not enough"
+        }
+      ]
+    }
+  ]
+}
+```
+
+When `pretty=true`, the endpoint returns a pretty-printed JSON response and 
formats `oldestEnqueueTimestamp`, `newestEnqueueTimestamp`, `enqueueTimestamp`, 
and `checkTime` as `yyyy-MM-dd HH:mm:ss`.
+
+This endpoint helps troubleshoot why jobs stay in `PENDING` by showing the 
pending queue order, aggregated resource view, and per task-group slot request 
failures (tag mismatch, worker busy, resource exhausted, etc.).
+
+**Pending Jobs Response Fields**
+
+- **queueSummary** – overview of the entire pending queue.
+  - `size`: number of jobs currently pending.
+  - `scheduleStrategy`: strategy in use (e.g. `WAIT`, `FAIL_FAST`) that 
dictates what happens when resources are insufficient.
+  - `oldestEnqueueTimestamp` / `newestEnqueueTimestamp`: timestamps (ms) of 
the oldest/latest job in the queue.
+  - `lackingTaskGroups`: total TaskGroup count still waiting for slots. 
**Note**: This value reflects only the jobs included in the current response 
(i.e., the subset limited by the `limit` parameter or filtered by `jobId`), not 
the entire pending queue. To view the complete statistics for all pending jobs, 
call this API without the `limit` parameter.
+- **clusterSnapshot** – cluster resource snapshot (can be filtered by tags).
+  - `totalSlots` / `assignedSlots` / `freeSlots`: total, allocated and 
remaining slots in the filtered view.
+  - `workerCount`: number of workers that match the tag filters.
+  - `workers[]`: per-worker details:
+    - `address`: host:port of the worker.
+    - `tags`: worker-level tags.
+    - `totalSlots` / `freeSlots`: slot capacity and available slot count on 
that worker.
+    - `dynamicSlot`: whether the worker uses dynamic slot allocation.
+    - `cpuUsage` / `memUsage`: sampled system load (only present when 
`slot-allocate-strategy` is `SYSTEM_LOAD`).
+    - `runningJobIds[]`: jobs currently occupying slots on that worker (helps 
identify blockers).
+- **pendingJobs[]** – diagnostics for each pending job.
+  - `jobId` / `jobName`: identifiers.
+  - `pendingSourceState`: whether the job comes from a new submission 
(`SUBMIT`) or master switch restore (`RESTORE`).
+  - `jobStatus`: status recorded in the physical plan (typically `PENDING`).
+  - `enqueueTimestamp`: when the job entered the pending queue.
+  - `checkTime`: timestamp of the latest diagnostic snapshot.
+  - `waitDurationMs`: `checkTime - enqueueTimestamp`.
+  - `checkCount`: how many times the scheduler has checked this job.
+  - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`: TaskGroup 
totals vs. assigned vs. lacking.
+  - `failureReason` / `failureMessage`: classified cause (e.g. 
`RESOURCE_NOT_ENOUGH`, `REQUEST_FAILED`) plus raw message.
+  - `tagFilter`: worker tag requirements declared by the job (if any).
+  - `blockingJobIds[]`: other jobs that currently occupy the required slots.
+  - `pipelines[]`: per-pipeline breakdown.
+    - `pipelineId` / `pipelineName`.
+    - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`.
+    - `taskGroupDiagnostics[]` (per TaskGroup slot request state):
+      - `taskGroupLocation` (`jobId`, `pipelineId`, `taskGroupId`).
+      - `taskFullName`: human-readable name (source/sink, etc.).
+      - `allocated`: whether the slot request succeeded.
+      - `failureReason` / `failureMessage`: task-level cause when allocation 
failed.
+  - `lackingTaskGroupDiagnostics[]`: flattened list of `allocated=false` 
TaskGroups for quick review.
+
+</details>
+
+------------------------------------------------------------------------------------------
+
 ### Return Details Of A Job
 
 <details>
@@ -956,4 +1109,4 @@ To get the metrics, you need to open `Telemetry` first, or 
you will get an empty
 
 More information about `Telemetry` can be found in the 
[Telemetry](telemetry.md) documentation.
 
-</details>
\ No newline at end of file
+</details>
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 97c7c6406c..9b72a38300 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -119,6 +119,157 @@ seatunnel:
 
 
------------------------------------------------------------------------------------------
 
+### 查看 Pending 队列详细信息
+
+<details>
+ <summary><code>GET</code> 
<code><b>/pending-jobs?jobId=123&limit=10</b></code> <code>(用于排查作业长时间处于 PENDING 
的原因。)</code></summary>
+
+#### 参数
+
+> | 参数名称 | 是否必传 | 参数类型 | 描述                             |
+> |----------|----------|----------|--------------------------------|
+> | jobId    | 可选     | long     | 只查看指定作业的诊断信息。当同时提供 `jobId` 和 `limit` 
时,`jobId` 优先生效,`limit` 将被忽略。 |
+> | limit    | 可选     | integer  | 限制返回的PENDING作业数量。当提供 `jobId` 参数时此参数将被忽略。 |
+> | pretty   | 可选     | boolean  | 传入 `true` 时返回格式化 JSON,并格式化时间戳。   |
+
+#### 响应
+
+```json
+{
+  "queueSummary": {
+    "size": 2,
+    "scheduleStrategy": "WAIT",
+    "oldestEnqueueTimestamp": 1717500000000,
+    "newestEnqueueTimestamp": 1717500005000,
+    "lackingTaskGroups": 6
+  },
+  "clusterSnapshot": {
+    "totalSlots": 8,
+    "freeSlots": 1,
+    "assignedSlots": 7,
+    "workerCount": 2,
+    "workers": [
+      {
+        "address": "10.0.0.8:5801",
+        "tags": {
+          "zone": "az1"
+        },
+        "totalSlots": 4,
+        "freeSlots": 0,
+        "dynamicSlot": false,
+        "cpuUsage": 0.83,
+        "memUsage": 0.64,
+        "runningJobIds": [
+          1001,
+          1002
+        ]
+      }
+    ]
+  },
+  "pendingJobs": [
+    {
+      "jobId": 1003,
+      "jobName": "cdc_mysql_to_es",
+      "pendingSourceState": "SUBMIT",
+      "jobStatus": "PENDING",
+      "enqueueTimestamp": 1717500000000,
+      "checkTime": 1717500005000,
+      "waitDurationMs": 5000,
+      "checkCount": 3,
+      "totalTaskGroups": 16,
+      "allocatedTaskGroups": 10,
+      "lackingTaskGroups": 6,
+      "failureReason": "REQUEST_FAILED",
+      "failureMessage": "NoEnoughResourceException: can't apply resource 
request",
+      "tagFilter": {},
+      "blockingJobIds": [
+        1001
+      ],
+      "pipelines": [
+        {
+          "pipelineId": 1,
+          "pipelineName": "Job job-name, Pipeline: [(1/2)]",
+          "totalTaskGroups": 8,
+          "allocatedTaskGroups": 5,
+          "lackingTaskGroups": 3,
+          "taskGroupDiagnostics": [
+            {
+              "taskGroupLocation": {
+                "jobId": 1003,
+                "pipelineId": 1,
+                "taskGroupId": 1
+              },
+              "taskFullName": "Source[0]",
+              "allocated": false,
+              "failureReason": "REQUEST_FAILED",
+              "failureMessage": "NoEnoughResourceException: slot not enough"
+            }
+          ]
+        }
+      ],
+      "lackingTaskGroupDiagnostics": [
+        {
+          "taskGroupLocation": {
+            "jobId": 1003,
+            "pipelineId": 1,
+            "taskGroupId": 1
+          },
+          "taskFullName": "Source[0]",
+          "allocated": false,
+          "failureReason": "REQUEST_FAILED",
+          "failureMessage": "NoEnoughResourceException: slot not enough"
+        }
+      ]
+    }
+  ]
+}
+```
+
+当 `pretty=true` 时,接口会返回格式化后的 JSON,并把 
`oldestEnqueueTimestamp`、`newestEnqueueTimestamp`、`enqueueTimestamp`、`checkTime`
 转为 `yyyy-MM-dd HH:mm:ss` 字符串,方便排查。
+
+响应中包含:
+
+- **queueSummary**:Pending 队列整体信息总结
+  - `size`:当前排队的 Job 数量。
+  - `scheduleStrategy`:调度策略,决定资源不足时的处理方式。
+  - `oldestEnqueueTimestamp` / `newestEnqueueTimestamp`:最久/最新进入 Pending 队列 Job 
的时间戳(毫秒)。
+  - `lackingTaskGroups`:尚未分配 Slot 的 TaskGroup 数量。**注意**:该值仅统计当前响应中返回的作业子集(即受 
`limit` 参数限制或 `jobId` 过滤后的作业),而非整个 Pending 队列的完整统计。如需查看所有 Pending 作业的完整统计信息,请不带 
`limit` 参数调用此接口。
+- **clusterSnapshot**:当前集群的资源视图。
+  - `totalSlots` / `assignedSlots` / `freeSlots`:Slot 总数、已分配数、剩余数。
+  - `workerCount`:Worker 数量。
+  - `workers[]`:
+    - `address`:Worker 地址(host:port)。
+    - `tags`:Worker 自带的标签。
+    - `totalSlots` / `freeSlots`:Worker 的 Slot 总数与剩余数。
+    - `dynamicSlot`:是否启用动态 Slot。
+    - `cpuUsage` / `memUsage`:系统负载采样(只有当 `slot-allocate-strategy: SYSTEM_LOAD` 
才会有该值)
+    - `runningJobIds[]`:当前占用 Worker Slot 的 JobId 列表。
+- **pendingJobs[]**:队列中的每个 Job 的诊断信息。
+  - `jobId` / `jobName`:作业标识。
+  - `pendingSourceState`:取值:`SUBMIT`,`RESTORE`。
+  - `jobStatus`:物理计划记录的状态(固定为 `PENDING`)。
+  - `enqueueTimestamp`:进入 Pending 队列的时间。
+  - `checkTime`:最近一次Pending检查时间。
+  - `waitDurationMs`:等待时长(`checkTime - enqueueTimestamp`)。
+  - `checkCount`:已被调度线程检查的次数。
+  - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`:Job 全部 
TaskGroup 数量、已分配 Slot 的数量、缺少 Slot 的数量。
+  - `failureReason` / `failureMessage`:导致本次资源申请失败的归类及具体信息(如 
`RESOURCE_NOT_ENOUGH`、`REQUEST_FAILED` 等)。
+  - `tagFilter`:Job 要求的 Worker 标签(若配置)。
+  - `blockingJobIds[]`:当前占用 Slot 的其他 JobId,用来分析资源竞争。
+  - `pipelines[]`:按 Pipeline 细分:
+    - `pipelineId` / `pipelineName`:
+    - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`:Pipeline 
里 TaskGroup 的总数、已分配 Slot 数量、缺少 Slot 的数量。
+    - `taskGroupDiagnostics[]`:每个 TaskGroup 的 Slot 请求状态:
+      - `taskGroupLocation`(`jobId`, `pipelineId`, `taskGroupId`)。
+      - `taskFullName`:方便直接定位 source/sink。
+      - `allocated`:是否已经成功申请 Slot。
+      - `failureReason` / `failureMessage`:TaskGroup 层面的失败原因。
+  - `lackingTaskGroupDiagnostics[]`:聚合所有 `allocated=false` 的 TaskGroup,方便快速查看缺 
Slot 的具体任务。
+
+</details>
+
+------------------------------------------------------------------------------------------
+
 ### 返回作业的详细信息
 
 <details>
@@ -945,4 +1096,4 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 
 更多关于`Telemetry`的信息可以在[Telemetry](telemetry.md)文档中找到。
 
-</details>
\ No newline at end of file
+</details>
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java
new file mode 100644
index 0000000000..4d63d871ad
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+
+@Slf4j
+public class PendingJobsRestIT {
+
+    private static final String HOST = "http://localhost:";;
+    private static final String JOB_FILE = "pending_jobs_streaming.conf";
+
+    private HazelcastInstanceImpl node;
+    private SeaTunnelClient engineClient;
+    private SeaTunnelConfig seaTunnelConfig;
+    private final List<ClientJobProxy> submittedJobs = new ArrayList<>();
+    private int httpPort;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        String testClusterName = TestUtils.getClusterName("PendingJobsRestIT");
+        seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+        
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+        seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(2);
+        
seaTunnelConfig.getEngineConfig().setScheduleStrategy(ScheduleStrategy.WAIT);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setEnabled(true);
+        
seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(false);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18082);
+        
seaTunnelConfig.getEngineConfig().getHttpConfig().setContextPath("/seatunnel");
+        httpPort = seaTunnelConfig.getEngineConfig().getHttpConfig().getPort();
+
+        node = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+        Common.setDeployMode(DeployMode.CLIENT);
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(testClusterName);
+        engineClient = new SeaTunnelClient(clientConfig);
+    }
+
+    @AfterEach
+    void tearDown() {
+        submittedJobs.forEach(
+                job -> {
+                    try {
+                        job.cancelJob();
+                    } catch (Exception e) {
+                        log.warn("Failed to cancel job {}: {}", 
job.getJobId(), e.getMessage());
+                    }
+                });
+        submittedJobs.clear();
+        if (engineClient != null) {
+            engineClient.close();
+        }
+        if (node != null) {
+            node.shutdown();
+        }
+    }
+
+    @Test
+    void testPendingJobsEndpoint() {
+        String jobName = "pending_waiting_job";
+        ClientJobProxy pendingJob = submitStreamingJob(jobName);
+        waitForStatus(pendingJob, JobStatus.PENDING);
+
+        assertPendingJobVisible(pendingJob.getJobId(), jobName, 
JobStatus.PENDING);
+    }
+
+    private ClientJobProxy submitStreamingJob(String jobName) {
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(jobName);
+        String filePath = TestUtils.getResource(JOB_FILE);
+        ClientJobExecutionEnvironment env =
+                engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
+        ClientJobProxy jobProxy;
+        try {
+            jobProxy = env.execute();
+        } catch (ExecutionException | InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Failed to submit job " + jobName, e);
+        }
+        submittedJobs.add(jobProxy);
+        return jobProxy;
+    }
+
+    private void waitForStatus(ClientJobProxy jobProxy, JobStatus 
expectedStatus) {
+        Awaitility.await()
+                .atMost(120, TimeUnit.SECONDS)
+                .until(() -> jobProxy.getJobStatus() == expectedStatus);
+    }
+
+    private void assertPendingJobVisible(
+            long pendingJobId, String expectedJobName, JobStatus 
expectedJobStatus) {
+        String baseUrl =
+                HOST
+                        + httpPort
+                        + 
seaTunnelConfig.getEngineConfig().getHttpConfig().getContextPath()
+                        + RestConstant.REST_URL_PENDING_JOBS;
+        Awaitility.await()
+                .atMost(60, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Response response =
+                                    given().get(baseUrl)
+                                            .then()
+                                            .statusCode(200)
+                                            .extract()
+                                            .response();
+                            List<Map<String, Object>> pendingJobs =
+                                    response.jsonPath().getList("pendingJobs");
+                            Assertions.assertNotNull(pendingJobs);
+                            Map<String, Object> job =
+                                    pendingJobs.stream()
+                                            .filter(
+                                                    pendingJob ->
+                                                            ((Number)
+                                                                               
     pendingJob.get(
+                                                                               
             RestConstant
+                                                                               
                     .JOB_ID))
+                                                                            
.longValue()
+                                                                    == 
pendingJobId)
+                                            .findFirst()
+                                            .orElseThrow(
+                                                    () ->
+                                                            new AssertionError(
+                                                                    "Pending 
job "
+                                                                            + 
pendingJobId
+                                                                            + 
" not found"));
+                            Assertions.assertEquals(
+                                    expectedJobName, 
job.get(RestConstant.JOB_NAME));
+                            Assertions.assertEquals(
+                                    expectedJobStatus.name(), 
job.get(RestConstant.JOB_STATUS));
+                        });
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/pending_jobs_streaming.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/pending_jobs_streaming.conf
new file mode 100644
index 0000000000..4317278dd2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/pending_jobs_streaming.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  job.mode = "STREAMING"
+}
+
+source {
+  FakeSource {
+    parallelism = 2
+    row.num = 1000000
+    split.read-interval = 100
+    schema = {
+      fields {
+        c_int = int
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    writer_sleep = true
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 505517f6c7..7c35bd7ec6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -47,6 +47,10 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import 
org.apache.seatunnel.engine.server.diagnostic.PendingDiagnosticsCollector;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobDiagnostic;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.diagnostic.PendingQueueSummary;
 import org.apache.seatunnel.engine.server.event.JobEventHttpReportHandler;
 import org.apache.seatunnel.engine.server.event.JobEventProcessor;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -82,7 +86,9 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
 import lombok.NonNull;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -261,6 +267,17 @@ public class CoordinatorService {
 
         boolean preApplyResources = jobMaster.preApplyResources();
         if (!preApplyResources) {
+            try {
+                PendingJobDiagnostic diagnostic =
+                        PendingDiagnosticsCollector.collectJobDiagnostic(
+                                pendingJobInfo, Collections.emptyMap(), 
getResourceManager());
+                pendingJobInfo.recordSnapshot(diagnostic);
+            } catch (Exception e) {
+                logger.warning(
+                        String.format(
+                                "Collect pending diagnostic for job %s failed: 
%s",
+                                jobId, ExceptionUtils.getMessage(e)));
+            }
             logger.info(
                     String.format(
                             "Current strategy is %s, and resources is not 
enough, skipping this schedule, JobID: %s",
@@ -1074,6 +1091,72 @@ public class CoordinatorService {
         return connectorPackageService;
     }
 
+    public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long 
jobId, int limit) {
+        Collection<PendingJobInfo> allPendingJobs =
+                new ArrayList<>(pendingJobQueue.getJobIdMap().values());
+
+        List<PendingJobInfo> selectedJobs = new ArrayList<>();
+        if (jobId != null) {
+            PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId);
+            if (pendingJobInfo != null) {
+                selectedJobs.add(pendingJobInfo);
+            }
+        } else {
+            selectedJobs.addAll(allPendingJobs);
+            
selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp));
+            if (limit > 0 && selectedJobs.size() > limit) {
+                selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit));
+            }
+        }
+
+        ResourceManager resourceManager = getResourceManager();
+        List<PendingJobDiagnostic> diagnostics = new ArrayList<>();
+        for (PendingJobInfo jobInfo : selectedJobs) {
+            PendingJobDiagnostic diagnostic = jobInfo.getLastSnapshot();
+            if (diagnostic == null) {
+                diagnostic =
+                        PendingDiagnosticsCollector.collectJobDiagnostic(
+                                jobInfo, tags, resourceManager);
+                if (diagnostic != null) {
+                    diagnostic.setCheckCount(jobInfo.getCheckTimes());
+                }
+            }
+            if (diagnostic != null) {
+                diagnostics.add(diagnostic);
+            }
+        }
+
+        PendingJobsResponse response = new PendingJobsResponse();
+        response.setPendingJobs(diagnostics);
+        response.setClusterSnapshot(
+                
PendingDiagnosticsCollector.collectClusterSnapshot(resourceManager, tags));
+        response.setQueueSummary(buildQueueSummary(allPendingJobs, 
diagnostics));
+        return response;
+    }
+
+    private PendingQueueSummary buildQueueSummary(
+            Collection<PendingJobInfo> pendingJobs, List<PendingJobDiagnostic> 
diagnostics) {
+        PendingQueueSummary summary = new PendingQueueSummary();
+        summary.setSize(pendingJobQueue.size());
+        summary.setScheduleStrategy(scheduleStrategy.name());
+        summary.setLackingTaskGroups(
+                
diagnostics.stream().mapToInt(PendingJobDiagnostic::getLackingTaskGroups).sum());
+
+        if (!pendingJobs.isEmpty()) {
+            summary.setOldestEnqueueTimestamp(
+                    pendingJobs.stream()
+                            .mapToLong(PendingJobInfo::getEnqueueTimestamp)
+                            .min()
+                            .orElse(0L));
+            summary.setNewestEnqueueTimestamp(
+                    pendingJobs.stream()
+                            .mapToLong(PendingJobInfo::getEnqueueTimestamp)
+                            .max()
+                            .orElse(0L));
+        }
+        return summary;
+    }
+
     public int getPendingJobCount() {
         return pendingJobQueue.getJobIdMap().size();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index 1d98f25843..8d06b827eb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.server.rest.servlet.FinishedJobsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.JobInfoServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.MetricsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.OverviewServlet;
+import org.apache.seatunnel.engine.server.rest.servlet.PendingJobsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.RunningJobsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
@@ -70,6 +71,7 @@ import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_METRICS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OPEN_METRICS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_PENDING_JOBS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOB;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOBS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_THREADS;
@@ -174,6 +176,7 @@ public class JettyService {
 
         ServletHolder overviewHolder = new ServletHolder(new 
OverviewServlet(nodeEngine));
         ServletHolder runningJobsHolder = new ServletHolder(new 
RunningJobsServlet(nodeEngine));
+        ServletHolder pendingJobsHolder = new ServletHolder(new 
PendingJobsServlet(nodeEngine));
         ServletHolder finishedJobsHolder = new ServletHolder(new 
FinishedJobsServlet(nodeEngine));
         ServletHolder systemMonitoringHolder =
                 new ServletHolder(new SystemMonitoringServlet(nodeEngine));
@@ -203,6 +206,7 @@ public class JettyService {
 
         context.addServlet(overviewHolder, 
convertUrlToPath(REST_URL_OVERVIEW));
         context.addServlet(runningJobsHolder, 
convertUrlToPath(REST_URL_RUNNING_JOBS));
+        context.addServlet(pendingJobsHolder, 
convertUrlToPath(REST_URL_PENDING_JOBS));
         context.addServlet(finishedJobsHolder, 
convertUrlToPath(REST_URL_FINISHED_JOBS));
         context.addServlet(
                 systemMonitoringHolder, 
convertUrlToPath(REST_URL_SYSTEM_MONITORING_INFORMATION));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingClusterSnapshot.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingClusterSnapshot.java
new file mode 100644
index 0000000000..c6272acb19
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingClusterSnapshot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingClusterSnapshot implements Serializable {
+    private int totalSlots;
+    private int freeSlots;
+    private int assignedSlots;
+    private int workerCount;
+    private List<WorkerResourceDiagnostic> workers = new ArrayList<>();
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java
new file mode 100644
index 0000000000..4044307477
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+@Slf4j
+public final class PendingDiagnosticsCollector {
+
+    private static final String REASON_WAITING = "WAITING_SLOT_ASSIGNMENT";
+    private static final String REASON_RESOURCE_NOT_ENOUGH = 
"RESOURCE_NOT_ENOUGH";
+    private static final String REASON_REQUEST_FAILED = "REQUEST_FAILED";
+    private static final String REASON_REQUEST_CANCELLED = "REQUEST_CANCELLED";
+
+    private PendingDiagnosticsCollector() {}
+
+    public static PendingJobDiagnostic collectJobDiagnostic(
+            PendingJobInfo pendingJobInfo,
+            Map<String, String> tagFilter,
+            ResourceManager resourceManager) {
+        if (pendingJobInfo == null) {
+            return null;
+        }
+        JobMaster jobMaster = pendingJobInfo.getJobMaster();
+        PendingJobDiagnostic diagnostic = new PendingJobDiagnostic();
+        diagnostic.setJobId(jobMaster.getJobId());
+        
diagnostic.setJobName(jobMaster.getJobImmutableInformation().getJobName());
+        
diagnostic.setPendingSourceState(pendingJobInfo.getPendingSourceState());
+        diagnostic.setJobStatus(jobMaster.getJobStatus());
+        diagnostic.setEnqueueTimestamp(pendingJobInfo.getEnqueueTimestamp());
+        diagnostic.setCheckTime(System.currentTimeMillis());
+        diagnostic.setWaitDurationMs(
+                diagnostic.getCheckTime() - 
pendingJobInfo.getEnqueueTimestamp());
+        diagnostic.setTagFilter(
+                tagFilter == null ? Collections.emptyMap() : new 
HashMap<>(tagFilter));
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures =
+                Optional.ofNullable(jobMaster.getPhysicalPlan())
+                        .map(PhysicalPlan::getPreApplyResourceFutures)
+                        .map(HashMap::new)
+                        .orElseGet(HashMap::new);
+
+        buildPipelineDiagnostics(jobMaster, requestFutures, diagnostic);
+        diagnostic.setTotalTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getTotalTaskGroups)
+                        .sum());
+        diagnostic.setAllocatedTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getAllocatedTaskGroups)
+                        .sum());
+        diagnostic.setLackingTaskGroups(
+                diagnostic.getPipelines().stream()
+                        
.mapToInt(PendingPipelineDiagnostic::getLackingTaskGroups)
+                        .sum());
+
+        updateFailureReason(diagnostic);
+        diagnostic.setBlockingJobIds(
+                collectBlockingJobs(resourceManager, jobMaster.getJobId(), 
tagFilter));
+
+        return diagnostic;
+    }
+
+    private static void buildPipelineDiagnostics(
+            JobMaster jobMaster,
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
requestFutures,
+            PendingJobDiagnostic diagnostic) {
+        PhysicalPlan plan = jobMaster.getPhysicalPlan();
+        if (plan == null) {
+            diagnostic.setFailureReason(REASON_WAITING);
+            diagnostic.setFailureMessage("Job master not initialized");
+            return;
+        }
+        for (SubPlan subPlan : plan.getPipelineList()) {
+            PendingPipelineDiagnostic pipelineDiagnostic = new 
PendingPipelineDiagnostic();
+            pipelineDiagnostic.setPipelineId(subPlan.getPipelineId());
+            pipelineDiagnostic.setPipelineName(subPlan.getPipelineFullName());
+
+            List<PhysicalVertex> vertices = new ArrayList<>();
+            vertices.addAll(subPlan.getCoordinatorVertexList());
+            vertices.addAll(subPlan.getPhysicalVertexList());
+
+            int allocated = 0;
+            int lacking = 0;
+            for (PhysicalVertex vertex : vertices) {
+                TaskGroupLocation location = vertex.getTaskGroupLocation();
+                PendingTaskGroupDiagnostic taskDiagnostic =
+                        buildTaskDiagnostic(
+                                location, vertex.getTaskFullName(), 
requestFutures.get(location));
+                
pipelineDiagnostic.getTaskGroupDiagnostics().add(taskDiagnostic);
+                if (taskDiagnostic.isAllocated()) {
+                    allocated++;
+                } else {
+                    lacking++;
+                    
diagnostic.getLackingTaskGroupDiagnostics().add(taskDiagnostic);
+                }
+            }
+
+            pipelineDiagnostic.setTotalTaskGroups(vertices.size());
+            pipelineDiagnostic.setAllocatedTaskGroups(allocated);
+            pipelineDiagnostic.setLackingTaskGroups(lacking);
+            diagnostic.getPipelines().add(pipelineDiagnostic);
+        }
+    }
+
+    private static PendingTaskGroupDiagnostic buildTaskDiagnostic(
+            TaskGroupLocation location,
+            String taskFullName,
+            CompletableFuture<SlotProfile> future) {
+        PendingTaskGroupDiagnostic diagnostic = new 
PendingTaskGroupDiagnostic();
+        diagnostic.setTaskGroupLocation(location);
+        diagnostic.setTaskFullName(taskFullName);
+
+        if (future == null) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+            diagnostic.setFailureMessage("Slot request future not created");
+            return diagnostic;
+        }
+
+        if (future.isCancelled()) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_REQUEST_CANCELLED);
+            diagnostic.setFailureMessage("Slot request cancelled by resource 
manager");
+            return diagnostic;
+        }
+
+        if (!future.isDone()) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_WAITING);
+            diagnostic.setFailureMessage("Slot request still pending");
+            return diagnostic;
+        }
+        try {
+            SlotProfile slotProfile = future.join();
+            if (slotProfile != null) {
+                diagnostic.setAllocated(true);
+                return diagnostic;
+            }
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+            diagnostic.setFailureMessage("No available slot profile");
+        } catch (CompletionException e) {
+            diagnostic.setAllocated(false);
+            diagnostic.setFailureReason(REASON_REQUEST_FAILED);
+            diagnostic.setFailureMessage(ExceptionUtils.getMessage(e));
+        }
+        return diagnostic;
+    }
+
+    private static void updateFailureReason(PendingJobDiagnostic diagnostic) {
+        if (diagnostic.getLackingTaskGroupDiagnostics().isEmpty()) {
+            if (diagnostic.getFailureReason() == null) {
+                diagnostic.setFailureReason(REASON_WAITING);
+                diagnostic.setFailureMessage("Job is waiting for scheduler to 
retry");
+            }
+            return;
+        }
+
+        Map<String, Long> reasonCounter =
+                diagnostic.getLackingTaskGroupDiagnostics().stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        
PendingTaskGroupDiagnostic::getFailureReason,
+                                        Collectors.counting()));
+        String dominantReason =
+                reasonCounter.entrySet().stream()
+                        .max(Map.Entry.comparingByValue())
+                        .map(Map.Entry::getKey)
+                        .orElse(REASON_RESOURCE_NOT_ENOUGH);
+        diagnostic.setFailureReason(dominantReason);
+        diagnostic.setFailureMessage(
+                diagnostic.getLackingTaskGroupDiagnostics().stream()
+                        .filter(diag -> 
dominantReason.equals(diag.getFailureReason()))
+                        .map(PendingTaskGroupDiagnostic::getFailureMessage)
+                        .filter(message -> message != null && 
!message.isEmpty())
+                        .distinct()
+                        .collect(Collectors.joining("; ")));
+    }
+
+    private static List<Long> collectBlockingJobs(
+            ResourceManager resourceManager, long jobId, Map<String, String> 
tagFilter) {
+        if (resourceManager == null) {
+            return Collections.emptyList();
+        }
+        Map<String, String> tags =
+                tagFilter == null ? Collections.emptyMap() : new 
HashMap<>(tagFilter);
+        List<SlotProfile> assignedSlots = Collections.emptyList();
+        try {
+            assignedSlots = resourceManager.getAssignedSlots(tags);
+        } catch (Exception e) {
+            log.warn("Collect assigned slots failed: {}", 
ExceptionUtils.getMessage(e));
+        }
+        Set<Long> blocking = new HashSet<>();
+        for (SlotProfile slotProfile : assignedSlots) {
+            long ownerId = slotProfile.getOwnerJobID();
+            if (ownerId > 0 && ownerId != jobId) {
+                blocking.add(ownerId);
+            }
+        }
+        return new ArrayList<>(blocking);
+    }
+
+    public static PendingClusterSnapshot collectClusterSnapshot(
+            ResourceManager resourceManager, Map<String, String> tagFilter) {
+        PendingClusterSnapshot snapshot = new PendingClusterSnapshot();
+        if (resourceManager == null) {
+            return snapshot;
+        }
+        Map<String, String> tags =
+                tagFilter == null ? Collections.emptyMap() : new 
HashMap<>(tagFilter);
+        List<SlotProfile> assignedSlots = Collections.emptyList();
+        List<SlotProfile> unassignedSlots = Collections.emptyList();
+        try {
+            assignedSlots = resourceManager.getAssignedSlots(tags);
+            unassignedSlots = resourceManager.getUnassignedSlots(tags);
+        } catch (Exception e) {
+            log.warn("Collect slots info failed: {}", 
ExceptionUtils.getMessage(e));
+        }
+        snapshot.setAssignedSlots(assignedSlots.size());
+        snapshot.setFreeSlots(unassignedSlots.size());
+        snapshot.setTotalSlots(assignedSlots.size() + unassignedSlots.size());
+        try {
+            snapshot.setWorkerCount(resourceManager.workerCount(tags));
+        } catch (Exception e) {
+            log.warn("Collect worker count failed: {}", 
ExceptionUtils.getMessage(e));
+        }
+        snapshot.setWorkers(buildWorkerSnapshots(resourceManager, tags));
+        return snapshot;
+    }
+
+    private static List<WorkerResourceDiagnostic> buildWorkerSnapshots(
+            ResourceManager resourceManager, Map<String, String> tagFilter) {
+        if (resourceManager == null) {
+            return Collections.emptyList();
+        }
+        Map<Address, WorkerProfile> registerWorker =
+                Optional.ofNullable(resourceManager.getRegisterWorker())
+                        .map(HashMap::new)
+                        .orElseGet(HashMap::new);
+        return registerWorker.values().stream()
+                .map(worker -> convertWorker(worker, tagFilter))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * TODO The current tagFilter does not actually filter. When the cluster 
is particularly large,
+     * tagFilter filtering should be supported, and it will be supported in 
the future
+     */
+    private static WorkerResourceDiagnostic convertWorker(
+            WorkerProfile workerProfile, Map<String, String> tagFilter) {
+        WorkerResourceDiagnostic diagnostic = new WorkerResourceDiagnostic();
+        if (workerProfile == null) {
+            return diagnostic;
+        }
+        Address address = workerProfile.getAddress();
+        diagnostic.setAddress(address == null ? "UNKNOWN" : 
address.toString());
+        if (workerProfile.getAttributes() != null) {
+            diagnostic.setTags(new HashMap<>(workerProfile.getAttributes()));
+        } else {
+            diagnostic.setTags(Collections.emptyMap());
+        }
+        diagnostic.setDynamicSlot(workerProfile.isDynamicSlot());
+        int assignedSlots =
+                workerProfile.getAssignedSlots() == null
+                        ? 0
+                        : workerProfile.getAssignedSlots().length;
+        int unassignedSlots =
+                workerProfile.getUnassignedSlots() == null
+                        ? 0
+                        : workerProfile.getUnassignedSlots().length;
+        diagnostic.setTotalSlots(assignedSlots + unassignedSlots);
+        diagnostic.setFreeSlots(unassignedSlots);
+        SystemLoadInfo systemLoadInfo = workerProfile.getSystemLoadInfo();
+        if (systemLoadInfo != null) {
+            diagnostic.setCpuUsage(systemLoadInfo.getCpuPercentage());
+            diagnostic.setMemUsage(systemLoadInfo.getMemPercentage());
+        }
+        if (workerProfile.getAssignedSlots() != null) {
+            List<Long> runningJobs =
+                    java.util.Arrays.stream(workerProfile.getAssignedSlots())
+                            .filter(slot -> slot != null && 
slot.getOwnerJobID() > 0)
+                            .map(SlotProfile::getOwnerJobID)
+                            .distinct()
+                            .collect(Collectors.toList());
+            diagnostic.setRunningJobIds(runningJobs);
+        }
+        return diagnostic;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobDiagnostic.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobDiagnostic.java
new file mode 100644
index 0000000000..be7e945d9e
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobDiagnostic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.execution.PendingSourceState;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingJobDiagnostic implements Serializable {
+    private long jobId;
+    private String jobName;
+    private PendingSourceState pendingSourceState;
+    private JobStatus jobStatus;
+    private long enqueueTimestamp;
+    private long checkTime;
+    private long waitDurationMs;
+    private int checkCount;
+    private int totalTaskGroups;
+    private int allocatedTaskGroups;
+    private int lackingTaskGroups;
+    private String failureReason;
+    private String failureMessage;
+    private Map<String, String> tagFilter;
+    private List<Long> blockingJobIds = new ArrayList<>();
+    private List<PendingPipelineDiagnostic> pipelines = new ArrayList<>();
+    private List<PendingTaskGroupDiagnostic> lackingTaskGroupDiagnostics = new 
ArrayList<>();
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobsResponse.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobsResponse.java
new file mode 100644
index 0000000000..a05fa4faaa
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobsResponse.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingJobsResponse implements Serializable {
+    private PendingQueueSummary queueSummary;
+    private PendingClusterSnapshot clusterSnapshot;
+    private List<PendingJobDiagnostic> pendingJobs = new ArrayList<>();
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingPipelineDiagnostic.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingPipelineDiagnostic.java
new file mode 100644
index 0000000000..4b3bc66c6b
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingPipelineDiagnostic.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingPipelineDiagnostic implements Serializable {
+    private int pipelineId;
+    private String pipelineName;
+    private int totalTaskGroups;
+    private int allocatedTaskGroups;
+    private int lackingTaskGroups;
+    private List<PendingTaskGroupDiagnostic> taskGroupDiagnostics = new 
ArrayList<>();
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingQueueSummary.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingQueueSummary.java
new file mode 100644
index 0000000000..61d6223463
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingQueueSummary.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingQueueSummary implements Serializable {
+    private int size;
+    private String scheduleStrategy;
+    private long oldestEnqueueTimestamp;
+    private long newestEnqueueTimestamp;
+    private int lackingTaskGroups;
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingTaskGroupDiagnostic.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingTaskGroupDiagnostic.java
new file mode 100644
index 0000000000..f3a0404114
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingTaskGroupDiagnostic.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingTaskGroupDiagnostic implements Serializable {
+
+    private TaskGroupLocation taskGroupLocation;
+    private String taskFullName;
+    private boolean allocated;
+    private String failureReason;
+    private String failureMessage;
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/WorkerResourceDiagnostic.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/WorkerResourceDiagnostic.java
new file mode 100644
index 0000000000..17905516bb
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/WorkerResourceDiagnostic.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkerResourceDiagnostic implements Serializable {
+    private String address;
+    private Map<String, String> tags;
+    private int totalSlots;
+    private int freeSlots;
+    private boolean dynamicSlot;
+    private Double cpuUsage;
+    private Double memUsage;
+    private List<Long> runningJobIds;
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
index c17ab3284e..a7d7476d1d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
@@ -18,15 +18,24 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobDiagnostic;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class PendingJobInfo {
     private final PendingSourceState pendingSourceState;
     private final JobMaster jobMaster;
+    private final long enqueueTimestamp;
+    private final AtomicInteger checkTimes = new AtomicInteger();
+    private volatile long lastCheckTime;
+    private volatile PendingJobDiagnostic lastSnapshot;
 
     public PendingJobInfo(PendingSourceState pendingSourceState, JobMaster 
jobMaster) {
         this.pendingSourceState = pendingSourceState;
         this.jobMaster = jobMaster;
+        this.enqueueTimestamp = System.currentTimeMillis();
+        this.lastCheckTime = enqueueTimestamp;
     }
 
     public PendingSourceState getPendingSourceState() {
@@ -40,4 +49,30 @@ public class PendingJobInfo {
     public Long getJobId() {
         return jobMaster.getJobId();
     }
+
+    public long getEnqueueTimestamp() {
+        return enqueueTimestamp;
+    }
+
+    public long getLastCheckTime() {
+        return lastCheckTime;
+    }
+
+    public int getCheckTimes() {
+        return checkTimes.get();
+    }
+
+    public PendingJobDiagnostic getLastSnapshot() {
+        return lastSnapshot;
+    }
+
+    public void recordSnapshot(PendingJobDiagnostic snapshot) {
+        if (snapshot == null) {
+            return;
+        }
+        this.lastSnapshot = snapshot;
+        this.lastCheckTime = snapshot.getCheckTime();
+        int current = this.checkTimes.incrementAndGet();
+        snapshot.setCheckCount(current);
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetPendingJobsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetPendingJobsOperation.java
new file mode 100644
index 0000000000..70f6e2d412
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetPendingJobsOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class GetPendingJobsOperation extends Operation implements 
IdentifiedDataSerializable {
+
+    private Map<String, String> tags;
+    private Long jobId;
+    private int limit;
+    private PendingJobsResponse response;
+
+    public GetPendingJobsOperation() {}
+
+    public GetPendingJobsOperation(Map<String, String> tags, Long jobId, int 
limit) {
+        this.tags = tags;
+        this.jobId = jobId;
+        this.limit = limit;
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        response = server.getCoordinatorService().getPendingJobs(tags, jobId, 
limit);
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.GET_PENDING_JOBS_TYPE;
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(tags);
+        out.writeObject(jobId);
+        out.writeInt(limit);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        tags = in.readObject();
+        jobId = in.readObject();
+        limit = in.readInt();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index f3e28e1329..8784f31d98 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -48,6 +48,7 @@ public class RestConstant {
     public static final String ERROR_MSG = "errorMsg";
 
     public static final String METRICS = "metrics";
+    public static final String LIMIT = "limit";
 
     public static final String TABLE_SOURCE_RECEIVED_COUNT = 
"TableSourceReceivedCount";
     public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
@@ -62,6 +63,8 @@ public class RestConstant {
     public static final String CONTEXT_PATH = "/hazelcast/rest/maps";
     public static final String INSTANCE_CONTEXT_PATH = 
"/hazelcast/rest/instance";
 
+    public static final String PRETTY = "pretty";
+
     // api path start
     public static final String REST_URL_OVERVIEW = "/overview";
     public static final String REST_URL_RUNNING_JOBS = "/running-jobs";
@@ -82,6 +85,7 @@ public class RestConstant {
     public static final String REST_URL_STOP_JOB = "/stop-job";
     public static final String REST_URL_STOP_JOBS = "/stop-jobs";
     public static final String REST_URL_UPDATE_TAGS = "/update-tags";
+    public static final String REST_URL_PENDING_JOBS = "/pending-jobs";
     // Get All Nodes Log
     public static final String REST_URL_LOGS = "/logs";
     // Get Current Node Log
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/PendingJobsService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/PendingJobsService.java
new file mode 100644
index 0000000000..c08894b0ee
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/PendingJobsService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetPendingJobsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.util.Map;
+
+public class PendingJobsService extends BaseService {
+
+    public PendingJobsService(NodeEngineImpl nodeEngine) {
+        super(nodeEngine);
+    }
+
+    public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long 
jobId, int limit) {
+        SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
+        if (seaTunnelServer == null) {
+            return (PendingJobsResponse)
+                    NodeEngineUtil.sendOperationToMasterNode(
+                                    nodeEngine, new 
GetPendingJobsOperation(tags, jobId, limit))
+                            .join();
+        }
+        return seaTunnelServer.getCoordinatorService().getPendingJobs(tags, 
jobId, limit);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java
new file mode 100644
index 0000000000..87d06f48d9
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+import org.apache.seatunnel.engine.server.rest.service.PendingJobsService;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class PendingJobsServlet extends BaseServlet {
+
+    private final PendingJobsService pendingJobsService;
+    private static final Set<String> TIMESTAMP_FIELDS =
+            new HashSet<>(
+                    Arrays.asList(
+                            "oldestEnqueueTimestamp",
+                            "newestEnqueueTimestamp",
+                            "enqueueTimestamp",
+                            "checkTime"));
+    private static final DateTimeFormatter PRETTY_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss").withZone(ZoneId.systemDefault());
+    private static final Gson PRETTY_GSON = new 
GsonBuilder().setPrettyPrinting().create();
+
+    public PendingJobsServlet(NodeEngineImpl nodeEngine) {
+        super(nodeEngine);
+        this.pendingJobsService = new PendingJobsService(nodeEngine);
+    }
+
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+            throws ServletException, IOException {
+
+        Map<String, String> params = new HashMap<>(getParameterMap(req));
+        Long jobId = null;
+        int limit = 0;
+        boolean pretty = false;
+        if (params.containsKey(RestConstant.JOB_ID)) {
+            try {
+                jobId = Long.parseLong(params.remove(RestConstant.JOB_ID));
+            } catch (NumberFormatException e) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid 
jobId");
+                return;
+            }
+        }
+
+        if (params.containsKey(RestConstant.LIMIT)) {
+            try {
+                limit = Integer.parseInt(params.remove(RestConstant.LIMIT));
+            } catch (NumberFormatException e) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid 
limit");
+                return;
+            }
+        }
+
+        if (params.containsKey(RestConstant.PRETTY)) {
+            pretty = Boolean.parseBoolean(params.remove(RestConstant.PRETTY));
+        }
+
+        PendingJobsResponse response = 
pendingJobsService.getPendingJobs(params, jobId, limit);
+        if (pretty) {
+            writePrettyResponse(resp, response);
+        } else {
+            writeJson(resp, response);
+        }
+    }
+
+    private void writePrettyResponse(HttpServletResponse resp, 
PendingJobsResponse response)
+            throws IOException {
+        JsonElement tree = PRETTY_GSON.toJsonTree(response);
+        formatTimestampFields(tree);
+        resp.setCharacterEncoding("UTF-8");
+        resp.setContentType("application/json; charset=UTF-8");
+        resp.getWriter().write(PRETTY_GSON.toJson(tree));
+    }
+
+    private void formatTimestampFields(JsonElement element) {
+        if (element == null || element.isJsonNull()) {
+            return;
+        }
+        if (element.isJsonObject()) {
+            JsonObject object = element.getAsJsonObject();
+            for (Map.Entry<String, JsonElement> entry : object.entrySet()) {
+                JsonElement value = entry.getValue();
+                if (shouldFormatTimestamp(entry.getKey(), value)) {
+                    long timestamp = value.getAsLong();
+                    object.addProperty(entry.getKey(), 
formatTimestamp(timestamp));
+                } else {
+                    formatTimestampFields(value);
+                }
+            }
+        } else if (element.isJsonArray()) {
+            JsonArray array = element.getAsJsonArray();
+            for (JsonElement child : array) {
+                formatTimestampFields(child);
+            }
+        }
+    }
+
+    private boolean shouldFormatTimestamp(String key, JsonElement element) {
+        if (!TIMESTAMP_FIELDS.contains(key) || element == null) {
+            return false;
+        }
+        if (!element.isJsonPrimitive()) {
+            return false;
+        }
+        JsonPrimitive primitive = element.getAsJsonPrimitive();
+        return primitive.isNumber();
+    }
+
+    private String formatTimestamp(long timestamp) {
+        return PRETTY_TIME_FORMATTER.format(Instant.ofEpochMilli(timestamp));
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 09f9a4550c..5e5c12d706 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
 
 import 
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation;
+import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetPendingJobsOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
@@ -53,6 +54,8 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
 
     public static final int REQUEST_SLOT_INFO_TYPE = 9;
 
+    public static final int GET_PENDING_JOBS_TYPE = 10;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
@@ -91,6 +94,8 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
                     return new SyncWorkerProfileOperation();
                 case REQUEST_SLOT_INFO_TYPE:
                     return new GetOverviewOperation();
+                case GET_PENDING_JOBS_TYPE:
+                    return new GetPendingJobsOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java
new file mode 100644
index 0000000000..1b9fd5e58a
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.PendingSourceState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PendingDiagnosticsCollectorTest {
+
+    @Test
+    public void testCollectJobDiagnosticWithFailures() {
+        JobMaster jobMaster = Mockito.mock(JobMaster.class);
+        Mockito.when(jobMaster.getJobId()).thenReturn(1000L);
+        JobImmutableInformation jobImmutableInformation =
+                Mockito.mock(JobImmutableInformation.class);
+        
Mockito.when(jobImmutableInformation.getJobName()).thenReturn("test_job");
+        
Mockito.when(jobMaster.getJobImmutableInformation()).thenReturn(jobImmutableInformation);
+        Mockito.when(jobMaster.getJobStatus()).thenReturn(JobStatus.PENDING);
+
+        PhysicalPlan physicalPlan = Mockito.mock(PhysicalPlan.class);
+        Mockito.when(jobMaster.getPhysicalPlan()).thenReturn(physicalPlan);
+
+        SubPlan subPlan = Mockito.mock(SubPlan.class);
+        Mockito.when(subPlan.getPipelineId()).thenReturn(1);
+        Mockito.when(subPlan.getPipelineFullName()).thenReturn("pipeline-1");
+
+        PhysicalVertex vertexSuccess = Mockito.mock(PhysicalVertex.class);
+        TaskGroupLocation locationSuccess = new TaskGroupLocation(1000L, 1, 
1L);
+        
Mockito.when(vertexSuccess.getTaskGroupLocation()).thenReturn(locationSuccess);
+        
Mockito.when(vertexSuccess.getTaskFullName()).thenReturn("task-success");
+
+        PhysicalVertex vertexFailA = Mockito.mock(PhysicalVertex.class);
+        TaskGroupLocation locationFailA = new TaskGroupLocation(1000L, 1, 2L);
+        
Mockito.when(vertexFailA.getTaskGroupLocation()).thenReturn(locationFailA);
+        Mockito.when(vertexFailA.getTaskFullName()).thenReturn("task-fail-a");
+
+        PhysicalVertex vertexFailB = Mockito.mock(PhysicalVertex.class);
+        TaskGroupLocation locationFailB = new TaskGroupLocation(1000L, 1, 3L);
+        
Mockito.when(vertexFailB.getTaskGroupLocation()).thenReturn(locationFailB);
+        Mockito.when(vertexFailB.getTaskFullName()).thenReturn("task-fail-b");
+
+        
Mockito.when(subPlan.getCoordinatorVertexList()).thenReturn(Collections.emptyList());
+        Mockito.when(subPlan.getPhysicalVertexList())
+                .thenReturn(Arrays.asList(vertexSuccess, vertexFailA, 
vertexFailB));
+        
Mockito.when(physicalPlan.getPipelineList()).thenReturn(Collections.singletonList(subPlan));
+
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new 
HashMap<>();
+        CompletableFuture<SlotProfile> successFuture =
+                
CompletableFuture.completedFuture(Mockito.mock(SlotProfile.class));
+        futures.put(locationSuccess, successFuture);
+
+        CompletableFuture<SlotProfile> failFutureA = new CompletableFuture<>();
+        failFutureA.completeExceptionally(new RuntimeException("no slot 
available"));
+        futures.put(locationFailA, failFutureA);
+
+        CompletableFuture<SlotProfile> failFutureB = new CompletableFuture<>();
+        failFutureB.completeExceptionally(new RuntimeException("worker busy"));
+        futures.put(locationFailB, failFutureB);
+
+        
Mockito.when(physicalPlan.getPreApplyResourceFutures()).thenReturn(futures);
+
+        PendingJobInfo pendingJobInfo = new 
PendingJobInfo(PendingSourceState.SUBMIT, jobMaster);
+
+        ResourceManager resourceManager = Mockito.mock(ResourceManager.class);
+        SlotProfile blockingSlot = Mockito.mock(SlotProfile.class);
+        Mockito.when(blockingSlot.getOwnerJobID()).thenReturn(2000L);
+        Mockito.when(resourceManager.getAssignedSlots(Mockito.anyMap()))
+                .thenReturn(Collections.singletonList(blockingSlot));
+
+        PendingJobDiagnostic diagnostic =
+                PendingDiagnosticsCollector.collectJobDiagnostic(
+                        pendingJobInfo, Collections.emptyMap(), 
resourceManager);
+
+        Assertions.assertEquals(2, diagnostic.getLackingTaskGroups());
+        Assertions.assertEquals("REQUEST_FAILED", 
diagnostic.getFailureReason());
+        Assertions.assertEquals(1, diagnostic.getBlockingJobIds().size());
+        Assertions.assertEquals(3, 
diagnostic.getPipelines().get(0).getTotalTaskGroups());
+        Assertions.assertEquals(2, 
diagnostic.getPipelines().get(0).getLackingTaskGroups());
+    }
+}


Reply via email to