This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2475f1d568 [Feature][Core] Add REST API for viewing Pending queue details (#10078)
2475f1d568 is described below
commit 2475f1d568f6e6caef20f2119b4f3754e4c47b4f
Author: Jast <[email protected]>
AuthorDate: Tue Nov 25 23:09:25 2025 +0800
[Feature][Core] Add REST API for viewing Pending queue details (#10078)
---
docs/en/seatunnel-engine/rest-api-v2.md | 155 +++++++++-
docs/zh/seatunnel-engine/rest-api-v2.md | 153 +++++++++-
.../seatunnel/engine/e2e/PendingJobsRestIT.java | 180 +++++++++++
.../src/test/resources/pending_jobs_streaming.conf | 42 +++
.../engine/server/CoordinatorService.java | 83 ++++++
.../seatunnel/engine/server/JettyService.java | 4 +
.../server/diagnostic/PendingClusterSnapshot.java | 37 +++
.../diagnostic/PendingDiagnosticsCollector.java | 329 +++++++++++++++++++++
.../server/diagnostic/PendingJobDiagnostic.java | 53 ++++
.../server/diagnostic/PendingJobsResponse.java | 35 +++
.../diagnostic/PendingPipelineDiagnostic.java | 38 +++
.../server/diagnostic/PendingQueueSummary.java | 35 +++
.../diagnostic/PendingTaskGroupDiagnostic.java | 38 +++
.../diagnostic/WorkerResourceDiagnostic.java | 40 +++
.../engine/server/execution/PendingJobInfo.java | 35 +++
.../opeartion/GetPendingJobsOperation.java | 88 ++++++
.../seatunnel/engine/server/rest/RestConstant.java | 4 +
.../server/rest/service/PendingJobsService.java | 45 +++
.../server/rest/servlet/PendingJobsServlet.java | 151 ++++++++++
.../serializable/ResourceDataSerializerHook.java | 5 +
.../PendingDiagnosticsCollectorTest.java | 114 +++++++
21 files changed, 1662 insertions(+), 2 deletions(-)
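
For readers skimming this commit, the selection rule described in the new `/pending-jobs` documentation (`jobId` takes precedence over `limit`; otherwise the queue is ordered by enqueue time and truncated) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from the commit: `PendingSelectionSketch`, `QueuedJob`, and `select` are hypothetical names, and the real logic operates on `PendingJobInfo` objects inside `CoordinatorService`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the job selection step behind GET /pending-jobs.
public class PendingSelectionSketch {

    // Hypothetical, simplified view of a queued job: an id and an enqueue time.
    static final class QueuedJob {
        final long jobId;
        final long enqueueTimestamp;

        QueuedJob(long jobId, long enqueueTimestamp) {
            this.jobId = jobId;
            this.enqueueTimestamp = enqueueTimestamp;
        }
    }

    // Returns the ids of the jobs that would be reported.
    // jobId != null: only that job (limit is ignored); empty if it is not queued.
    // jobId == null: all jobs, oldest first, cut to `limit` when limit > 0.
    static List<Long> select(List<QueuedJob> queue, Long jobId, int limit) {
        List<Long> selected = new ArrayList<>();
        if (jobId != null) {
            for (QueuedJob job : queue) {
                if (job.jobId == jobId) {
                    selected.add(job.jobId);
                }
            }
            return selected;
        }
        List<QueuedJob> sorted = new ArrayList<>(queue);
        sorted.sort(Comparator.comparingLong((QueuedJob job) -> job.enqueueTimestamp));
        for (QueuedJob job : sorted) {
            if (limit > 0 && selected.size() >= limit) {
                break;
            }
            selected.add(job.jobId);
        }
        return selected;
    }

    public static void main(String[] args) {
        List<QueuedJob> queue = new ArrayList<>();
        queue.add(new QueuedJob(1004L, 1717500005000L));
        queue.add(new QueuedJob(1003L, 1717500000000L));
        System.out.println(select(queue, null, 1)); // prints [1003]
        System.out.println(select(queue, 1004L, 1)); // prints [1004]
    }
}
```

A non-positive `limit` is treated as "no limit" in this sketch, which mirrors the `limit > 0` guard visible in the `CoordinatorService` hunk below.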
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md
index edaa8721ae..2ba5504af4 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -123,6 +123,159 @@ Please refer [security](security.md)
------------------------------------------------------------------------------------------
+### Returns Diagnostic Information For Pending Jobs
+
+<details>
+ <summary><code>GET</code> <code><b>/pending-jobs?jobId=123&limit=10</b></code> <code>(Inspect the pending queue, slot usage and blocking reasons.)</code></summary>
+
+#### Parameters
+
+> | name   | type     | data type | description |
+> |--------|----------|-----------|-------------|
+> | jobId  | optional | long      | If set, only returns the diagnostics for the specified job. When both `jobId` and `limit` are provided, `jobId` takes precedence and `limit` is ignored. |
+> | limit  | optional | integer   | Limits the number of jobs returned. This parameter is ignored when `jobId` is provided. |
+> | pretty | optional | boolean   | When `true`, pretty-print JSON and format timestamp fields. |
+
+#### Responses
+
+```json
+{
+ "queueSummary": {
+ "size": 2,
+ "scheduleStrategy": "WAIT",
+ "oldestEnqueueTimestamp": 1717500000000,
+ "newestEnqueueTimestamp": 1717500005000,
+ "lackingTaskGroups": 6
+ },
+ "clusterSnapshot": {
+ "totalSlots": 8,
+ "freeSlots": 1,
+ "assignedSlots": 7,
+ "workerCount": 2,
+ "workers": [
+ {
+ "address": "10.0.0.8:5801",
+ "tags": {
+ "zone": "az1"
+ },
+ "totalSlots": 4,
+ "freeSlots": 0,
+ "dynamicSlot": false,
+ "cpuUsage": 0.83,
+ "memUsage": 0.64,
+ "runningJobIds": [
+ 1001,
+ 1002
+ ]
+ }
+ ]
+ },
+ "pendingJobs": [
+ {
+ "jobId": 1003,
+ "jobName": "cdc_mysql_to_es",
+ "pendingSourceState": "SUBMIT",
+ "jobStatus": "PENDING",
+ "enqueueTimestamp": 1717500000000,
+ "checkTime": 1717500005000,
+ "waitDurationMs": 5000,
+ "checkCount": 3,
+ "totalTaskGroups": 16,
+ "allocatedTaskGroups": 10,
+ "lackingTaskGroups": 6,
+ "failureReason": "REQUEST_FAILED",
+ "failureMessage": "NoEnoughResourceException: can't apply resource request",
+ "tagFilter": {},
+ "blockingJobIds": [
+ 1001
+ ],
+ "pipelines": [
+ {
+ "pipelineId": 1,
+ "pipelineName": "Job job-name, Pipeline: [(1/2)]",
+ "totalTaskGroups": 8,
+ "allocatedTaskGroups": 5,
+ "lackingTaskGroups": 3,
+ "taskGroupDiagnostics": [
+ {
+ "taskGroupLocation": {
+ "jobId": 1003,
+ "pipelineId": 1,
+ "taskGroupId": 1
+ },
+ "taskFullName": "Source[0]",
+ "allocated": false,
+ "failureReason": "REQUEST_FAILED",
+ "failureMessage": "NoEnoughResourceException: slot not enough"
+ }
+ ]
+ }
+ ],
+ "lackingTaskGroupDiagnostics": [
+ {
+ "taskGroupLocation": {
+ "jobId": 1003,
+ "pipelineId": 1,
+ "taskGroupId": 1
+ },
+ "taskFullName": "Source[0]",
+ "allocated": false,
+ "failureReason": "REQUEST_FAILED",
+ "failureMessage": "NoEnoughResourceException: slot not enough"
+ }
+ ]
+ }
+ ]
+}
+```
+
+When `pretty=true`, the endpoint returns a pretty-printed JSON response and formats `oldestEnqueueTimestamp`, `newestEnqueueTimestamp`, `enqueueTimestamp`, and `checkTime` as `yyyy-MM-dd HH:mm:ss`.
+
+This endpoint helps troubleshoot why jobs stay in `PENDING` by showing the pending queue order, aggregated resource view, and per task-group slot request failures (tag mismatch, worker busy, resource exhausted, etc.).
+
+**Pending Jobs Response Fields**
+
+- **queueSummary** – overview of the entire pending queue.
+  - `size`: number of jobs currently pending.
+  - `scheduleStrategy`: strategy in use (e.g. `WAIT`, `FAIL_FAST`) that dictates what happens when resources are insufficient.
+  - `oldestEnqueueTimestamp` / `newestEnqueueTimestamp`: timestamps (ms) of the oldest/latest job in the queue.
+  - `lackingTaskGroups`: total TaskGroup count still waiting for slots. **Note**: This value reflects only the jobs included in the current response (i.e., the subset limited by the `limit` parameter or filtered by `jobId`), not the entire pending queue. To view the complete statistics for all pending jobs, call this API without the `limit` parameter.
+- **clusterSnapshot** – cluster resource snapshot (can be filtered by tags).
+  - `totalSlots` / `assignedSlots` / `freeSlots`: total, allocated and remaining slots in the filtered view.
+  - `workerCount`: number of workers that match the tag filters.
+  - `workers[]`: per-worker details:
+    - `address`: host:port of the worker.
+    - `tags`: worker-level tags.
+    - `totalSlots` / `freeSlots`: slot capacity and available slot count on that worker.
+    - `dynamicSlot`: whether the worker uses dynamic slot allocation.
+    - `cpuUsage` / `memUsage`: sampled system load (only present when `slot-allocate-strategy` is `SYSTEM_LOAD`).
+    - `runningJobIds[]`: jobs currently occupying slots on that worker (helps identify blockers).
+- **pendingJobs[]** – diagnostics for each pending job.
+  - `jobId` / `jobName`: identifiers.
+  - `pendingSourceState`: whether the job comes from a new submission (`SUBMIT`) or master switch restore (`RESTORE`).
+  - `jobStatus`: status recorded in the physical plan (typically `PENDING`).
+  - `enqueueTimestamp`: when the job entered the pending queue.
+  - `checkTime`: timestamp of the latest diagnostic snapshot.
+  - `waitDurationMs`: `checkTime - enqueueTimestamp`.
+  - `checkCount`: how many times the scheduler has checked this job.
+  - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`: TaskGroup totals vs. assigned vs. lacking.
+  - `failureReason` / `failureMessage`: classified cause (e.g. `RESOURCE_NOT_ENOUGH`, `REQUEST_FAILED`) plus raw message.
+  - `tagFilter`: worker tag requirements declared by the job (if any).
+  - `blockingJobIds[]`: other jobs that currently occupy the required slots.
+  - `pipelines[]`: per-pipeline breakdown.
+    - `pipelineId` / `pipelineName`.
+    - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`.
+    - `taskGroupDiagnostics[]` (per TaskGroup slot request state):
+      - `taskGroupLocation` (`jobId`, `pipelineId`, `taskGroupId`).
+      - `taskFullName`: human-readable name (source/sink, etc.).
+      - `allocated`: whether the slot request succeeded.
+      - `failureReason` / `failureMessage`: task-level cause when allocation failed.
+  - `lackingTaskGroupDiagnostics[]`: flattened list of `allocated=false` TaskGroups for quick review.
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### Return Details Of A Job
<details>
@@ -956,4 +1109,4 @@ To get the metrics, you need to open `Telemetry` first, or you will get an empty
More information about `Telemetry` can be found in the [Telemetry](telemetry.md) documentation.
-</details>
\ No newline at end of file
+</details>
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md
index 97c7c6406c..9b72a38300 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -119,6 +119,157 @@ seatunnel:
------------------------------------------------------------------------------------------
+### 查看 Pending 队列详细信息
+
+<details>
+ <summary><code>GET</code> <code><b>/pending-jobs?jobId=123&limit=10</b></code> <code>(用于排查作业长时间处于 PENDING 的原因。)</code></summary>
+
+#### 参数
+
+> | 参数名称 | 是否必传 | 参数类型 | 描述 |
+> |----------|----------|----------|--------------------------------|
+> | jobId | 可选 | long | 只查看指定作业的诊断信息。当同时提供 `jobId` 和 `limit` 时,`jobId` 优先生效,`limit` 将被忽略。 |
+> | limit | 可选 | integer | 限制返回的PENDING作业数量。当提供 `jobId` 参数时此参数将被忽略。 |
+> | pretty | 可选 | boolean | 传入 `true` 时返回格式化 JSON,并格式化时间戳。 |
+
+#### 响应
+
+```json
+{
+ "queueSummary": {
+ "size": 2,
+ "scheduleStrategy": "WAIT",
+ "oldestEnqueueTimestamp": 1717500000000,
+ "newestEnqueueTimestamp": 1717500005000,
+ "lackingTaskGroups": 6
+ },
+ "clusterSnapshot": {
+ "totalSlots": 8,
+ "freeSlots": 1,
+ "assignedSlots": 7,
+ "workerCount": 2,
+ "workers": [
+ {
+ "address": "10.0.0.8:5801",
+ "tags": {
+ "zone": "az1"
+ },
+ "totalSlots": 4,
+ "freeSlots": 0,
+ "dynamicSlot": false,
+ "cpuUsage": 0.83,
+ "memUsage": 0.64,
+ "runningJobIds": [
+ 1001,
+ 1002
+ ]
+ }
+ ]
+ },
+ "pendingJobs": [
+ {
+ "jobId": 1003,
+ "jobName": "cdc_mysql_to_es",
+ "pendingSourceState": "SUBMIT",
+ "jobStatus": "PENDING",
+ "enqueueTimestamp": 1717500000000,
+ "checkTime": 1717500005000,
+ "waitDurationMs": 5000,
+ "checkCount": 3,
+ "totalTaskGroups": 16,
+ "allocatedTaskGroups": 10,
+ "lackingTaskGroups": 6,
+ "failureReason": "REQUEST_FAILED",
+ "failureMessage": "NoEnoughResourceException: can't apply resource request",
+ "tagFilter": {},
+ "blockingJobIds": [
+ 1001
+ ],
+ "pipelines": [
+ {
+ "pipelineId": 1,
+ "pipelineName": "Job job-name, Pipeline: [(1/2)]",
+ "totalTaskGroups": 8,
+ "allocatedTaskGroups": 5,
+ "lackingTaskGroups": 3,
+ "taskGroupDiagnostics": [
+ {
+ "taskGroupLocation": {
+ "jobId": 1003,
+ "pipelineId": 1,
+ "taskGroupId": 1
+ },
+ "taskFullName": "Source[0]",
+ "allocated": false,
+ "failureReason": "REQUEST_FAILED",
+ "failureMessage": "NoEnoughResourceException: slot not enough"
+ }
+ ]
+ }
+ ],
+ "lackingTaskGroupDiagnostics": [
+ {
+ "taskGroupLocation": {
+ "jobId": 1003,
+ "pipelineId": 1,
+ "taskGroupId": 1
+ },
+ "taskFullName": "Source[0]",
+ "allocated": false,
+ "failureReason": "REQUEST_FAILED",
+ "failureMessage": "NoEnoughResourceException: slot not enough"
+ }
+ ]
+ }
+ ]
+}
+```
+
+当 `pretty=true` 时,接口会返回格式化后的 JSON,并把 `oldestEnqueueTimestamp`、`newestEnqueueTimestamp`、`enqueueTimestamp`、`checkTime` 转为 `yyyy-MM-dd HH:mm:ss` 字符串,方便排查。
+
+响应中包含:
+
+- **queueSummary**:Pending 队列整体信息总结
+  - `size`:当前排队的 Job 数量。
+  - `scheduleStrategy`:调度策略,决定资源不足时的处理方式。
+  - `oldestEnqueueTimestamp` / `newestEnqueueTimestamp`:最久/最新进入 Pending 队列 Job 的时间戳(毫秒)。
+  - `lackingTaskGroups`:尚未分配 Slot 的 TaskGroup 数量。**注意**:该值仅统计当前响应中返回的作业子集(即受 `limit` 参数限制或 `jobId` 过滤后的作业),而非整个 Pending 队列的完整统计。如需查看所有 Pending 作业的完整统计信息,请不带 `limit` 参数调用此接口。
+- **clusterSnapshot**:当前集群的资源视图。
+  - `totalSlots` / `assignedSlots` / `freeSlots`:Slot 总数、已分配数、剩余数。
+  - `workerCount`:Worker 数量。
+  - `workers[]`:
+    - `address`:Worker 地址(host:port)。
+    - `tags`:Worker 自带的标签。
+    - `totalSlots` / `freeSlots`:Worker 的 Slot 总数与剩余数。
+    - `dynamicSlot`:是否启用动态 Slot。
+    - `cpuUsage` / `memUsage`:系统负载采样(只有当 `slot-allocate-strategy: SYSTEM_LOAD` 才会有该值)
+    - `runningJobIds[]`:当前占用 Worker Slot 的 JobId 列表。
+- **pendingJobs[]**:队列中的每个 Job 的诊断信息。
+  - `jobId` / `jobName`:作业标识。
+  - `pendingSourceState`:取值:`SUBMIT`,`RESTORE`。
+  - `jobStatus`:物理计划记录的状态(固定为 `PENDING`)。
+  - `enqueueTimestamp`:进入 Pending 队列的时间。
+  - `checkTime`:最近一次Pending检查时间。
+  - `waitDurationMs`:等待时长(`checkTime - enqueueTimestamp`)。
+  - `checkCount`:已被调度线程检查的次数。
+  - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`:Job 全部 TaskGroup 数量、已分配 Slot 的数量、缺少 Slot 的数量。
+  - `failureReason` / `failureMessage`:导致本次资源申请失败的归类及具体信息(如 `RESOURCE_NOT_ENOUGH`、`REQUEST_FAILED` 等)。
+  - `tagFilter`:Job 要求的 Worker 标签(若配置)。
+  - `blockingJobIds[]`:当前占用 Slot 的其他 JobId,用来分析资源竞争。
+  - `pipelines[]`:按 Pipeline 细分:
+    - `pipelineId` / `pipelineName`:
+    - `totalTaskGroups` / `allocatedTaskGroups` / `lackingTaskGroups`:Pipeline 里 TaskGroup 的总数、已分配 Slot 数量、缺少 Slot 的数量。
+    - `taskGroupDiagnostics[]`:每个 TaskGroup 的 Slot 请求状态:
+      - `taskGroupLocation`(`jobId`, `pipelineId`, `taskGroupId`)。
+      - `taskFullName`:方便直接定位 source/sink。
+      - `allocated`:是否已经成功申请 Slot。
+      - `failureReason` / `failureMessage`:TaskGroup 层面的失败原因。
+  - `lackingTaskGroupDiagnostics[]`:聚合所有 `allocated=false` 的 TaskGroup,方便快速查看缺 Slot 的具体任务。
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### 返回作业的详细信息
<details>
@@ -945,4 +1096,4 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"
更多关于`Telemetry`的信息可以在[Telemetry](telemetry.md)文档中找到。
-</details>
\ No newline at end of file
+</details>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java
new file mode 100644
index 0000000000..4d63d871ad
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PendingJobsRestIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+
+@Slf4j
+public class PendingJobsRestIT {
+
+ private static final String HOST = "http://localhost:";
+ private static final String JOB_FILE = "pending_jobs_streaming.conf";
+
+ private HazelcastInstanceImpl node;
+ private SeaTunnelClient engineClient;
+ private SeaTunnelConfig seaTunnelConfig;
+ private final List<ClientJobProxy> submittedJobs = new ArrayList<>();
+ private int httpPort;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ String testClusterName = TestUtils.getClusterName("PendingJobsRestIT");
+ seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+ seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+ seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(2);
+ seaTunnelConfig.getEngineConfig().setScheduleStrategy(ScheduleStrategy.WAIT);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setEnabled(true);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(false);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18082);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setContextPath("/seatunnel");
+ httpPort = seaTunnelConfig.getEngineConfig().getHttpConfig().getPort();
+
+ node = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ Common.setDeployMode(DeployMode.CLIENT);
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(testClusterName);
+ engineClient = new SeaTunnelClient(clientConfig);
+ }
+
+ @AfterEach
+ void tearDown() {
+ submittedJobs.forEach(
+ job -> {
+ try {
+ job.cancelJob();
+ } catch (Exception e) {
+ log.warn("Failed to cancel job {}: {}", job.getJobId(), e.getMessage());
+ }
+ });
+ submittedJobs.clear();
+ if (engineClient != null) {
+ engineClient.close();
+ }
+ if (node != null) {
+ node.shutdown();
+ }
+ }
+
+ @Test
+ void testPendingJobsEndpoint() {
+ String jobName = "pending_waiting_job";
+ ClientJobProxy pendingJob = submitStreamingJob(jobName);
+ waitForStatus(pendingJob, JobStatus.PENDING);
+
+ assertPendingJobVisible(pendingJob.getJobId(), jobName, JobStatus.PENDING);
+ }
+
+ private ClientJobProxy submitStreamingJob(String jobName) {
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(jobName);
+ String filePath = TestUtils.getResource(JOB_FILE);
+ ClientJobExecutionEnvironment env =
+ engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
+ ClientJobProxy jobProxy;
+ try {
+ jobProxy = env.execute();
+ } catch (ExecutionException | InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Failed to submit job " + jobName, e);
+ }
+ submittedJobs.add(jobProxy);
+ return jobProxy;
+ }
+
+ private void waitForStatus(ClientJobProxy jobProxy, JobStatus expectedStatus) {
+ Awaitility.await()
+ .atMost(120, TimeUnit.SECONDS)
+ .until(() -> jobProxy.getJobStatus() == expectedStatus);
+ }
+
+    private void assertPendingJobVisible(
+            long pendingJobId, String expectedJobName, JobStatus expectedJobStatus) {
+        String baseUrl =
+                HOST
+                        + httpPort
+                        + seaTunnelConfig.getEngineConfig().getHttpConfig().getContextPath()
+                        + RestConstant.REST_URL_PENDING_JOBS;
+        Awaitility.await()
+                .atMost(60, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Response response =
+                                    given().get(baseUrl)
+                                            .then()
+                                            .statusCode(200)
+                                            .extract()
+                                            .response();
+                            List<Map<String, Object>> pendingJobs =
+                                    response.jsonPath().getList("pendingJobs");
+                            Assertions.assertNotNull(pendingJobs);
+                            Map<String, Object> job =
+                                    pendingJobs.stream()
+                                            .filter(
+                                                    pendingJob ->
+                                                            ((Number)
+                                                                            pendingJob.get(
+                                                                                    RestConstant
+                                                                                            .JOB_ID))
+                                                                            .longValue()
+                                                                    == pendingJobId)
+                                            .findFirst()
+                                            .orElseThrow(
+                                                    () ->
+                                                            new AssertionError(
+                                                                    "Pending job "
+                                                                            + pendingJobId
+                                                                            + " not found"));
+                            Assertions.assertEquals(
+                                    expectedJobName, job.get(RestConstant.JOB_NAME));
+                            Assertions.assertEquals(
+                                    expectedJobStatus.name(), job.get(RestConstant.JOB_STATUS));
+                        });
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/pending_jobs_streaming.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/pending_jobs_streaming.conf
new file mode 100644
index 0000000000..4317278dd2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/pending_jobs_streaming.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ parallelism = 2
+ row.num = 1000000
+ split.read-interval = 100
+ schema = {
+ fields {
+ c_int = int
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ InMemory {
+ writer_sleep = true
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 505517f6c7..7c35bd7ec6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -47,6 +47,10 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.diagnostic.PendingDiagnosticsCollector;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobDiagnostic;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.diagnostic.PendingQueueSummary;
import org.apache.seatunnel.engine.server.event.JobEventHttpReportHandler;
import org.apache.seatunnel.engine.server.event.JobEventProcessor;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -82,7 +86,9 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -261,6 +267,17 @@ public class CoordinatorService {
boolean preApplyResources = jobMaster.preApplyResources();
if (!preApplyResources) {
+ try {
+ PendingJobDiagnostic diagnostic =
+ PendingDiagnosticsCollector.collectJobDiagnostic(
+ pendingJobInfo, Collections.emptyMap(), getResourceManager());
+ pendingJobInfo.recordSnapshot(diagnostic);
+ } catch (Exception e) {
+ logger.warning(
+ String.format(
+ "Collect pending diagnostic for job %s failed: %s",
+ jobId, ExceptionUtils.getMessage(e)));
+ }
logger.info(
String.format(
"Current strategy is %s, and resources is not enough, skipping this schedule, JobID: %s",
@@ -1074,6 +1091,72 @@ public class CoordinatorService {
return connectorPackageService;
}
+ public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long jobId, int limit) {
+ Collection<PendingJobInfo> allPendingJobs =
+ new ArrayList<>(pendingJobQueue.getJobIdMap().values());
+
+ List<PendingJobInfo> selectedJobs = new ArrayList<>();
+ if (jobId != null) {
+ PendingJobInfo pendingJobInfo = pendingJobQueue.getById(jobId);
+ if (pendingJobInfo != null) {
+ selectedJobs.add(pendingJobInfo);
+ }
+ } else {
+ selectedJobs.addAll(allPendingJobs);
+ selectedJobs.sort(Comparator.comparingLong(PendingJobInfo::getEnqueueTimestamp));
+ if (limit > 0 && selectedJobs.size() > limit) {
+ selectedJobs = new ArrayList<>(selectedJobs.subList(0, limit));
+ }
+ }
+
+ ResourceManager resourceManager = getResourceManager();
+ List<PendingJobDiagnostic> diagnostics = new ArrayList<>();
+ for (PendingJobInfo jobInfo : selectedJobs) {
+ PendingJobDiagnostic diagnostic = jobInfo.getLastSnapshot();
+ if (diagnostic == null) {
+ diagnostic =
+ PendingDiagnosticsCollector.collectJobDiagnostic(
+ jobInfo, tags, resourceManager);
+ if (diagnostic != null) {
+ diagnostic.setCheckCount(jobInfo.getCheckTimes());
+ }
+ }
+ if (diagnostic != null) {
+ diagnostics.add(diagnostic);
+ }
+ }
+
+ PendingJobsResponse response = new PendingJobsResponse();
+ response.setPendingJobs(diagnostics);
+ response.setClusterSnapshot(
+ PendingDiagnosticsCollector.collectClusterSnapshot(resourceManager, tags));
+ response.setQueueSummary(buildQueueSummary(allPendingJobs, diagnostics));
+ return response;
+ }
+
+ private PendingQueueSummary buildQueueSummary(
+ Collection<PendingJobInfo> pendingJobs, List<PendingJobDiagnostic> diagnostics) {
+ PendingQueueSummary summary = new PendingQueueSummary();
+ summary.setSize(pendingJobQueue.size());
+ summary.setScheduleStrategy(scheduleStrategy.name());
+ summary.setLackingTaskGroups(
+ diagnostics.stream().mapToInt(PendingJobDiagnostic::getLackingTaskGroups).sum());
+
+ if (!pendingJobs.isEmpty()) {
+ summary.setOldestEnqueueTimestamp(
+ pendingJobs.stream()
+ .mapToLong(PendingJobInfo::getEnqueueTimestamp)
+ .min()
+ .orElse(0L));
+ summary.setNewestEnqueueTimestamp(
+ pendingJobs.stream()
+ .mapToLong(PendingJobInfo::getEnqueueTimestamp)
+ .max()
+ .orElse(0L));
+ }
+ return summary;
+ }
+
public int getPendingJobCount() {
return pendingJobQueue.getJobIdMap().size();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index 1d98f25843..8d06b827eb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.server.rest.servlet.FinishedJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.JobInfoServlet;
import org.apache.seatunnel.engine.server.rest.servlet.MetricsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.OverviewServlet;
+import org.apache.seatunnel.engine.server.rest.servlet.PendingJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.RunningJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
@@ -70,6 +71,7 @@ import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_METRICS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OPEN_METRICS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_PENDING_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOB;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_THREADS;
@@ -174,6 +176,7 @@ public class JettyService {
ServletHolder overviewHolder = new ServletHolder(new OverviewServlet(nodeEngine));
ServletHolder runningJobsHolder = new ServletHolder(new RunningJobsServlet(nodeEngine));
+ ServletHolder pendingJobsHolder = new ServletHolder(new PendingJobsServlet(nodeEngine));
ServletHolder finishedJobsHolder = new ServletHolder(new FinishedJobsServlet(nodeEngine));
ServletHolder systemMonitoringHolder =
new ServletHolder(new SystemMonitoringServlet(nodeEngine));
@@ -203,6 +206,7 @@ public class JettyService {
context.addServlet(overviewHolder, convertUrlToPath(REST_URL_OVERVIEW));
context.addServlet(runningJobsHolder, convertUrlToPath(REST_URL_RUNNING_JOBS));
+ context.addServlet(pendingJobsHolder, convertUrlToPath(REST_URL_PENDING_JOBS));
context.addServlet(finishedJobsHolder, convertUrlToPath(REST_URL_FINISHED_JOBS));
context.addServlet(
systemMonitoringHolder, convertUrlToPath(REST_URL_SYSTEM_MONITORING_INFORMATION));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingClusterSnapshot.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingClusterSnapshot.java
new file mode 100644
index 0000000000..c6272acb19
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingClusterSnapshot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingClusterSnapshot implements Serializable {
+ private int totalSlots;
+ private int freeSlots;
+ private int assignedSlots;
+ private int workerCount;
+ private List<WorkerResourceDiagnostic> workers = new ArrayList<>();
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java
new file mode 100644
index 0000000000..4044307477
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollector.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+@Slf4j
+public final class PendingDiagnosticsCollector {
+
+ private static final String REASON_WAITING = "WAITING_SLOT_ASSIGNMENT";
+ private static final String REASON_RESOURCE_NOT_ENOUGH = "RESOURCE_NOT_ENOUGH";
+ private static final String REASON_REQUEST_FAILED = "REQUEST_FAILED";
+ private static final String REASON_REQUEST_CANCELLED = "REQUEST_CANCELLED";
+
+ private PendingDiagnosticsCollector() {}
+
+ public static PendingJobDiagnostic collectJobDiagnostic(
+ PendingJobInfo pendingJobInfo,
+ Map<String, String> tagFilter,
+ ResourceManager resourceManager) {
+ if (pendingJobInfo == null) {
+ return null;
+ }
+ JobMaster jobMaster = pendingJobInfo.getJobMaster();
+ PendingJobDiagnostic diagnostic = new PendingJobDiagnostic();
+ diagnostic.setJobId(jobMaster.getJobId());
+ diagnostic.setJobName(jobMaster.getJobImmutableInformation().getJobName());
+ diagnostic.setPendingSourceState(pendingJobInfo.getPendingSourceState());
+ diagnostic.setJobStatus(jobMaster.getJobStatus());
+ diagnostic.setEnqueueTimestamp(pendingJobInfo.getEnqueueTimestamp());
+ diagnostic.setCheckTime(System.currentTimeMillis());
+ diagnostic.setWaitDurationMs(
+ diagnostic.getCheckTime() - pendingJobInfo.getEnqueueTimestamp());
+ diagnostic.setTagFilter(
+ tagFilter == null ? Collections.emptyMap() : new HashMap<>(tagFilter));
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures =
+ Optional.ofNullable(jobMaster.getPhysicalPlan())
+ .map(PhysicalPlan::getPreApplyResourceFutures)
+ .map(HashMap::new)
+ .orElseGet(HashMap::new);
+
+ buildPipelineDiagnostics(jobMaster, requestFutures, diagnostic);
+ diagnostic.setTotalTaskGroups(
+ diagnostic.getPipelines().stream()
+ .mapToInt(PendingPipelineDiagnostic::getTotalTaskGroups)
+ .sum());
+ diagnostic.setAllocatedTaskGroups(
+ diagnostic.getPipelines().stream()
+ .mapToInt(PendingPipelineDiagnostic::getAllocatedTaskGroups)
+ .sum());
+ diagnostic.setLackingTaskGroups(
+ diagnostic.getPipelines().stream()
+ .mapToInt(PendingPipelineDiagnostic::getLackingTaskGroups)
+ .sum());
+
+ updateFailureReason(diagnostic);
+ diagnostic.setBlockingJobIds(
+ collectBlockingJobs(resourceManager, jobMaster.getJobId(), tagFilter));
+
+ return diagnostic;
+ }
+
+ private static void buildPipelineDiagnostics(
+ JobMaster jobMaster,
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>> requestFutures,
+ PendingJobDiagnostic diagnostic) {
+ PhysicalPlan plan = jobMaster.getPhysicalPlan();
+ if (plan == null) {
+ diagnostic.setFailureReason(REASON_WAITING);
+ diagnostic.setFailureMessage("Job master not initialized");
+ return;
+ }
+ for (SubPlan subPlan : plan.getPipelineList()) {
+ PendingPipelineDiagnostic pipelineDiagnostic = new PendingPipelineDiagnostic();
+ pipelineDiagnostic.setPipelineId(subPlan.getPipelineId());
+ pipelineDiagnostic.setPipelineName(subPlan.getPipelineFullName());
+
+ List<PhysicalVertex> vertices = new ArrayList<>();
+ vertices.addAll(subPlan.getCoordinatorVertexList());
+ vertices.addAll(subPlan.getPhysicalVertexList());
+
+ int allocated = 0;
+ int lacking = 0;
+ for (PhysicalVertex vertex : vertices) {
+ TaskGroupLocation location = vertex.getTaskGroupLocation();
+ PendingTaskGroupDiagnostic taskDiagnostic =
+ buildTaskDiagnostic(
+ location, vertex.getTaskFullName(), requestFutures.get(location));
+ pipelineDiagnostic.getTaskGroupDiagnostics().add(taskDiagnostic);
+ if (taskDiagnostic.isAllocated()) {
+ allocated++;
+ } else {
+ lacking++;
+ diagnostic.getLackingTaskGroupDiagnostics().add(taskDiagnostic);
+ }
+ }
+
+ pipelineDiagnostic.setTotalTaskGroups(vertices.size());
+ pipelineDiagnostic.setAllocatedTaskGroups(allocated);
+ pipelineDiagnostic.setLackingTaskGroups(lacking);
+ diagnostic.getPipelines().add(pipelineDiagnostic);
+ }
+ }
+
+ private static PendingTaskGroupDiagnostic buildTaskDiagnostic(
+ TaskGroupLocation location,
+ String taskFullName,
+ CompletableFuture<SlotProfile> future) {
+ PendingTaskGroupDiagnostic diagnostic = new PendingTaskGroupDiagnostic();
+ diagnostic.setTaskGroupLocation(location);
+ diagnostic.setTaskFullName(taskFullName);
+
+ if (future == null) {
+ diagnostic.setAllocated(false);
+ diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+ diagnostic.setFailureMessage("Slot request future not created");
+ return diagnostic;
+ }
+
+ if (future.isCancelled()) {
+ diagnostic.setAllocated(false);
+ diagnostic.setFailureReason(REASON_REQUEST_CANCELLED);
+ diagnostic.setFailureMessage("Slot request cancelled by resource manager");
+ return diagnostic;
+ }
+
+ if (!future.isDone()) {
+ diagnostic.setAllocated(false);
+ diagnostic.setFailureReason(REASON_WAITING);
+ diagnostic.setFailureMessage("Slot request still pending");
+ return diagnostic;
+ }
+ try {
+ SlotProfile slotProfile = future.join();
+ if (slotProfile != null) {
+ diagnostic.setAllocated(true);
+ return diagnostic;
+ }
+ diagnostic.setAllocated(false);
+ diagnostic.setFailureReason(REASON_RESOURCE_NOT_ENOUGH);
+ diagnostic.setFailureMessage("No available slot profile");
+ } catch (CompletionException e) {
+ diagnostic.setAllocated(false);
+ diagnostic.setFailureReason(REASON_REQUEST_FAILED);
+ diagnostic.setFailureMessage(ExceptionUtils.getMessage(e));
+ }
+ return diagnostic;
+ }
+
+ private static void updateFailureReason(PendingJobDiagnostic diagnostic) {
+ if (diagnostic.getLackingTaskGroupDiagnostics().isEmpty()) {
+ if (diagnostic.getFailureReason() == null) {
+ diagnostic.setFailureReason(REASON_WAITING);
+ diagnostic.setFailureMessage("Job is waiting for scheduler to retry");
+ }
+ return;
+ }
+
+ Map<String, Long> reasonCounter =
+ diagnostic.getLackingTaskGroupDiagnostics().stream()
+ .collect(
+ Collectors.groupingBy(
+ PendingTaskGroupDiagnostic::getFailureReason,
+ Collectors.counting()));
+ String dominantReason =
+ reasonCounter.entrySet().stream()
+ .max(Map.Entry.comparingByValue())
+ .map(Map.Entry::getKey)
+ .orElse(REASON_RESOURCE_NOT_ENOUGH);
+ diagnostic.setFailureReason(dominantReason);
+ diagnostic.setFailureMessage(
+ diagnostic.getLackingTaskGroupDiagnostics().stream()
+ .filter(diag -> dominantReason.equals(diag.getFailureReason()))
+ .map(PendingTaskGroupDiagnostic::getFailureMessage)
+ .filter(message -> message != null && !message.isEmpty())
+ .distinct()
+ .collect(Collectors.joining("; ")));
+ }
+
+ private static List<Long> collectBlockingJobs(
+ ResourceManager resourceManager, long jobId, Map<String, String> tagFilter) {
+ if (resourceManager == null) {
+ return Collections.emptyList();
+ }
+ Map<String, String> tags =
+ tagFilter == null ? Collections.emptyMap() : new HashMap<>(tagFilter);
+ List<SlotProfile> assignedSlots = Collections.emptyList();
+ try {
+ assignedSlots = resourceManager.getAssignedSlots(tags);
+ } catch (Exception e) {
+ log.warn("Collect assigned slots failed: {}", ExceptionUtils.getMessage(e));
+ }
+ Set<Long> blocking = new HashSet<>();
+ for (SlotProfile slotProfile : assignedSlots) {
+ long ownerId = slotProfile.getOwnerJobID();
+ if (ownerId > 0 && ownerId != jobId) {
+ blocking.add(ownerId);
+ }
+ }
+ return new ArrayList<>(blocking);
+ }
+
+ public static PendingClusterSnapshot collectClusterSnapshot(
+ ResourceManager resourceManager, Map<String, String> tagFilter) {
+ PendingClusterSnapshot snapshot = new PendingClusterSnapshot();
+ if (resourceManager == null) {
+ return snapshot;
+ }
+ Map<String, String> tags =
+ tagFilter == null ? Collections.emptyMap() : new HashMap<>(tagFilter);
+ List<SlotProfile> assignedSlots = Collections.emptyList();
+ List<SlotProfile> unassignedSlots = Collections.emptyList();
+ try {
+ assignedSlots = resourceManager.getAssignedSlots(tags);
+ unassignedSlots = resourceManager.getUnassignedSlots(tags);
+ } catch (Exception e) {
+ log.warn("Collect slots info failed: {}", ExceptionUtils.getMessage(e));
+ }
+ snapshot.setAssignedSlots(assignedSlots.size());
+ snapshot.setFreeSlots(unassignedSlots.size());
+ snapshot.setTotalSlots(assignedSlots.size() + unassignedSlots.size());
+ try {
+ snapshot.setWorkerCount(resourceManager.workerCount(tags));
+ } catch (Exception e) {
+ log.warn("Collect worker count failed: {}", ExceptionUtils.getMessage(e));
+ }
+ snapshot.setWorkers(buildWorkerSnapshots(resourceManager, tags));
+ return snapshot;
+ }
+
+ private static List<WorkerResourceDiagnostic> buildWorkerSnapshots(
+ ResourceManager resourceManager, Map<String, String> tagFilter) {
+ if (resourceManager == null) {
+ return Collections.emptyList();
+ }
+ Map<Address, WorkerProfile> registerWorker =
+ Optional.ofNullable(resourceManager.getRegisterWorker())
+ .map(HashMap::new)
+ .orElseGet(HashMap::new);
+ return registerWorker.values().stream()
+ .map(worker -> convertWorker(worker, tagFilter))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * TODO: tagFilter is accepted but not applied yet, so all registered workers are returned.
+ * Tag-based filtering should be supported for very large clusters.
+ */
+ private static WorkerResourceDiagnostic convertWorker(
+ WorkerProfile workerProfile, Map<String, String> tagFilter) {
+ WorkerResourceDiagnostic diagnostic = new WorkerResourceDiagnostic();
+ if (workerProfile == null) {
+ return diagnostic;
+ }
+ Address address = workerProfile.getAddress();
+ diagnostic.setAddress(address == null ? "UNKNOWN" : address.toString());
+ if (workerProfile.getAttributes() != null) {
+ diagnostic.setTags(new HashMap<>(workerProfile.getAttributes()));
+ } else {
+ diagnostic.setTags(Collections.emptyMap());
+ }
+ diagnostic.setDynamicSlot(workerProfile.isDynamicSlot());
+ int assignedSlots =
+ workerProfile.getAssignedSlots() == null
+ ? 0
+ : workerProfile.getAssignedSlots().length;
+ int unassignedSlots =
+ workerProfile.getUnassignedSlots() == null
+ ? 0
+ : workerProfile.getUnassignedSlots().length;
+ diagnostic.setTotalSlots(assignedSlots + unassignedSlots);
+ diagnostic.setFreeSlots(unassignedSlots);
+ SystemLoadInfo systemLoadInfo = workerProfile.getSystemLoadInfo();
+ if (systemLoadInfo != null) {
+ diagnostic.setCpuUsage(systemLoadInfo.getCpuPercentage());
+ diagnostic.setMemUsage(systemLoadInfo.getMemPercentage());
+ }
+ if (workerProfile.getAssignedSlots() != null) {
+ List<Long> runningJobs =
+ java.util.Arrays.stream(workerProfile.getAssignedSlots())
+ .filter(slot -> slot != null && slot.getOwnerJobID() > 0)
+ .map(SlotProfile::getOwnerJobID)
+ .distinct()
+ .collect(Collectors.toList());
+ diagnostic.setRunningJobIds(runningJobs);
+ }
+ return diagnostic;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobDiagnostic.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobDiagnostic.java
new file mode 100644
index 0000000000..be7e945d9e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobDiagnostic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.execution.PendingSourceState;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingJobDiagnostic implements Serializable {
+ private long jobId;
+ private String jobName;
+ private PendingSourceState pendingSourceState;
+ private JobStatus jobStatus;
+ private long enqueueTimestamp;
+ private long checkTime;
+ private long waitDurationMs;
+ private int checkCount;
+ private int totalTaskGroups;
+ private int allocatedTaskGroups;
+ private int lackingTaskGroups;
+ private String failureReason;
+ private String failureMessage;
+ private Map<String, String> tagFilter;
+ private List<Long> blockingJobIds = new ArrayList<>();
+ private List<PendingPipelineDiagnostic> pipelines = new ArrayList<>();
+ private List<PendingTaskGroupDiagnostic> lackingTaskGroupDiagnostics = new ArrayList<>();
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobsResponse.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobsResponse.java
new file mode 100644
index 0000000000..a05fa4faaa
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingJobsResponse.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingJobsResponse implements Serializable {
+ private PendingQueueSummary queueSummary;
+ private PendingClusterSnapshot clusterSnapshot;
+ private List<PendingJobDiagnostic> pendingJobs = new ArrayList<>();
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingPipelineDiagnostic.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingPipelineDiagnostic.java
new file mode 100644
index 0000000000..4b3bc66c6b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingPipelineDiagnostic.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingPipelineDiagnostic implements Serializable {
+ private int pipelineId;
+ private String pipelineName;
+ private int totalTaskGroups;
+ private int allocatedTaskGroups;
+ private int lackingTaskGroups;
+ private List<PendingTaskGroupDiagnostic> taskGroupDiagnostics = new ArrayList<>();
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingQueueSummary.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingQueueSummary.java
new file mode 100644
index 0000000000..61d6223463
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingQueueSummary.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingQueueSummary implements Serializable {
+ private int size;
+ private String scheduleStrategy;
+ private long oldestEnqueueTimestamp;
+ private long newestEnqueueTimestamp;
+ private int lackingTaskGroups;
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingTaskGroupDiagnostic.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingTaskGroupDiagnostic.java
new file mode 100644
index 0000000000..f3a0404114
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/PendingTaskGroupDiagnostic.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PendingTaskGroupDiagnostic implements Serializable {
+
+ private TaskGroupLocation taskGroupLocation;
+ private String taskFullName;
+ private boolean allocated;
+ private String failureReason;
+ private String failureMessage;
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/WorkerResourceDiagnostic.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/WorkerResourceDiagnostic.java
new file mode 100644
index 0000000000..17905516bb
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/diagnostic/WorkerResourceDiagnostic.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkerResourceDiagnostic implements Serializable {
+ private String address;
+ private Map<String, String> tags;
+ private int totalSlots;
+ private int freeSlots;
+ private boolean dynamicSlot;
+ private Double cpuUsage;
+ private Double memUsage;
+ private List<Long> runningJobIds;
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
index c17ab3284e..a7d7476d1d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/PendingJobInfo.java
@@ -18,15 +18,24 @@
package org.apache.seatunnel.engine.server.execution;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobDiagnostic;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class PendingJobInfo {
private final PendingSourceState pendingSourceState;
private final JobMaster jobMaster;
+ private final long enqueueTimestamp;
+ private final AtomicInteger checkTimes = new AtomicInteger();
+ private volatile long lastCheckTime;
+ private volatile PendingJobDiagnostic lastSnapshot;
public PendingJobInfo(PendingSourceState pendingSourceState, JobMaster jobMaster) {
this.pendingSourceState = pendingSourceState;
this.jobMaster = jobMaster;
+ this.enqueueTimestamp = System.currentTimeMillis();
+ this.lastCheckTime = enqueueTimestamp;
}
public PendingSourceState getPendingSourceState() {
@@ -40,4 +49,30 @@ public class PendingJobInfo {
public Long getJobId() {
return jobMaster.getJobId();
}
+
+ public long getEnqueueTimestamp() {
+ return enqueueTimestamp;
+ }
+
+ public long getLastCheckTime() {
+ return lastCheckTime;
+ }
+
+ public int getCheckTimes() {
+ return checkTimes.get();
+ }
+
+ public PendingJobDiagnostic getLastSnapshot() {
+ return lastSnapshot;
+ }
+
+ public void recordSnapshot(PendingJobDiagnostic snapshot) {
+ if (snapshot == null) {
+ return;
+ }
+ this.lastSnapshot = snapshot;
+ this.lastCheckTime = snapshot.getCheckTime();
+ int current = this.checkTimes.incrementAndGet();
+ snapshot.setCheckCount(current);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetPendingJobsOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetPendingJobsOperation.java
new file mode 100644
index 0000000000..70f6e2d412
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/GetPendingJobsOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class GetPendingJobsOperation extends Operation implements IdentifiedDataSerializable {
+
+ private Map<String, String> tags;
+ private Long jobId;
+ private int limit;
+ private PendingJobsResponse response;
+
+ public GetPendingJobsOperation() {}
+
+ public GetPendingJobsOperation(Map<String, String> tags, Long jobId, int limit) {
+ this.tags = tags;
+ this.jobId = jobId;
+ this.limit = limit;
+ }
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+ response = server.getCoordinatorService().getPendingJobs(tags, jobId, limit);
+ }
+
+ @Override
+ public Object getResponse() {
+ return response;
+ }
+
+ @Override
+ public int getFactoryId() {
+ return ResourceDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return ResourceDataSerializerHook.GET_PENDING_JOBS_TYPE;
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeObject(tags);
+ out.writeObject(jobId);
+ out.writeInt(limit);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ tags = in.readObject();
+ jobId = in.readObject();
+ limit = in.readInt();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index f3e28e1329..8784f31d98 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -48,6 +48,7 @@ public class RestConstant {
public static final String ERROR_MSG = "errorMsg";
public static final String METRICS = "metrics";
+ public static final String LIMIT = "limit";
public static final String TABLE_SOURCE_RECEIVED_COUNT = "TableSourceReceivedCount";
public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
@@ -62,6 +63,8 @@ public class RestConstant {
public static final String CONTEXT_PATH = "/hazelcast/rest/maps";
public static final String INSTANCE_CONTEXT_PATH = "/hazelcast/rest/instance";
+ public static final String PRETTY = "pretty";
+
// api path start
public static final String REST_URL_OVERVIEW = "/overview";
public static final String REST_URL_RUNNING_JOBS = "/running-jobs";
@@ -82,6 +85,7 @@ public class RestConstant {
public static final String REST_URL_STOP_JOB = "/stop-job";
public static final String REST_URL_STOP_JOBS = "/stop-jobs";
public static final String REST_URL_UPDATE_TAGS = "/update-tags";
+ public static final String REST_URL_PENDING_JOBS = "/pending-jobs";
// Get All Nodes Log
public static final String REST_URL_LOGS = "/logs";
// Get Current Node Log
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/PendingJobsService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/PendingJobsService.java
new file mode 100644
index 0000000000..c08894b0ee
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/PendingJobsService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetPendingJobsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.util.Map;
+
+public class PendingJobsService extends BaseService {
+
+ public PendingJobsService(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ }
+
+ public PendingJobsResponse getPendingJobs(Map<String, String> tags, Long jobId, int limit) {
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
+ if (seaTunnelServer == null) {
+ return (PendingJobsResponse)
+ NodeEngineUtil.sendOperationToMasterNode(
+ nodeEngine, new GetPendingJobsOperation(tags, jobId, limit))
+ .join();
+ }
+ return seaTunnelServer.getCoordinatorService().getPendingJobs(tags, jobId, limit);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java
new file mode 100644
index 0000000000..87d06f48d9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PendingJobsServlet.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.engine.server.diagnostic.PendingJobsResponse;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+import org.apache.seatunnel.engine.server.rest.service.PendingJobsService;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class PendingJobsServlet extends BaseServlet {
+
+ private final PendingJobsService pendingJobsService;
+ private static final Set<String> TIMESTAMP_FIELDS =
+ new HashSet<>(
+ Arrays.asList(
+ "oldestEnqueueTimestamp",
+ "newestEnqueueTimestamp",
+ "enqueueTimestamp",
+ "checkTime"));
+ private static final DateTimeFormatter PRETTY_TIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
+ private static final Gson PRETTY_GSON = new GsonBuilder().setPrettyPrinting().create();
+
+ public PendingJobsServlet(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ this.pendingJobsService = new PendingJobsService(nodeEngine);
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+
+ Map<String, String> params = new HashMap<>(getParameterMap(req));
+ Long jobId = null;
+ int limit = 0;
+ boolean pretty = false;
+ if (params.containsKey(RestConstant.JOB_ID)) {
+ try {
+ jobId = Long.parseLong(params.remove(RestConstant.JOB_ID));
+ } catch (NumberFormatException e) {
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid jobId");
+ return;
+ }
+ }
+
+ if (params.containsKey(RestConstant.LIMIT)) {
+ try {
+ limit = Integer.parseInt(params.remove(RestConstant.LIMIT));
+ } catch (NumberFormatException e) {
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid limit");
+ return;
+ }
+ }
+
+ if (params.containsKey(RestConstant.PRETTY)) {
+ pretty = Boolean.parseBoolean(params.remove(RestConstant.PRETTY));
+ }
+
+ PendingJobsResponse response = pendingJobsService.getPendingJobs(params, jobId, limit);
+ if (pretty) {
+ writePrettyResponse(resp, response);
+ } else {
+ writeJson(resp, response);
+ }
+ }
+
+ private void writePrettyResponse(HttpServletResponse resp, PendingJobsResponse response)
+ throws IOException {
+ JsonElement tree = PRETTY_GSON.toJsonTree(response);
+ formatTimestampFields(tree);
+ resp.setCharacterEncoding("UTF-8");
+ resp.setContentType("application/json; charset=UTF-8");
+ resp.getWriter().write(PRETTY_GSON.toJson(tree));
+ }
+
+ private void formatTimestampFields(JsonElement element) {
+ if (element == null || element.isJsonNull()) {
+ return;
+ }
+ if (element.isJsonObject()) {
+ JsonObject object = element.getAsJsonObject();
+ for (Map.Entry<String, JsonElement> entry : object.entrySet()) {
+ JsonElement value = entry.getValue();
+ if (shouldFormatTimestamp(entry.getKey(), value)) {
+ long timestamp = value.getAsLong();
+ object.addProperty(entry.getKey(), formatTimestamp(timestamp));
+ } else {
+ formatTimestampFields(value);
+ }
+ }
+ } else if (element.isJsonArray()) {
+ JsonArray array = element.getAsJsonArray();
+ for (JsonElement child : array) {
+ formatTimestampFields(child);
+ }
+ }
+ }
+
+ private boolean shouldFormatTimestamp(String key, JsonElement element) {
+ if (!TIMESTAMP_FIELDS.contains(key) || element == null) {
+ return false;
+ }
+ if (!element.isJsonPrimitive()) {
+ return false;
+ }
+ JsonPrimitive primitive = element.getAsJsonPrimitive();
+ return primitive.isNumber();
+ }
+
+ private String formatTimestamp(long timestamp) {
+ return PRETTY_TIME_FORMATTER.format(Instant.ofEpochMilli(timestamp));
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 09f9a4550c..5e5c12d706 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetPendingJobsOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
@@ -53,6 +54,8 @@ public class ResourceDataSerializerHook implements DataSerializerHook {
public static final int REQUEST_SLOT_INFO_TYPE = 9;
+ public static final int GET_PENDING_JOBS_TYPE = 10;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
@@ -91,6 +94,8 @@ public class ResourceDataSerializerHook implements DataSerializerHook {
return new SyncWorkerProfileOperation();
case REQUEST_SLOT_INFO_TYPE:
return new GetOverviewOperation();
+ case GET_PENDING_JOBS_TYPE:
+ return new GetPendingJobsOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java
new file mode 100644
index 0000000000..1b9fd5e58a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/diagnostic/PendingDiagnosticsCollectorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.diagnostic;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.PendingJobInfo;
+import org.apache.seatunnel.engine.server.execution.PendingSourceState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PendingDiagnosticsCollectorTest {
+
+ @Test
+ public void testCollectJobDiagnosticWithFailures() {
+ JobMaster jobMaster = Mockito.mock(JobMaster.class);
+ Mockito.when(jobMaster.getJobId()).thenReturn(1000L);
+ JobImmutableInformation jobImmutableInformation =
+ Mockito.mock(JobImmutableInformation.class);
+ Mockito.when(jobImmutableInformation.getJobName()).thenReturn("test_job");
+ Mockito.when(jobMaster.getJobImmutableInformation()).thenReturn(jobImmutableInformation);
+ Mockito.when(jobMaster.getJobStatus()).thenReturn(JobStatus.PENDING);
+
+ PhysicalPlan physicalPlan = Mockito.mock(PhysicalPlan.class);
+ Mockito.when(jobMaster.getPhysicalPlan()).thenReturn(physicalPlan);
+
+ SubPlan subPlan = Mockito.mock(SubPlan.class);
+ Mockito.when(subPlan.getPipelineId()).thenReturn(1);
+ Mockito.when(subPlan.getPipelineFullName()).thenReturn("pipeline-1");
+
+ PhysicalVertex vertexSuccess = Mockito.mock(PhysicalVertex.class);
+ TaskGroupLocation locationSuccess = new TaskGroupLocation(1000L, 1, 1L);
+ Mockito.when(vertexSuccess.getTaskGroupLocation()).thenReturn(locationSuccess);
+ Mockito.when(vertexSuccess.getTaskFullName()).thenReturn("task-success");
+
+ PhysicalVertex vertexFailA = Mockito.mock(PhysicalVertex.class);
+ TaskGroupLocation locationFailA = new TaskGroupLocation(1000L, 1, 2L);
+ Mockito.when(vertexFailA.getTaskGroupLocation()).thenReturn(locationFailA);
+ Mockito.when(vertexFailA.getTaskFullName()).thenReturn("task-fail-a");
+
+ PhysicalVertex vertexFailB = Mockito.mock(PhysicalVertex.class);
+ TaskGroupLocation locationFailB = new TaskGroupLocation(1000L, 1, 3L);
+ Mockito.when(vertexFailB.getTaskGroupLocation()).thenReturn(locationFailB);
+ Mockito.when(vertexFailB.getTaskFullName()).thenReturn("task-fail-b");
+
+ Mockito.when(subPlan.getCoordinatorVertexList()).thenReturn(Collections.emptyList());
+ Mockito.when(subPlan.getPhysicalVertexList())
+ .thenReturn(Arrays.asList(vertexSuccess, vertexFailA, vertexFailB));
+ Mockito.when(physicalPlan.getPipelineList()).thenReturn(Collections.singletonList(subPlan));
+
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+ CompletableFuture<SlotProfile> successFuture =
+ CompletableFuture.completedFuture(Mockito.mock(SlotProfile.class));
+ futures.put(locationSuccess, successFuture);
+
+ CompletableFuture<SlotProfile> failFutureA = new CompletableFuture<>();
+ failFutureA.completeExceptionally(new RuntimeException("no slot available"));
+ futures.put(locationFailA, failFutureA);
+
+ CompletableFuture<SlotProfile> failFutureB = new CompletableFuture<>();
+ failFutureB.completeExceptionally(new RuntimeException("worker busy"));
+ futures.put(locationFailB, failFutureB);
+
+ Mockito.when(physicalPlan.getPreApplyResourceFutures()).thenReturn(futures);
+
+ PendingJobInfo pendingJobInfo = new PendingJobInfo(PendingSourceState.SUBMIT, jobMaster);
+
+ ResourceManager resourceManager = Mockito.mock(ResourceManager.class);
+ SlotProfile blockingSlot = Mockito.mock(SlotProfile.class);
+ Mockito.when(blockingSlot.getOwnerJobID()).thenReturn(2000L);
+ Mockito.when(resourceManager.getAssignedSlots(Mockito.anyMap()))
+ .thenReturn(Collections.singletonList(blockingSlot));
+
+ PendingJobDiagnostic diagnostic =
+ PendingDiagnosticsCollector.collectJobDiagnostic(
+ pendingJobInfo, Collections.emptyMap(), resourceManager);
+
+ Assertions.assertEquals(2, diagnostic.getLackingTaskGroups());
+ Assertions.assertEquals("REQUEST_FAILED", diagnostic.getFailureReason());
+ Assertions.assertEquals(1, diagnostic.getBlockingJobIds().size());
+ Assertions.assertEquals(3, diagnostic.getPipelines().get(0).getTotalTaskGroups());
+ Assertions.assertEquals(2, diagnostic.getPipelines().get(0).getLackingTaskGroups());
+ }
+}
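
Illustration (not part of the commit): the pretty=true path in PendingJobsServlet walks the response tree and replaces numeric values stored under a fixed set of timestamp keys with formatted strings. The sketch below reproduces that traversal using only the JDK, so it can be run without Gson. The class name PrettyTimestampSketch is hypothetical, plain Map/List values stand in for JsonObject/JsonArray, and the formatter is pinned to UTC for reproducible output, whereas the servlet uses ZoneId.systemDefault().

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the servlet's Gson-based timestamp formatting.
class PrettyTimestampSketch {

    // Same key names as TIMESTAMP_FIELDS in PendingJobsServlet.
    static final Set<String> TIMESTAMP_FIELDS =
            new HashSet<>(
                    Arrays.asList(
                            "oldestEnqueueTimestamp",
                            "newestEnqueueTimestamp",
                            "enqueueTimestamp",
                            "checkTime"));

    // Assumption: UTC instead of the servlet's ZoneId.systemDefault().
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Recursively rewrites epoch-millisecond numbers under timestamp keys in place.
    @SuppressWarnings("unchecked")
    static void formatTimestampFields(Object node) {
        if (node instanceof Map) {
            Map<String, Object> object = (Map<String, Object>) node;
            for (Map.Entry<String, Object> entry : object.entrySet()) {
                Object value = entry.getValue();
                if (TIMESTAMP_FIELDS.contains(entry.getKey()) && value instanceof Number) {
                    long millis = ((Number) value).longValue();
                    // Replacing the value of an existing entry is not a structural change.
                    entry.setValue(FORMATTER.format(Instant.ofEpochMilli(millis)));
                } else {
                    formatTimestampFields(value);
                }
            }
        } else if (node instanceof List) {
            for (Object child : (List<Object>) node) {
                formatTimestampFields(child);
            }
        }
        // Strings, numbers under other keys, and nulls are left untouched.
    }

    public static void main(String[] args) {
        Map<String, Object> job = new LinkedHashMap<>();
        job.put("jobId", 1000L);
        job.put("enqueueTimestamp", 0L);
        List<Object> jobs = new ArrayList<>();
        jobs.add(job);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("checkTime", 86_400_000L);
        root.put("pendingJobs", jobs);

        formatTimestampFields(root);
        System.out.println(root);
    }
}
```

Only values that are both numeric and stored under one of the four listed keys are rewritten, so fields such as jobId keep their numeric form. The top-level "pendingJobs" key in main is made up for the example and is not taken from PendingJobsResponse.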