This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 02106293bb8 [FLINK-38717][runtime] Display the assigned task numbers
for TaskManagers on the UI
02106293bb8 is described below
commit 02106293bb827d7c2d87cb0cd0b3e56db39848ac
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Dec 30 14:35:14 2025 +0800
[FLINK-38717][runtime] Display the assigned task numbers for TaskManagers
on the UI
---
.../shortcodes/generated/rest_v1_dispatcher.html | 9 ++
.../src/test/resources/rest_api_v1.snapshot | 9 ++
.../src/app/interfaces/task-manager.ts | 3 +
.../list/task-manager-list.component.html | 2 +
.../list/task-manager-list.component.ts | 1 +
.../metrics/task-manager-metrics.component.html | 2 +
.../status/task-manager-status.component.html | 5 +-
.../runtime/resourcemanager/ResourceManager.java | 2 +
.../ClusterResourceStatisticsProvider.java | 3 +
.../slotmanager/DefaultSlotStatusSyncer.java | 12 ++-
.../slotmanager/FineGrainedSlotManager.java | 12 ++-
.../slotmanager/FineGrainedTaskManagerSlot.java | 12 +++
.../slotmanager/FineGrainedTaskManagerTracker.java | 17 ++++
.../resourcemanager/slotmanager/SlotManager.java | 3 +
.../slotmanager/TaskManagerSlotInformation.java | 14 +++
.../rest/messages/taskmanager/SlotInfo.java | 24 ++++-
.../taskmanager/TaskManagerDetailsInfo.java | 5 +
.../rest/messages/taskmanager/TaskManagerInfo.java | 16 +++
.../flink/runtime/taskexecutor/SlotStatus.java | 23 ++++-
.../flink/runtime/taskexecutor/slot/TaskSlot.java | 4 +
.../taskexecutor/slot/TaskSlotTableImpl.java | 12 +--
.../ResourceManagerTaskExecutorTest.java | 107 +++++++++++++++++++++
.../resourcemanager/ResourceManagerTest.java | 1 +
.../slotmanager/TestingSlotManager.java | 5 +
.../taskmanager/TaskManagerDetailsHandlerTest.java | 1 +
.../taskmanager/TaskManagerDetailsInfoTest.java | 3 +-
.../messages/taskmanager/TaskManagerInfoTest.java | 1 +
27 files changed, 288 insertions(+), 20 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index a36b1aeaf55..2339c5233f4 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -5240,6 +5240,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"freeSlots" : {
"type" : "integer"
},
+ "assignedTasks" : {
+ "type" : "integer"
+ },
"hardware" : {
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
@@ -5441,6 +5444,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"jobId" : {
"type" : "any"
},
+ "assignedTasks" : {
+ "type" : "integer"
+ },
"resource" : {
"type" : "object",
"$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
@@ -5461,6 +5467,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"freeSlots" : {
"type" : "integer"
},
+ "assignedTasks" : {
+ "type" : "integer"
+ },
"hardware" : {
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 1444b47b684..406ebe85ef0 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -4093,6 +4093,9 @@
"freeSlots" : {
"type" : "integer"
},
+ "assignedTasks" : {
+ "type" : "integer"
+ },
"totalResource" : {
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
@@ -4255,6 +4258,9 @@
"freeSlots" : {
"type" : "integer"
},
+ "assignedTasks" : {
+ "type" : "integer"
+ },
"totalResource" : {
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
@@ -4352,6 +4358,9 @@
"jobId" : {
"type" : "any"
},
+ "assignedTasks" : {
+ "type" : "integer"
+ },
"resource" : {
"type" : "object",
"$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
index 0c56202b673..51f0705026e 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
@@ -27,6 +27,7 @@ export interface TaskManagerDetail {
timeSinceLastHeartbeat: number;
slotsNumber: number;
freeSlots: number;
+ assignedTasks: number;
hardware: Hardware;
metrics: Metrics;
memoryConfiguration: MemoryConfiguration;
@@ -38,6 +39,7 @@ export interface TaskManagerDetail {
export interface AllocatedSlot {
jobId: string;
+ assignedTasks: number;
resource: Resources;
}
@@ -67,6 +69,7 @@ export interface TaskManagersItem {
timeSinceLastHeartbeat: number;
slotsNumber: number;
freeSlots: number;
+ assignedTasks: number;
hardware: Hardware;
blocked?: boolean;
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
index 3d6ca14a058..89042fa40f1 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
@@ -32,6 +32,7 @@
<th [nzSortFn]="sortHeartBeatFn" [nzWidth]="'160px'">Last
Heartbeat</th>
<th [nzSortFn]="sortSlotsNumberFn" [nzWidth]="'90px'">All Slots</th>
<th [nzSortFn]="sortFreeSlotsFn" [nzWidth]="'100px'">Free Slots</th>
+ <th [nzSortFn]="sortAssignedTasksFn" [nzWidth]="'100px'">Assigned
Tasks</th>
<th [nzSortFn]="sortCpuCoresFn" [nzWidth]="'110px'">CPU Cores</th>
<th [nzSortFn]="sortPhysicalMemoryFn" [nzWidth]="'120px'">Physical
MEM</th>
<th [nzSortFn]="sortFreeMemoryFn" [nzWidth]="'130px'">JVM Heap
Size</th>
@@ -53,6 +54,7 @@
<td>{{ manager.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss'
}}</td>
<td>{{ manager.slotsNumber }}</td>
<td>{{ manager.freeSlots }}</td>
+ <td>{{ manager.assignedTasks }}</td>
<td>{{ manager.hardware.cpuCores }}</td>
<td [attr.title]="manager.hardware.physicalMemory + ' bytes'">
{{ manager.hardware.physicalMemory | humanizeBytes }}
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
index 420c6de4e59..93bf4c76185 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
@@ -49,6 +49,7 @@ export class TaskManagerListComponent implements OnInit,
OnDestroy {
public readonly sortHeartBeatFn = createSortFn(item =>
item.timeSinceLastHeartbeat);
public readonly sortSlotsNumberFn = createSortFn(item => item.slotsNumber);
public readonly sortFreeSlotsFn = createSortFn(item => item.freeSlots);
+ public readonly sortAssignedTasksFn = createSortFn(item =>
item.assignedTasks);
public readonly sortCpuCoresFn = createSortFn(item =>
item.hardware?.cpuCores);
public readonly sortPhysicalMemoryFn = createSortFn(item =>
item.hardware?.physicalMemory);
public readonly sortFreeMemoryFn = createSortFn(item =>
item.hardware?.freeMemory);
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
index b4a41b521f2..07f735be986 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
@@ -396,6 +396,7 @@
<tr>
<th>#</th>
<th>Job ID</th>
+ <th>Assigned Tasks</th>
<th>CPU (cores)</th>
<th>Task Heap memory (MB)</th>
<th>Task Off-Heap memory (MB)</th>
@@ -409,6 +410,7 @@
<strong>{{ i | number }}</strong>
</td>
<td>{{ slot.jobId }}</td>
+ <td>{{ slot.assignedTasks }}</td>
<td>{{ slot.resource.cpuCores | number }}</td>
<td>{{ slot.resource.taskHeapMemory | number }}</td>
<td>{{ slot.resource.taskOffHeapMemory | number }}</td>
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
index c88b16fbb85..46e954d7082 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
@@ -22,12 +22,15 @@
<flink-blocked-badge
*ngIf="taskManagerDetail?.blocked"></flink-blocked-badge>
</div>
<nz-descriptions *ngIf="taskManagerDetail" nzBordered nzSize="small">
- <nz-descriptions-item [nzSpan]="2" nzTitle="Path">
+ <nz-descriptions-item [nzSpan]="1" nzTitle="Path">
{{ taskManagerDetail.path }}
</nz-descriptions-item>
<nz-descriptions-item [nzSpan]="1" nzTitle="Free/All Slots">
{{ taskManagerDetail.freeSlots }} / {{ taskManagerDetail.slotsNumber }}
</nz-descriptions-item>
+ <nz-descriptions-item [nzSpan]="1" nzTitle="Assigned Tasks">
+ {{ taskManagerDetail.assignedTasks }}
+ </nz-descriptions-item>
<nz-descriptions-item [nzSpan]="1" nzTitle="Last Heartbeat">
{{ taskManagerDetail.timeSinceLastHeartbeat | date: 'yyyy-MM-dd
HH:mm:ss' }}
</nz-descriptions-item>
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b3f9404e3ee..13af32c8884 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -687,6 +687,7 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+
slotManager.getAssignedTasksOf(taskExecutor.getInstanceID()),
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
taskExecutor.getHardwareDescription(),
@@ -717,6 +718,7 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(instanceId),
slotManager.getNumberFreeSlotsOf(instanceId),
+ slotManager.getAssignedTasksOf(instanceId),
slotManager.getRegisteredResourceOf(instanceId),
slotManager.getFreeResourceOf(instanceId),
taskExecutor.getHardwareDescription(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
index f3b5c4873a0..80539808c93 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.instance.InstanceID;
/** Provides statistics of cluster resources. */
public interface ClusterResourceStatisticsProvider {
+ /** Get total number of tasks assigned to the current instance. slots. */
+ int getAssignedTasks(InstanceID instanceId);
+
/** Get total number of registered slots. */
int getNumberRegisteredSlots();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
index 6b526cf391e..e957eb3b644 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -295,16 +296,15 @@ public class DefaultSlotStatusSyncer implements
SlotStatusSyncer {
private boolean syncAllocatedSlotStatus(SlotStatus slotStatus,
TaskManagerInfo taskManager) {
final AllocationID allocationId =
Preconditions.checkNotNull(slotStatus.getAllocationID());
final JobID jobId = Preconditions.checkNotNull(slotStatus.getJobID());
+ final int assignedTasks = slotStatus.getAssignedTasks();
try (MdcUtils.MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
final ResourceProfile resourceProfile =
Preconditions.checkNotNull(slotStatus.getResourceProfile());
- if (taskManager.getAllocatedSlots().containsKey(allocationId)) {
- if
(taskManager.getAllocatedSlots().get(allocationId).getState()
- == SlotState.PENDING) {
+ TaskManagerSlotInformation slot =
taskManager.getAllocatedSlots().get(allocationId);
+ if (Objects.nonNull(slot)) {
+ if (slot.getState() == SlotState.PENDING) {
// Allocation Complete
- final TaskManagerSlotInformation slot =
- taskManager.getAllocatedSlots().get(allocationId);
pendingSlotAllocations.remove(slot.getAllocationId());
taskManagerTracker.notifySlotStatus(
slot.getAllocationId(),
@@ -313,6 +313,7 @@ public class DefaultSlotStatusSyncer implements
SlotStatusSyncer {
slot.getResourceProfile(),
SlotState.ALLOCATED);
}
+ slot.setAssignedTasks(assignedTasks);
return true;
} else {
Preconditions.checkState(
@@ -325,6 +326,7 @@ public class DefaultSlotStatusSyncer implements
SlotStatusSyncer {
resourceProfile,
SlotState.ALLOCATED);
resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
+
taskManager.getAllocatedSlots().get(allocationId).setAssignedTasks(assignedTasks);
return false;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index c19943cdaa1..6d2511b7821 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -755,6 +755,11 @@ public class FineGrainedSlotManager implements SlotManager
{
// Legacy APIs
//
---------------------------------------------------------------------------------------------
+ @Override
+ public int getAssignedTasksOf(InstanceID instanceId) {
+ return taskManagerTracker.getAssignedTasks(instanceId);
+ }
+
@Override
public int getNumberRegisteredSlots() {
return taskManagerTracker.getNumberRegisteredSlots();
@@ -803,7 +808,12 @@ public class FineGrainedSlotManager implements SlotManager
{
.map(Map::values)
.orElse(Collections.emptyList())
.stream()
- .map(slot -> new SlotInfo(slot.getJobId(),
slot.getResourceProfile()))
+ .map(
+ slot ->
+ new SlotInfo(
+ slot.getJobId(),
+ slot.getResourceProfile(),
+ slot.getAssignedTasks()))
.collect(Collectors.toList());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
index b7e58cc2826..88af3876cdc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
@@ -51,6 +51,8 @@ public class FineGrainedTaskManagerSlot implements
TaskManagerSlotInformation {
/** Current state of this slot. Should be either PENDING or ALLOCATED. */
private SlotState state;
+ private int assignedTasks = 0;
+
public FineGrainedTaskManagerSlot(
AllocationID allocationId,
JobID jobId,
@@ -72,6 +74,16 @@ public class FineGrainedTaskManagerSlot implements
TaskManagerSlotInformation {
return resourceProfile;
}
+ @Override
+ public void setAssignedTasks(int assignedTasks) {
+ this.assignedTasks = assignedTasks;
+ }
+
+ @Override
+ public int getAssignedTasks() {
+ return assignedTasks;
+ }
+
@Override
public SlotState getState() {
return state;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
index da3c8ae6291..31ef3a8c269 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -315,6 +316,22 @@ public class FineGrainedTaskManagerTracker implements
TaskManagerTracker {
Collections.emptySet()));
}
+ @Override
+ public int getAssignedTasks(InstanceID instanceId) {
+ FineGrainedTaskManagerRegistration taskManagerRegistration =
+ taskManagerRegistrations.get(instanceId);
+ if (Objects.isNull(taskManagerRegistration)) {
+ return 0;
+ }
+ int totalAssignedTasks = 0;
+ for (TaskManagerSlotInformation slot :
+ taskManagerRegistration.getAllocatedSlots().values()) {
+ final int assignedTasks = slot.getAssignedTasks();
+ totalAssignedTasks = totalAssignedTasks + assignedTasks;
+ }
+ return totalAssignedTasks;
+ }
+
@Override
public int getNumberRegisteredSlots() {
return taskManagerRegistrations.values().stream()
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 3effd14504d..2fa72baa65e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -45,6 +45,9 @@ import java.util.concurrent.Executor;
* failure, respectively.
*/
public interface SlotManager extends AutoCloseable {
+
+ int getAssignedTasksOf(InstanceID instanceId);
+
int getNumberRegisteredSlots();
int getNumberRegisteredSlotsOf(InstanceID instanceId);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
index 435bace1409..1e1701ac0a6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
@@ -60,4 +60,18 @@ public interface TaskManagerSlotInformation {
* @return resource profile of this slot
*/
ResourceProfile getResourceProfile();
+
+ /**
+ * Set the number of tasks assigned to the current slot.
+ *
+ * @param assignedTasks The number of tasks assigned to the current slot.
+ */
+ void setAssignedTasks(int assignedTasks);
+
+ /**
+ * Get the number of tasks assigned to the current slot.
+ *
+ * @return The number of tasks assigned to the current slot.
+ */
+ int getAssignedTasks();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
index 6da1b4e8004..d2999e94c07 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
@@ -46,6 +46,8 @@ public class SlotInfo implements ResponseBody, Serializable {
public static final String FIELD_NAME_JOB_ID = "jobId";
+ public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";
+
@JsonProperty(FIELD_NAME_RESOURCE)
private final ResourceProfileInfo resource;
@@ -53,17 +55,22 @@ public class SlotInfo implements ResponseBody, Serializable
{
@JsonSerialize(using = JobIDSerializer.class)
private final JobID jobId;
+ @JsonProperty(FIELD_NAME_ASSIGNED_TASKS)
+ private final int assignedTasks;
+
@JsonCreator
public SlotInfo(
@JsonDeserialize(using = JobIDDeserializer.class)
@JsonProperty(FIELD_NAME_JOB_ID)
JobID jobId,
- @JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource) {
+ @JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource,
+ @JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks) {
this.jobId = Preconditions.checkNotNull(jobId);
this.resource = Preconditions.checkNotNull(resource);
+ this.assignedTasks = assignedTasks;
}
- public SlotInfo(JobID jobId, ResourceProfile resource) {
- this(jobId, ResourceProfileInfo.fromResourceProfile(resource));
+ public SlotInfo(JobID jobId, ResourceProfile resource, int assignedTasks) {
+ this(jobId, ResourceProfileInfo.fromResourceProfile(resource),
assignedTasks);
}
@JsonIgnore
@@ -71,6 +78,11 @@ public class SlotInfo implements ResponseBody, Serializable {
return jobId;
}
+ @JsonIgnore
+ public int getAssignedTasks() {
+ return assignedTasks;
+ }
+
@JsonIgnore
public ResourceProfileInfo getResource() {
return resource;
@@ -85,11 +97,13 @@ public class SlotInfo implements ResponseBody, Serializable
{
return false;
}
SlotInfo that = (SlotInfo) o;
- return Objects.equals(jobId, that.jobId) && Objects.equals(resource,
that.resource);
+ return Objects.equals(jobId, that.jobId)
+ && Objects.equals(resource, that.resource)
+ && Objects.equals(assignedTasks, that.assignedTasks);
}
@Override
public int hashCode() {
- return Objects.hash(jobId, resource);
+ return Objects.hash(jobId, resource, assignedTasks);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index c9df8325e10..e06ce258531 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -45,6 +45,8 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
public static final String FIELD_NAME_ALLOCATED_SLOTS = "allocatedSlots";
+ public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";
+
@JsonProperty(FIELD_NAME_METRICS)
private final TaskManagerMetricsInfo taskManagerMetrics;
@@ -62,6 +64,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int
numberAvailableSlots,
+ @JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks,
@JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo
totalResource,
@JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo
freeResource,
@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription
hardwareDescription,
@@ -77,6 +80,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
lastHeartbeat,
numberSlots,
numberAvailableSlots,
+ assignedTasks,
totalResource,
freeResource,
hardwareDescription,
@@ -98,6 +102,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
taskManagerInfoWithSlots.getTaskManagerInfo().getLastHeartbeat(),
taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots(),
taskManagerInfoWithSlots.getTaskManagerInfo().getNumberAvailableSlots(),
+
taskManagerInfoWithSlots.getTaskManagerInfo().getAssignedTasks(),
taskManagerInfoWithSlots.getTaskManagerInfo().getTotalResource(),
taskManagerInfoWithSlots.getTaskManagerInfo().getFreeResource(),
taskManagerInfoWithSlots.getTaskManagerInfo().getHardwareDescription(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index cca037e2aa6..043c2039ec4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -59,6 +59,8 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
public static final String FIELD_NAME_NUMBER_AVAILABLE_SLOTS = "freeSlots";
+ public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";
+
public static final String FIELD_NAME_TOTAL_RESOURCE = "totalResource";
public static final String FIELD_NAME_AVAILABLE_RESOURCE = "freeResource";
@@ -93,6 +95,9 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS)
private final int numberAvailableSlots;
+ @JsonProperty(FIELD_NAME_ASSIGNED_TASKS)
+ private final int assignedTasks;
+
@JsonProperty(FIELD_NAME_TOTAL_RESOURCE)
private final ResourceProfileInfo totalResource;
@@ -121,6 +126,7 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int
numberAvailableSlots,
+ @JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks,
@JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo
totalResource,
@JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo
freeResource,
@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription
hardwareDescription,
@@ -133,6 +139,7 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
this.lastHeartbeat = lastHeartbeat;
this.numberSlots = numberSlots;
this.numberAvailableSlots = numberAvailableSlots;
+ this.assignedTasks = assignedTasks;
this.totalResource = totalResource;
this.freeResource = freeResource;
this.hardwareDescription =
Preconditions.checkNotNull(hardwareDescription);
@@ -148,6 +155,7 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
long lastHeartbeat,
int numberSlots,
int numberAvailableSlots,
+ int assignedTasks,
ResourceProfile totalResource,
ResourceProfile freeResource,
HardwareDescription hardwareDescription,
@@ -161,6 +169,7 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
lastHeartbeat,
numberSlots,
numberAvailableSlots,
+ assignedTasks,
ResourceProfileInfo.fromResourceProfile(totalResource),
ResourceProfileInfo.fromResourceProfile(freeResource),
hardwareDescription,
@@ -203,6 +212,11 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
return numberAvailableSlots;
}
+ @JsonIgnore
+ public int getAssignedTasks() {
+ return assignedTasks;
+ }
+
@JsonIgnore
public ResourceProfileInfo getTotalResource() {
return totalResource;
@@ -242,6 +256,7 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
&& lastHeartbeat == that.lastHeartbeat
&& numberSlots == that.numberSlots
&& numberAvailableSlots == that.numberAvailableSlots
+ && Objects.equals(assignedTasks, that.assignedTasks)
&& Objects.equals(totalResource, that.totalResource)
&& Objects.equals(freeResource, that.freeResource)
&& Objects.equals(resourceId, that.resourceId)
@@ -261,6 +276,7 @@ public class TaskManagerInfo implements ResponseBody,
Serializable {
lastHeartbeat,
numberSlots,
numberAvailableSlots,
+ assignedTasks,
totalResource,
freeResource,
hardwareDescription,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 2295defb5af..3ca916a22e3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -49,19 +50,33 @@ public class SlotStatus implements Serializable {
*/
private final JobID jobID;
+ private final int assignedTasks;
+
+ @VisibleForTesting
public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) {
- this(slotID, resourceProfile, null, null);
+ this(slotID, resourceProfile, null, null, 0);
}
+ @VisibleForTesting
public SlotStatus(
SlotID slotID,
ResourceProfile resourceProfile,
JobID jobID,
AllocationID allocationID) {
+ this(slotID, resourceProfile, jobID, allocationID, 0);
+ }
+
+ public SlotStatus(
+ SlotID slotID,
+ ResourceProfile resourceProfile,
+ JobID jobID,
+ AllocationID allocationID,
+ int assignedTasks) {
this.slotID = checkNotNull(slotID, "slotID cannot be null");
this.resourceProfile = checkNotNull(resourceProfile, "profile cannot
be null");
this.allocationID = allocationID;
this.jobID = jobID;
+ this.assignedTasks = assignedTasks;
}
/**
@@ -100,6 +115,10 @@ public class SlotStatus implements Serializable {
return jobID;
}
+ public int getAssignedTasks() {
+ return assignedTasks;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -143,6 +162,8 @@ public class SlotStatus implements Serializable {
+ allocationID
+ ", jobID="
+ jobID
+ + ", assignedTasks="
+ + assignedTasks
+ ", resourceProfile="
+ resourceProfile
+ '}';
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index a1f2f981891..817b19ca7cd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -141,6 +141,10 @@ public class TaskSlot<T extends TaskSlotPayload>
implements AutoCloseableAsync {
return tasks.isEmpty();
}
+ public int getAssignedTasks() {
+ return tasks.size();
+ }
+
public boolean isActive(JobID activeJobId, AllocationID
activeAllocationId) {
Preconditions.checkNotNull(activeJobId);
Preconditions.checkNotNull(activeAllocationId);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
index bfed45c0a31..8bdc56f04c6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -244,9 +244,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload>
implements TaskSlotTab
slotId,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
- taskSlot.getAllocationId());
+ taskSlot.getAllocationId(),
+ taskSlot.getAssignedTasks());
} else {
- slotStatus = new SlotStatus(slotId,
defaultSlotResourceProfile, null, null);
+ slotStatus = new SlotStatus(slotId,
defaultSlotResourceProfile, null, null, 0);
}
slotStatuses.add(slotStatus);
@@ -259,14 +260,13 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload>
implements TaskSlotTab
new SlotID(resourceId, taskSlot.getIndex()),
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
- taskSlot.getAllocationId());
+ taskSlot.getAllocationId(),
+ taskSlot.getAssignedTasks());
slotStatuses.add(slotStatus);
}
}
- final SlotReport slotReport = new SlotReport(slotStatuses);
-
- return slotReport;
+ return new SlotReport(slotStatuses);
}
// ---------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 761f94b08bc..c95c56f2b04 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.resourcemanager;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -26,6 +28,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
@@ -48,6 +51,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -370,4 +374,107 @@ class ResourceManagerTaskExecutorTest {
1),
TIMEOUT);
}
+
+ @Test
+ void testRequestTaskManagerDetailsInfo() throws Exception {
+ final Duration fastTimeout = Duration.ofMillis(1L);
+ try {
+ final OneShotLatch startConnection = new OneShotLatch();
+ final OneShotLatch finishConnection = new OneShotLatch();
+
+ // first registration is with blocking connection
+ rpcService.setRpcGatewayFutureFunction(
+ rpcGateway ->
+ CompletableFuture.supplyAsync(
+ () -> {
+ startConnection.trigger();
+ try {
+ finishConnection.await();
+ } catch (InterruptedException ignored)
{
+ }
+ return rpcGateway;
+ },
+ EXECUTOR_EXTENSION.getExecutor()));
+
+ TaskExecutorRegistration taskExecutorRegistration =
+ new TaskExecutorRegistration(
+ taskExecutorGateway.getAddress(),
+ taskExecutorResourceID,
+ dataPort,
+ jmxPort,
+ hardwareDescription,
+ new TaskExecutorMemoryConfiguration(
+ 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
+ DEFAULT_SLOT_PROFILE,
+ DEFAULT_SLOT_PROFILE,
+ taskExecutorGateway.getAddress(),
+ 0);
+
+ CompletableFuture<RegistrationResponse> firstFuture =
+ rmGateway.registerTaskExecutor(taskExecutorRegistration,
fastTimeout);
+ assertThatFuture(firstFuture)
+ .as(
+ "Should have failed because connection to
taskmanager is delayed beyond timeout")
+ .eventuallyFails()
+ .withThrowableOfType(Exception.class)
+ .withCauseInstanceOf(TimeoutException.class)
+
.withMessageContaining("ResourceManagerGateway.registerTaskExecutor");
+
+ startConnection.await();
+
+ // second registration after timeout is with no delay, expecting
it to be succeeded
+ rpcService.resetRpcGatewayFutureFunction();
+ CompletableFuture<RegistrationResponse> secondFuture =
+ rmGateway.registerTaskExecutor(taskExecutorRegistration,
TIMEOUT);
+ RegistrationResponse response = secondFuture.get();
+
+ // Test request task manager information before slot report.
+ TaskManagerInfoWithSlots taskManagerInfoWithSlots =
+
rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
+
+
assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getAssignedTasks()).isZero();
+ assertThat(taskManagerInfoWithSlots.getAllocatedSlots())
+ .allMatch(slotInfo -> slotInfo.getAssignedTasks() == 0);
+
+ // on success, send slot report for taskmanager registration
+ final JobID ignoredJobID = new JobID();
+ final SlotReport slotReport =
+ new SlotReport(
+ Arrays.asList(
+ new SlotStatus(
+ new SlotID(taskExecutorResourceID,
1),
+ ResourceProfile.ANY,
+ ignoredJobID,
+ new AllocationID(),
+ 1),
+ new SlotStatus(
+ new SlotID(taskExecutorResourceID,
1),
+ ResourceProfile.ANY,
+ ignoredJobID,
+ new AllocationID(),
+ 0)));
+ rmGateway
+ .sendSlotReport(
+ taskExecutorResourceID,
+ ((TaskExecutorRegistrationSuccess)
response).getRegistrationId(),
+ slotReport,
+ TIMEOUT)
+ .get();
+
+ // let the remaining part of the first registration proceed
+ finishConnection.trigger();
+ Thread.sleep(1L);
+
+ // Test request task manager information after slot report.
+ taskManagerInfoWithSlots =
+
rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
+
assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getAssignedTasks()).isOne();
+ assertThat(
+
taskManagerInfoWithSlots.getAllocatedSlots().stream()
+ .map(SlotInfo::getAssignedTasks))
+ .containsExactlyInAnyOrder(0, 1);
+ } finally {
+ rpcService.resetRpcGatewayFutureFunction();
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 935096c6e63..2096c95fb97 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -206,6 +206,7 @@ class ResourceManagerTest {
assertThat(taskManagerInfo.getJmxPort()).isEqualTo(jmxPort);
assertThat(taskManagerInfo.getNumberSlots()).isEqualTo(0);
assertThat(taskManagerInfo.getNumberAvailableSlots()).isEqualTo(0);
+ assertThat(taskManagerInfo.getAssignedTasks()).isZero();
assertThat(taskManagerInfoWithSlots.getAllocatedSlots()).isEmpty();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
index d2550b5d9f0..f14a1e5b985 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -54,6 +54,11 @@ public class TestingSlotManager implements SlotManager {
this.triggerRequirementsCheckConsumer =
triggerRequirementsCheckConsumer;
}
+ @Override
+ public int getAssignedTasksOf(InstanceID instanceId) {
+ return 0;
+ }
+
@Override
public int getNumberRegisteredSlots() {
return 0;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 5e7a1f2dd40..5f3825b252b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -173,6 +173,7 @@ class TaskManagerDetailsHandlerTest {
0L,
0,
0,
+ 0,
ResourceProfile.ZERO,
ResourceProfile.ZERO,
new HardwareDescription(0, 0L, 0L, 0L),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
index 959f1dcd406..97a20dcb1df 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
@@ -48,7 +48,8 @@ class TaskManagerDetailsInfoTest extends
RestResponseMarshallingTestBase<TaskMan
final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
new TaskManagerInfoWithSlots(
TaskManagerInfoTest.createRandomTaskManagerInfo(),
- Collections.singletonList(new SlotInfo(new JobID(),
ResourceProfile.ANY)));
+ Collections.singletonList(
+ new SlotInfo(new JobID(), ResourceProfile.ANY,
0)));
final TaskManagerMetricsInfo taskManagerMetricsInfo =
createRandomTaskManagerMetricsInfo();
return new TaskManagerDetailsInfo(taskManagerInfoWithSlots,
taskManagerMetricsInfo);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index acacd53aba7..2c765bf56fe 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -55,6 +55,7 @@ class TaskManagerInfoTest extends
RestResponseMarshallingTestBase<TaskManagerInf
random.nextLong(),
random.nextInt(),
random.nextInt(),
+ 0,
ResourceProfile.ZERO,
ResourceProfile.ZERO,
new HardwareDescription(