This is an automated email from the ASF dual-hosted git repository.
huweihua pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new c595cdd7179 [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport (#27100)
c595cdd7179 is described below
commit c595cdd7179f456dd2e7c02b3943d3a3d7c891b3
Author: Weihua Hu <[email protected]>
AuthorDate: Wed Dec 3 11:10:18 2025 +0800
[FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport (#27100)
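* [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport
A SlotReport should only describe the current slot usage. numberSlots, by contrast, is a static attribute of the TaskManager and therefore should not be derived from the SlotReport. For example, with dynamic slot requests, a recovered TaskManager can report more slot statuses (3) than it has configured slots (2). Deriving the WorkerResourceSpec from slotReport.getNumSlotStatus() would then compute the wrong slot count.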
---
.../runtime/resourcemanager/ResourceManager.java | 5 +-
.../resourcemanager/TaskExecutorRegistration.java | 11 ++++-
.../registration/WorkerRegistration.java | 10 +++-
.../flink/runtime/taskexecutor/TaskExecutor.java | 3 +-
.../ResourceManagerPartitionLifecycleTest.java | 3 +-
.../ResourceManagerTaskExecutorTest.java | 9 ++--
.../resourcemanager/ResourceManagerTest.java | 6 ++-
.../active/ActiveResourceManagerTest.java | 3 +-
.../taskexecutor/TaskExecutorRecoveryTest.java | 54 ++++++++++++++++++++--
...askExecutorToResourceManagerConnectionTest.java | 3 +-
10 files changed, 89 insertions(+), 18 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 177b045449e..5f2060ddf4c 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -518,7 +518,7 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
WorkerResourceSpec workerResourceSpec =
WorkerResourceSpec.fromTotalResourceProfile(
workerTypeWorkerRegistration.getTotalResourceProfile(),
- slotReport.getNumSlotStatus());
+ workerTypeWorkerRegistration.getNumberSlots());
onWorkerRegistered(workerTypeWorkerRegistration.getWorker(),
workerResourceSpec);
} else if (registrationResult ==
SlotManager.RegistrationResult.REJECTED) {
closeTaskManagerConnection(
@@ -1074,7 +1074,8 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
taskExecutorRegistration.getMemoryConfiguration(),
taskExecutorRegistration.getTotalResourceProfile(),
taskExecutorRegistration.getDefaultSlotResourceProfile(),
- taskExecutorRegistration.getNodeId());
+ taskExecutorRegistration.getNodeId(),
+ taskExecutorRegistration.getNumberSlots());
log.info(
"Registering TaskManager with ResourceID {} ({}) at
ResourceManager",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index b3b9c51a3f5..6f8530b6a13 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -64,6 +64,9 @@ public class TaskExecutorRegistration implements Serializable
{
*/
private final String nodeId;
+ /** Number of slots in static slot allocation. */
+ private final int numberSlots;
+
public TaskExecutorRegistration(
final String taskExecutorAddress,
final ResourceID resourceId,
@@ -73,7 +76,8 @@ public class TaskExecutorRegistration implements Serializable
{
final TaskExecutorMemoryConfiguration memoryConfiguration,
final ResourceProfile defaultSlotResourceProfile,
final ResourceProfile totalResourceProfile,
- final String nodeId) {
+ final String nodeId,
+ final int numberSlots) {
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
this.resourceId = checkNotNull(resourceId);
this.dataPort = dataPort;
@@ -83,6 +87,7 @@ public class TaskExecutorRegistration implements Serializable
{
this.defaultSlotResourceProfile =
checkNotNull(defaultSlotResourceProfile);
this.totalResourceProfile = checkNotNull(totalResourceProfile);
this.nodeId = checkNotNull(nodeId);
+ this.numberSlots = numberSlots;
}
public String getTaskExecutorAddress() {
@@ -120,4 +125,8 @@ public class TaskExecutorRegistration implements
Serializable {
public String getNodeId() {
return nodeId;
}
+
+ public int getNumberSlots() {
+ return numberSlots;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 79f402e4cd1..660fb33103d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -45,6 +45,8 @@ public class WorkerRegistration<WorkerType extends
ResourceIDRetrievable>
private final String nodeId;
+ private final int numberSlots;
+
public WorkerRegistration(
TaskExecutorGateway taskExecutorGateway,
WorkerType worker,
@@ -54,7 +56,8 @@ public class WorkerRegistration<WorkerType extends
ResourceIDRetrievable>
TaskExecutorMemoryConfiguration memoryConfiguration,
ResourceProfile totalResourceProfile,
ResourceProfile defaultSlotResourceProfile,
- String nodeId) {
+ String nodeId,
+ int numberSlots) {
super(worker.getResourceID(), taskExecutorGateway);
@@ -66,6 +69,7 @@ public class WorkerRegistration<WorkerType extends
ResourceIDRetrievable>
this.totalResourceProfile =
Preconditions.checkNotNull(totalResourceProfile);
this.defaultSlotResourceProfile =
Preconditions.checkNotNull(defaultSlotResourceProfile);
this.nodeId = Preconditions.checkNotNull(nodeId);
+ this.numberSlots = numberSlots;
}
public WorkerType getWorker() {
@@ -99,4 +103,8 @@ public class WorkerRegistration<WorkerType extends
ResourceIDRetrievable>
public String getNodeId() {
return nodeId;
}
+
+ public int getNumberSlots() {
+ return numberSlots;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f90f0dce4ba..fe6c7004e0f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1566,7 +1566,8 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
memoryConfiguration,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile(),
- unresolvedTaskManagerLocation.getNodeId());
+ unresolvedTaskManagerLocation.getNodeId(),
+ taskManagerConfiguration.getNumberSlots());
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index 44ca1cded16..e581a037db0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -187,7 +187,8 @@ class ResourceManagerPartitionLifecycleTest {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
ResourceProfile.ZERO,
ResourceProfile.ZERO,
- taskExecutorAddress);
+ taskExecutorAddress,
+ 1);
final CompletableFuture<RegistrationResponse> registrationFuture =
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 90cb6213957..ed64eb26dce 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -222,7 +222,8 @@ class ResourceManagerTaskExecutorTest {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
DEFAULT_SLOT_PROFILE,
DEFAULT_SLOT_PROFILE,
- taskExecutorGateway.getAddress());
+ taskExecutorGateway.getAddress(),
+ 1);
CompletableFuture<RegistrationResponse> firstFuture =
rmGateway.registerTaskExecutor(taskExecutorRegistration,
fastTimeout);
@@ -287,7 +288,8 @@ class ResourceManagerTaskExecutorTest {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
DEFAULT_SLOT_PROFILE,
DEFAULT_SLOT_PROFILE.multiply(numberSlots),
- taskExecutorGateway.getAddress());
+ taskExecutorGateway.getAddress(),
+ numberSlots);
final RegistrationResponse registrationResponse =
rmGateway.registerTaskExecutor(taskExecutorRegistration,
TIMEOUT).get();
assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
@@ -364,7 +366,8 @@ class ResourceManagerTaskExecutorTest {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
DEFAULT_SLOT_PROFILE,
DEFAULT_SLOT_PROFILE,
- taskExecutorAddress),
+ taskExecutorAddress,
+ 1),
TIMEOUT);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index a006a147358..3ad74bdfce4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -255,7 +255,8 @@ class ResourceManagerTest {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
ResourceProfile.ZERO,
ResourceProfile.ZERO,
- taskExecutorAddress);
+ taskExecutorAddress,
+ 1);
final CompletableFuture<RegistrationResponse> registrationFuture =
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT);
@@ -768,7 +769,8 @@ class ResourceManagerTest {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
ResourceProfile.fromResources(1, 1024),
ResourceProfile.fromResources(1,
1024).multiply(slotCount),
- taskExecutorGateway.getAddress());
+ taskExecutorGateway.getAddress(),
+ slotCount);
RegistrationResponse registrationResult =
resourceManagerGateway
.registerTaskExecutor(taskExecutorRegistration,
TestingUtils.TIMEOUT)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 1c1133662fb..1642b1c63db 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -1246,7 +1246,8 @@ class ActiveResourceManagerTest {
TESTING_CONFIG,
ResourceProfile.ZERO,
ResourceProfile.ZERO,
- resourceID.toString());
+ resourceID.toString(),
+ 1);
return resourceManager
.getSelfGateway(ResourceManagerGateway.class)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
index b8d3b0dd6a8..860f47ae508 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
@@ -27,12 +27,15 @@ import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
@@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest {
new EachCallbackWrapper<>(rpcServiceExtension);
@Test
- void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File
tempDir)
- throws Exception {
+ void
testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest(
+ @TempDir File tempDir) throws Exception {
+ testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, false);
+ }
+
+ @Test
+ void
testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest(
+ @TempDir File tempDir) throws Exception {
+ testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, true);
+ }
+
+ private void testRecoveredTaskExecutorWillRestoreAllocationState(
+ File tempDir, boolean useDynamicRequest) throws Exception {
final ResourceID resourceId = ResourceID.generate();
final Configuration configuration = new Configuration();
@@ -82,6 +96,20 @@ class TaskExecutorRecoveryTest {
return
CompletableFuture.completedFuture(Acknowledge.get());
});
+ final ArrayBlockingQueue<TaskExecutorRegistration>
taskExecutorRegistrations =
+ new ArrayBlockingQueue<>(2);
+
+ testingResourceManagerGateway.setRegisterTaskExecutorFunction(
+ taskExecutorRegistration -> {
+ taskExecutorRegistrations.offer(taskExecutorRegistration);
+ return CompletableFuture.completedFuture(
+ new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ taskExecutorRegistration.getResourceId(),
+ new ClusterInformation("localhost", 1234),
+ null));
+ });
+
final TestingRpcService rpcService =
rpcServiceExtension.getTestingRpcService();
rpcService.registerGateway(
testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway);
@@ -118,8 +146,14 @@ class TaskExecutorRecoveryTest {
assertThat(slotReport.getNumSlotStatus(), is(2));
+ final TaskExecutorRegistration taskExecutorRegistration =
taskExecutorRegistrations.take();
+ assertThat(taskExecutorRegistration.getNumberSlots(), is(2));
+
final SlotStatus slotStatus = slotReport.iterator().next();
- final SlotID allocatedSlotID = slotStatus.getSlotID();
+ final SlotID allocatedSlotID =
+ useDynamicRequest
+ ?
SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID())
+ : slotStatus.getSlotID();
final AllocationID allocationId = new AllocationID();
taskExecutorGateway
@@ -160,9 +194,15 @@ class TaskExecutorRecoveryTest {
recoveredTaskExecutor.start();
final TaskExecutorSlotReport recoveredSlotReport = queue.take();
-
+ final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2;
+ assertThat(
+ recoveredSlotReport.getSlotReport().getNumSlotStatus(),
is(expectedNumberOfSlots));
for (SlotStatus status : recoveredSlotReport.getSlotReport()) {
- if (status.getSlotID().equals(allocatedSlotID)) {
+ boolean isAllocatedSlot =
+ useDynamicRequest
+ ? status.getSlotID().getSlotNumber() == 2
+ : status.getSlotID().equals(allocatedSlotID);
+ if (isAllocatedSlot) {
assertThat(status.getJobID(), is(jobId));
assertThat(status.getAllocationID(), is(allocationId));
} else {
@@ -170,6 +210,10 @@ class TaskExecutorRecoveryTest {
}
}
+ final TaskExecutorRegistration recoveredTaskExecutorRegistration =
+ taskExecutorRegistrations.take();
+ assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2));
+
final Collection<SlotOffer> take = offeredSlots.take();
assertThat(take, hasSize(1));
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
index 6d19c1d6c26..4fd0545328f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
@@ -140,7 +140,8 @@ class TaskExecutorToResourceManagerConnectionTest {
TASK_MANAGER_MEMORY_CONFIGURATION,
ResourceProfile.ZERO,
ResourceProfile.ZERO,
- TASK_MANAGER_NODE_ID);
+ TASK_MANAGER_NODE_ID,
+ 1);
return new TaskExecutorToResourceManagerConnection(
LOGGER,
rpcService,