This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 02106293bb8 [FLINK-38717][runtime] Display the assigned task numbers 
for TaskManagers on the UI
02106293bb8 is described below

commit 02106293bb827d7c2d87cb0cd0b3e56db39848ac
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Dec 30 14:35:14 2025 +0800

    [FLINK-38717][runtime] Display the assigned task numbers for TaskManagers 
on the UI
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |   9 ++
 .../src/test/resources/rest_api_v1.snapshot        |   9 ++
 .../src/app/interfaces/task-manager.ts             |   3 +
 .../list/task-manager-list.component.html          |   2 +
 .../list/task-manager-list.component.ts            |   1 +
 .../metrics/task-manager-metrics.component.html    |   2 +
 .../status/task-manager-status.component.html      |   5 +-
 .../runtime/resourcemanager/ResourceManager.java   |   2 +
 .../ClusterResourceStatisticsProvider.java         |   3 +
 .../slotmanager/DefaultSlotStatusSyncer.java       |  12 ++-
 .../slotmanager/FineGrainedSlotManager.java        |  12 ++-
 .../slotmanager/FineGrainedTaskManagerSlot.java    |  12 +++
 .../slotmanager/FineGrainedTaskManagerTracker.java |  17 ++++
 .../resourcemanager/slotmanager/SlotManager.java   |   3 +
 .../slotmanager/TaskManagerSlotInformation.java    |  14 +++
 .../rest/messages/taskmanager/SlotInfo.java        |  24 ++++-
 .../taskmanager/TaskManagerDetailsInfo.java        |   5 +
 .../rest/messages/taskmanager/TaskManagerInfo.java |  16 +++
 .../flink/runtime/taskexecutor/SlotStatus.java     |  23 ++++-
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |   4 +
 .../taskexecutor/slot/TaskSlotTableImpl.java       |  12 +--
 .../ResourceManagerTaskExecutorTest.java           | 107 +++++++++++++++++++++
 .../resourcemanager/ResourceManagerTest.java       |   1 +
 .../slotmanager/TestingSlotManager.java            |   5 +
 .../taskmanager/TaskManagerDetailsHandlerTest.java |   1 +
 .../taskmanager/TaskManagerDetailsInfoTest.java    |   3 +-
 .../messages/taskmanager/TaskManagerInfoTest.java  |   1 +
 27 files changed, 288 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index a36b1aeaf55..2339c5233f4 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -5240,6 +5240,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
           "freeSlots" : {
             "type" : "integer"
           },
+          "assignedTasks" : {
+            "type" : "integer"
+          },
           "hardware" : {
             "type" : "object",
             "id" : 
"urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
@@ -5441,6 +5444,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
           "jobId" : {
             "type" : "any"
           },
+          "assignedTasks" : {
+            "type" : "integer"
+          },
           "resource" : {
             "type" : "object",
             "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
@@ -5461,6 +5467,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     "freeSlots" : {
       "type" : "integer"
     },
+    "assignedTasks" : {
+      "type" : "integer"
+    },
     "hardware" : {
       "type" : "object",
       "id" : 
"urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 1444b47b684..406ebe85ef0 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -4093,6 +4093,9 @@
               "freeSlots" : {
                 "type" : "integer"
               },
+              "assignedTasks" : {
+                "type" : "integer"
+              },
               "totalResource" : {
                 "type" : "object",
                 "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
@@ -4255,6 +4258,9 @@
         "freeSlots" : {
           "type" : "integer"
         },
+        "assignedTasks" : {
+          "type" : "integer"
+        },
         "totalResource" : {
           "type" : "object",
           "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
@@ -4352,6 +4358,9 @@
               "jobId" : {
                 "type" : "any"
               },
+              "assignedTasks" : {
+                "type" : "integer"
+              },
               "resource" : {
                 "type" : "object",
                 "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
index 0c56202b673..51f0705026e 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
@@ -27,6 +27,7 @@ export interface TaskManagerDetail {
   timeSinceLastHeartbeat: number;
   slotsNumber: number;
   freeSlots: number;
+  assignedTasks: number;
   hardware: Hardware;
   metrics: Metrics;
   memoryConfiguration: MemoryConfiguration;
@@ -38,6 +39,7 @@ export interface TaskManagerDetail {
 
 export interface AllocatedSlot {
   jobId: string;
+  assignedTasks: number;
   resource: Resources;
 }
 
@@ -67,6 +69,7 @@ export interface TaskManagersItem {
   timeSinceLastHeartbeat: number;
   slotsNumber: number;
   freeSlots: number;
+  assignedTasks: number;
   hardware: Hardware;
   blocked?: boolean;
 }
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
index 3d6ca14a058..89042fa40f1 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html
@@ -32,6 +32,7 @@
         <th [nzSortFn]="sortHeartBeatFn" [nzWidth]="'160px'">Last 
Heartbeat</th>
         <th [nzSortFn]="sortSlotsNumberFn" [nzWidth]="'90px'">All Slots</th>
         <th [nzSortFn]="sortFreeSlotsFn" [nzWidth]="'100px'">Free Slots</th>
+        <th [nzSortFn]="sortAssignedTasksFn" [nzWidth]="'100px'">Assigned 
Tasks</th>
         <th [nzSortFn]="sortCpuCoresFn" [nzWidth]="'110px'">CPU Cores</th>
         <th [nzSortFn]="sortPhysicalMemoryFn" [nzWidth]="'120px'">Physical 
MEM</th>
         <th [nzSortFn]="sortFreeMemoryFn" [nzWidth]="'130px'">JVM Heap 
Size</th>
@@ -53,6 +54,7 @@
           <td>{{ manager.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' 
}}</td>
           <td>{{ manager.slotsNumber }}</td>
           <td>{{ manager.freeSlots }}</td>
+          <td>{{ manager.assignedTasks }}</td>
           <td>{{ manager.hardware.cpuCores }}</td>
           <td [attr.title]="manager.hardware.physicalMemory + ' bytes'">
             {{ manager.hardware.physicalMemory | humanizeBytes }}
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
index 420c6de4e59..93bf4c76185 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts
@@ -49,6 +49,7 @@ export class TaskManagerListComponent implements OnInit, 
OnDestroy {
   public readonly sortHeartBeatFn = createSortFn(item => 
item.timeSinceLastHeartbeat);
   public readonly sortSlotsNumberFn = createSortFn(item => item.slotsNumber);
   public readonly sortFreeSlotsFn = createSortFn(item => item.freeSlots);
+  public readonly sortAssignedTasksFn = createSortFn(item => 
item.assignedTasks);
   public readonly sortCpuCoresFn = createSortFn(item => 
item.hardware?.cpuCores);
   public readonly sortPhysicalMemoryFn = createSortFn(item => 
item.hardware?.physicalMemory);
   public readonly sortFreeMemoryFn = createSortFn(item => 
item.hardware?.freeMemory);
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
index b4a41b521f2..07f735be986 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
@@ -396,6 +396,7 @@
       <tr>
         <th>#</th>
         <th>Job ID</th>
+        <th>Assigned Tasks</th>
         <th>CPU (cores)</th>
         <th>Task Heap memory (MB)</th>
         <th>Task Off-Heap memory (MB)</th>
@@ -409,6 +410,7 @@
           <strong>{{ i | number }}</strong>
         </td>
         <td>{{ slot.jobId }}</td>
+        <td>{{ slot.assignedTasks }}</td>
         <td>{{ slot.resource.cpuCores | number }}</td>
         <td>{{ slot.resource.taskHeapMemory | number }}</td>
         <td>{{ slot.resource.taskOffHeapMemory | number }}</td>
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
index c88b16fbb85..46e954d7082 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html
@@ -22,12 +22,15 @@
     <flink-blocked-badge 
*ngIf="taskManagerDetail?.blocked"></flink-blocked-badge>
   </div>
   <nz-descriptions *ngIf="taskManagerDetail" nzBordered nzSize="small">
-    <nz-descriptions-item [nzSpan]="2" nzTitle="Path">
+    <nz-descriptions-item [nzSpan]="1" nzTitle="Path">
       {{ taskManagerDetail.path }}
     </nz-descriptions-item>
     <nz-descriptions-item [nzSpan]="1" nzTitle="Free/All Slots">
       {{ taskManagerDetail.freeSlots }} / {{ taskManagerDetail.slotsNumber }}
     </nz-descriptions-item>
+    <nz-descriptions-item [nzSpan]="1" nzTitle="Assigned Tasks">
+      {{ taskManagerDetail.assignedTasks }}
+    </nz-descriptions-item>
     <nz-descriptions-item [nzSpan]="1" nzTitle="Last Heartbeat">
       {{ taskManagerDetail.timeSinceLastHeartbeat | date: 'yyyy-MM-dd 
HH:mm:ss' }}
     </nz-descriptions-item>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b3f9404e3ee..13af32c8884 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -687,6 +687,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                             
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
                             
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
                             
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+                            
slotManager.getAssignedTasksOf(taskExecutor.getInstanceID()),
                             
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
                             
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
                             taskExecutor.getHardwareDescription(),
@@ -717,6 +718,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                                     
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
                                     
slotManager.getNumberRegisteredSlotsOf(instanceId),
                                     
slotManager.getNumberFreeSlotsOf(instanceId),
+                                    slotManager.getAssignedTasksOf(instanceId),
                                     
slotManager.getRegisteredResourceOf(instanceId),
                                     slotManager.getFreeResourceOf(instanceId),
                                     taskExecutor.getHardwareDescription(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
index f3b5c4873a0..80539808c93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.instance.InstanceID;
 /** Provides statistics of cluster resources. */
 public interface ClusterResourceStatisticsProvider {
 
+    /** Get total number of tasks assigned to the current instance. slots. */
+    int getAssignedTasks(InstanceID instanceId);
+
     /** Get total number of registered slots. */
     int getNumberRegisteredSlots();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
index 6b526cf391e..e957eb3b644 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -295,16 +296,15 @@ public class DefaultSlotStatusSyncer implements 
SlotStatusSyncer {
     private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, 
TaskManagerInfo taskManager) {
         final AllocationID allocationId = 
Preconditions.checkNotNull(slotStatus.getAllocationID());
         final JobID jobId = Preconditions.checkNotNull(slotStatus.getJobID());
+        final int assignedTasks = slotStatus.getAssignedTasks();
         try (MdcUtils.MdcCloseable ignored = 
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
             final ResourceProfile resourceProfile =
                     
Preconditions.checkNotNull(slotStatus.getResourceProfile());
 
-            if (taskManager.getAllocatedSlots().containsKey(allocationId)) {
-                if 
(taskManager.getAllocatedSlots().get(allocationId).getState()
-                        == SlotState.PENDING) {
+            TaskManagerSlotInformation slot = 
taskManager.getAllocatedSlots().get(allocationId);
+            if (Objects.nonNull(slot)) {
+                if (slot.getState() == SlotState.PENDING) {
                     // Allocation Complete
-                    final TaskManagerSlotInformation slot =
-                            taskManager.getAllocatedSlots().get(allocationId);
                     pendingSlotAllocations.remove(slot.getAllocationId());
                     taskManagerTracker.notifySlotStatus(
                             slot.getAllocationId(),
@@ -313,6 +313,7 @@ public class DefaultSlotStatusSyncer implements 
SlotStatusSyncer {
                             slot.getResourceProfile(),
                             SlotState.ALLOCATED);
                 }
+                slot.setAssignedTasks(assignedTasks);
                 return true;
             } else {
                 Preconditions.checkState(
@@ -325,6 +326,7 @@ public class DefaultSlotStatusSyncer implements 
SlotStatusSyncer {
                         resourceProfile,
                         SlotState.ALLOCATED);
                 resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
+                
taskManager.getAllocatedSlots().get(allocationId).setAssignedTasks(assignedTasks);
                 return false;
             }
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index c19943cdaa1..6d2511b7821 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -755,6 +755,11 @@ public class FineGrainedSlotManager implements SlotManager 
{
     // Legacy APIs
     // 
---------------------------------------------------------------------------------------------
 
+    @Override
+    public int getAssignedTasksOf(InstanceID instanceId) {
+        return taskManagerTracker.getAssignedTasks(instanceId);
+    }
+
     @Override
     public int getNumberRegisteredSlots() {
         return taskManagerTracker.getNumberRegisteredSlots();
@@ -803,7 +808,12 @@ public class FineGrainedSlotManager implements SlotManager 
{
                 .map(Map::values)
                 .orElse(Collections.emptyList())
                 .stream()
-                .map(slot -> new SlotInfo(slot.getJobId(), 
slot.getResourceProfile()))
+                .map(
+                        slot ->
+                                new SlotInfo(
+                                        slot.getJobId(),
+                                        slot.getResourceProfile(),
+                                        slot.getAssignedTasks()))
                 .collect(Collectors.toList());
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
index b7e58cc2826..88af3876cdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java
@@ -51,6 +51,8 @@ public class FineGrainedTaskManagerSlot implements 
TaskManagerSlotInformation {
     /** Current state of this slot. Should be either PENDING or ALLOCATED. */
     private SlotState state;
 
+    private int assignedTasks = 0;
+
     public FineGrainedTaskManagerSlot(
             AllocationID allocationId,
             JobID jobId,
@@ -72,6 +74,16 @@ public class FineGrainedTaskManagerSlot implements 
TaskManagerSlotInformation {
         return resourceProfile;
     }
 
+    @Override
+    public void setAssignedTasks(int assignedTasks) {
+        this.assignedTasks = assignedTasks;
+    }
+
+    @Override
+    public int getAssignedTasks() {
+        return assignedTasks;
+    }
+
     @Override
     public SlotState getState() {
         return state;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
index da3c8ae6291..31ef3a8c269 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -315,6 +316,22 @@ public class FineGrainedTaskManagerTracker implements 
TaskManagerTracker {
                         Collections.emptySet()));
     }
 
+    @Override
+    public int getAssignedTasks(InstanceID instanceId) {
+        FineGrainedTaskManagerRegistration taskManagerRegistration =
+                taskManagerRegistrations.get(instanceId);
+        if (Objects.isNull(taskManagerRegistration)) {
+            return 0;
+        }
+        int totalAssignedTasks = 0;
+        for (TaskManagerSlotInformation slot :
+                taskManagerRegistration.getAllocatedSlots().values()) {
+            final int assignedTasks = slot.getAssignedTasks();
+            totalAssignedTasks = totalAssignedTasks + assignedTasks;
+        }
+        return totalAssignedTasks;
+    }
+
     @Override
     public int getNumberRegisteredSlots() {
         return taskManagerRegistrations.values().stream()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 3effd14504d..2fa72baa65e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -45,6 +45,9 @@ import java.util.concurrent.Executor;
  * failure, respectively.
  */
 public interface SlotManager extends AutoCloseable {
+
+    int getAssignedTasksOf(InstanceID instanceId);
+
     int getNumberRegisteredSlots();
 
     int getNumberRegisteredSlotsOf(InstanceID instanceId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
index 435bace1409..1e1701ac0a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
@@ -60,4 +60,18 @@ public interface TaskManagerSlotInformation {
      * @return resource profile of this slot
      */
     ResourceProfile getResourceProfile();
+
+    /**
+     * Set the number of tasks assigned to the current slot.
+     *
+     * @param assignedTasks The number of tasks assigned to the current slot.
+     */
+    void setAssignedTasks(int assignedTasks);
+
+    /**
+     * Get the number of tasks assigned to the current slot.
+     *
+     * @return The number of tasks assigned to the current slot.
+     */
+    int getAssignedTasks();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
index 6da1b4e8004..d2999e94c07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java
@@ -46,6 +46,8 @@ public class SlotInfo implements ResponseBody, Serializable {
 
     public static final String FIELD_NAME_JOB_ID = "jobId";
 
+    public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";
+
     @JsonProperty(FIELD_NAME_RESOURCE)
     private final ResourceProfileInfo resource;
 
@@ -53,17 +55,22 @@ public class SlotInfo implements ResponseBody, Serializable 
{
     @JsonSerialize(using = JobIDSerializer.class)
     private final JobID jobId;
 
+    @JsonProperty(FIELD_NAME_ASSIGNED_TASKS)
+    private final int assignedTasks;
+
     @JsonCreator
     public SlotInfo(
             @JsonDeserialize(using = JobIDDeserializer.class) 
@JsonProperty(FIELD_NAME_JOB_ID)
                     JobID jobId,
-            @JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource) {
+            @JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource,
+            @JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks) {
         this.jobId = Preconditions.checkNotNull(jobId);
         this.resource = Preconditions.checkNotNull(resource);
+        this.assignedTasks = assignedTasks;
     }
 
-    public SlotInfo(JobID jobId, ResourceProfile resource) {
-        this(jobId, ResourceProfileInfo.fromResourceProfile(resource));
+    public SlotInfo(JobID jobId, ResourceProfile resource, int assignedTasks) {
+        this(jobId, ResourceProfileInfo.fromResourceProfile(resource), 
assignedTasks);
     }
 
     @JsonIgnore
@@ -71,6 +78,11 @@ public class SlotInfo implements ResponseBody, Serializable {
         return jobId;
     }
 
+    @JsonIgnore
+    public int getAssignedTasks() {
+        return assignedTasks;
+    }
+
     @JsonIgnore
     public ResourceProfileInfo getResource() {
         return resource;
@@ -85,11 +97,13 @@ public class SlotInfo implements ResponseBody, Serializable 
{
             return false;
         }
         SlotInfo that = (SlotInfo) o;
-        return Objects.equals(jobId, that.jobId) && Objects.equals(resource, 
that.resource);
+        return Objects.equals(jobId, that.jobId)
+                && Objects.equals(resource, that.resource)
+                && Objects.equals(assignedTasks, that.assignedTasks);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jobId, resource);
+        return Objects.hash(jobId, resource, assignedTasks);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index c9df8325e10..e06ce258531 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -45,6 +45,8 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
 
     public static final String FIELD_NAME_ALLOCATED_SLOTS = "allocatedSlots";
 
+    public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";
+
     @JsonProperty(FIELD_NAME_METRICS)
     private final TaskManagerMetricsInfo taskManagerMetrics;
 
@@ -62,6 +64,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
             @JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
             @JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
             @JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int 
numberAvailableSlots,
+            @JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks,
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo 
totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo 
freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription 
hardwareDescription,
@@ -77,6 +80,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
                 lastHeartbeat,
                 numberSlots,
                 numberAvailableSlots,
+                assignedTasks,
                 totalResource,
                 freeResource,
                 hardwareDescription,
@@ -98,6 +102,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
                 
taskManagerInfoWithSlots.getTaskManagerInfo().getLastHeartbeat(),
                 taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots(),
                 
taskManagerInfoWithSlots.getTaskManagerInfo().getNumberAvailableSlots(),
+                
taskManagerInfoWithSlots.getTaskManagerInfo().getAssignedTasks(),
                 
taskManagerInfoWithSlots.getTaskManagerInfo().getTotalResource(),
                 
taskManagerInfoWithSlots.getTaskManagerInfo().getFreeResource(),
                 
taskManagerInfoWithSlots.getTaskManagerInfo().getHardwareDescription(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index cca037e2aa6..043c2039ec4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -59,6 +59,8 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
 
     public static final String FIELD_NAME_NUMBER_AVAILABLE_SLOTS = "freeSlots";
 
+    public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";
+
     public static final String FIELD_NAME_TOTAL_RESOURCE = "totalResource";
 
     public static final String FIELD_NAME_AVAILABLE_RESOURCE = "freeResource";
@@ -93,6 +95,9 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
     @JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS)
     private final int numberAvailableSlots;
 
+    @JsonProperty(FIELD_NAME_ASSIGNED_TASKS)
+    private final int assignedTasks;
+
     @JsonProperty(FIELD_NAME_TOTAL_RESOURCE)
     private final ResourceProfileInfo totalResource;
 
@@ -121,6 +126,7 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
             @JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
             @JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
             @JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int 
numberAvailableSlots,
+            @JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks,
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo 
totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo 
freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription 
hardwareDescription,
@@ -133,6 +139,7 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
         this.lastHeartbeat = lastHeartbeat;
         this.numberSlots = numberSlots;
         this.numberAvailableSlots = numberAvailableSlots;
+        this.assignedTasks = assignedTasks;
         this.totalResource = totalResource;
         this.freeResource = freeResource;
         this.hardwareDescription = 
Preconditions.checkNotNull(hardwareDescription);
@@ -148,6 +155,7 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
             long lastHeartbeat,
             int numberSlots,
             int numberAvailableSlots,
+            int assignedTasks,
             ResourceProfile totalResource,
             ResourceProfile freeResource,
             HardwareDescription hardwareDescription,
@@ -161,6 +169,7 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
                 lastHeartbeat,
                 numberSlots,
                 numberAvailableSlots,
+                assignedTasks,
                 ResourceProfileInfo.fromResourceProfile(totalResource),
                 ResourceProfileInfo.fromResourceProfile(freeResource),
                 hardwareDescription,
@@ -203,6 +212,11 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
         return numberAvailableSlots;
     }
 
+    @JsonIgnore
+    public int getAssignedTasks() {
+        return assignedTasks;
+    }
+
     @JsonIgnore
     public ResourceProfileInfo getTotalResource() {
         return totalResource;
@@ -242,6 +256,7 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
                 && lastHeartbeat == that.lastHeartbeat
                 && numberSlots == that.numberSlots
                 && numberAvailableSlots == that.numberAvailableSlots
+                && Objects.equals(assignedTasks, that.assignedTasks)
                 && Objects.equals(totalResource, that.totalResource)
                 && Objects.equals(freeResource, that.freeResource)
                 && Objects.equals(resourceId, that.resourceId)
@@ -261,6 +276,7 @@ public class TaskManagerInfo implements ResponseBody, 
Serializable {
                 lastHeartbeat,
                 numberSlots,
                 numberAvailableSlots,
+                assignedTasks,
                 totalResource,
                 freeResource,
                 hardwareDescription,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 2295defb5af..3ca916a22e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -49,19 +50,33 @@ public class SlotStatus implements Serializable {
      */
     private final JobID jobID;
 
+    private final int assignedTasks;
+
+    @VisibleForTesting
     public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) {
-        this(slotID, resourceProfile, null, null);
+        this(slotID, resourceProfile, null, null, 0);
     }
 
+    @VisibleForTesting
     public SlotStatus(
             SlotID slotID,
             ResourceProfile resourceProfile,
             JobID jobID,
             AllocationID allocationID) {
+        this(slotID, resourceProfile, jobID, allocationID, 0);
+    }
+
+    public SlotStatus(
+            SlotID slotID,
+            ResourceProfile resourceProfile,
+            JobID jobID,
+            AllocationID allocationID,
+            int assignedTasks) {
         this.slotID = checkNotNull(slotID, "slotID cannot be null");
         this.resourceProfile = checkNotNull(resourceProfile, "profile cannot 
be null");
         this.allocationID = allocationID;
         this.jobID = jobID;
+        this.assignedTasks = assignedTasks;
     }
 
     /**
@@ -100,6 +115,10 @@ public class SlotStatus implements Serializable {
         return jobID;
     }
 
+    public int getAssignedTasks() {
+        return assignedTasks;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -143,6 +162,8 @@ public class SlotStatus implements Serializable {
                 + allocationID
                 + ", jobID="
                 + jobID
+                + ", assignedTasks="
+                + assignedTasks
                 + ", resourceProfile="
                 + resourceProfile
                 + '}';
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index a1f2f981891..817b19ca7cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -141,6 +141,10 @@ public class TaskSlot<T extends TaskSlotPayload> 
implements AutoCloseableAsync {
         return tasks.isEmpty();
     }
 
+    public int getAssignedTasks() {
+        return tasks.size();
+    }
+
     public boolean isActive(JobID activeJobId, AllocationID 
activeAllocationId) {
         Preconditions.checkNotNull(activeJobId);
         Preconditions.checkNotNull(activeAllocationId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
index bfed45c0a31..8bdc56f04c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -244,9 +244,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
                                 slotId,
                                 taskSlot.getResourceProfile(),
                                 taskSlot.getJobId(),
-                                taskSlot.getAllocationId());
+                                taskSlot.getAllocationId(),
+                                taskSlot.getAssignedTasks());
             } else {
-                slotStatus = new SlotStatus(slotId, 
defaultSlotResourceProfile, null, null);
+                slotStatus = new SlotStatus(slotId, 
defaultSlotResourceProfile, null, null, 0);
             }
 
             slotStatuses.add(slotStatus);
@@ -259,14 +260,13 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
                                 new SlotID(resourceId, taskSlot.getIndex()),
                                 taskSlot.getResourceProfile(),
                                 taskSlot.getJobId(),
-                                taskSlot.getAllocationId());
+                                taskSlot.getAllocationId(),
+                                taskSlot.getAssignedTasks());
                 slotStatuses.add(slotStatus);
             }
         }
 
-        final SlotReport slotReport = new SlotReport(slotStatuses);
-
-        return slotReport;
+        return new SlotReport(slotStatuses);
     }
 
     // ---------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 761f94b08bc..c95c56f2b04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -26,6 +28,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
@@ -48,6 +51,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -370,4 +374,107 @@ class ResourceManagerTaskExecutorTest {
                         1),
                 TIMEOUT);
     }
+
+    @Test
+    void testRequestTaskManagerDetailsInfo() throws Exception {
+        final Duration fastTimeout = Duration.ofMillis(1L);
+        try {
+            final OneShotLatch startConnection = new OneShotLatch();
+            final OneShotLatch finishConnection = new OneShotLatch();
+
+            // first registration is with blocking connection
+            rpcService.setRpcGatewayFutureFunction(
+                    rpcGateway ->
+                            CompletableFuture.supplyAsync(
+                                    () -> {
+                                        startConnection.trigger();
+                                        try {
+                                            finishConnection.await();
+                                        } catch (InterruptedException ignored) 
{
+                                        }
+                                        return rpcGateway;
+                                    },
+                                    EXECUTOR_EXTENSION.getExecutor()));
+
+            TaskExecutorRegistration taskExecutorRegistration =
+                    new TaskExecutorRegistration(
+                            taskExecutorGateway.getAddress(),
+                            taskExecutorResourceID,
+                            dataPort,
+                            jmxPort,
+                            hardwareDescription,
+                            new TaskExecutorMemoryConfiguration(
+                                    1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
+                            DEFAULT_SLOT_PROFILE,
+                            DEFAULT_SLOT_PROFILE,
+                            taskExecutorGateway.getAddress(),
+                            0);
+
+            CompletableFuture<RegistrationResponse> firstFuture =
+                    rmGateway.registerTaskExecutor(taskExecutorRegistration, 
fastTimeout);
+            assertThatFuture(firstFuture)
+                    .as(
+                            "Should have failed because connection to 
taskmanager is delayed beyond timeout")
+                    .eventuallyFails()
+                    .withThrowableOfType(Exception.class)
+                    .withCauseInstanceOf(TimeoutException.class)
+                    
.withMessageContaining("ResourceManagerGateway.registerTaskExecutor");
+
+            startConnection.await();
+
+            // second registration after timeout is with no delay, expecting 
it to be succeeded
+            rpcService.resetRpcGatewayFutureFunction();
+            CompletableFuture<RegistrationResponse> secondFuture =
+                    rmGateway.registerTaskExecutor(taskExecutorRegistration, 
TIMEOUT);
+            RegistrationResponse response = secondFuture.get();
+
+            // Test request task manager information before slot report.
+            TaskManagerInfoWithSlots taskManagerInfoWithSlots =
+                    
rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
+
+            
assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getAssignedTasks()).isZero();
+            assertThat(taskManagerInfoWithSlots.getAllocatedSlots())
+                    .allMatch(slotInfo -> slotInfo.getAssignedTasks() == 0);
+
+            // on success, send slot report for taskmanager registration
+            final JobID ignoredJobID = new JobID();
+            final SlotReport slotReport =
+                    new SlotReport(
+                            Arrays.asList(
+                                    new SlotStatus(
+                                            new SlotID(taskExecutorResourceID, 
1),
+                                            ResourceProfile.ANY,
+                                            ignoredJobID,
+                                            new AllocationID(),
+                                            1),
+                                    new SlotStatus(
+                                            new SlotID(taskExecutorResourceID, 
1),
+                                            ResourceProfile.ANY,
+                                            ignoredJobID,
+                                            new AllocationID(),
+                                            0)));
+            rmGateway
+                    .sendSlotReport(
+                            taskExecutorResourceID,
+                            ((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(),
+                            slotReport,
+                            TIMEOUT)
+                    .get();
+
+            // let the remaining part of the first registration proceed
+            finishConnection.trigger();
+            Thread.sleep(1L);
+
+            // Test request task manager information after slot report.
+            taskManagerInfoWithSlots =
+                    
rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
+            
assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getAssignedTasks()).isOne();
+            assertThat(
+                            
taskManagerInfoWithSlots.getAllocatedSlots().stream()
+                                    .map(SlotInfo::getAssignedTasks))
+                    .containsExactlyInAnyOrder(0, 1);
+        } finally {
+            rpcService.resetRpcGatewayFutureFunction();
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 935096c6e63..2096c95fb97 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -206,6 +206,7 @@ class ResourceManagerTest {
         assertThat(taskManagerInfo.getJmxPort()).isEqualTo(jmxPort);
         assertThat(taskManagerInfo.getNumberSlots()).isEqualTo(0);
         assertThat(taskManagerInfo.getNumberAvailableSlots()).isEqualTo(0);
+        assertThat(taskManagerInfo.getAssignedTasks()).isZero();
         assertThat(taskManagerInfoWithSlots.getAllocatedSlots()).isEmpty();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
index d2550b5d9f0..f14a1e5b985 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -54,6 +54,11 @@ public class TestingSlotManager implements SlotManager {
         this.triggerRequirementsCheckConsumer = 
triggerRequirementsCheckConsumer;
     }
 
+    @Override
+    public int getAssignedTasksOf(InstanceID instanceId) {
+        return 0;
+    }
+
     @Override
     public int getNumberRegisteredSlots() {
         return 0;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 5e7a1f2dd40..5f3825b252b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -173,6 +173,7 @@ class TaskManagerDetailsHandlerTest {
                 0L,
                 0,
                 0,
+                0,
                 ResourceProfile.ZERO,
                 ResourceProfile.ZERO,
                 new HardwareDescription(0, 0L, 0L, 0L),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
index 959f1dcd406..97a20dcb1df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
@@ -48,7 +48,8 @@ class TaskManagerDetailsInfoTest extends 
RestResponseMarshallingTestBase<TaskMan
         final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
                 new TaskManagerInfoWithSlots(
                         TaskManagerInfoTest.createRandomTaskManagerInfo(),
-                        Collections.singletonList(new SlotInfo(new JobID(), 
ResourceProfile.ANY)));
+                        Collections.singletonList(
+                                new SlotInfo(new JobID(), ResourceProfile.ANY, 
0)));
         final TaskManagerMetricsInfo taskManagerMetricsInfo = 
createRandomTaskManagerMetricsInfo();
 
         return new TaskManagerDetailsInfo(taskManagerInfoWithSlots, 
taskManagerMetricsInfo);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index acacd53aba7..2c765bf56fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -55,6 +55,7 @@ class TaskManagerInfoTest extends 
RestResponseMarshallingTestBase<TaskManagerInf
                 random.nextLong(),
                 random.nextInt(),
                 random.nextInt(),
+                0,
                 ResourceProfile.ZERO,
                 ResourceProfile.ZERO,
                 new HardwareDescription(

Reply via email to