This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e95cb6e73900fbbc2039407be1bd87271b2a950b
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Mon Feb 19 17:58:06 2024 +0800

    [hotfix][test] Assert the slot allocation eventually succeed in dedicated 
tests of DefaultSlotStatusSyncerTest
    
    Also deduplicate the code of these tests.
---
 .../slotmanager/DefaultSlotStatusSyncerTest.java   | 178 ++++++++-------------
 1 file changed, 69 insertions(+), 109 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
index cf60a7e5e5f..4c3d2d5a723 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
@@ -19,11 +19,11 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -35,15 +35,18 @@ import 
org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.QuadConsumer;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -60,118 +63,18 @@ class DefaultSlotStatusSyncerTest {
             TestingUtils.defaultExecutorExtension();
 
     @Test
-    void testAllocateSlot() throws Exception {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
-        final CompletableFuture<
-                        Tuple6<
-                                SlotID,
-                                JobID,
-                                AllocationID,
-                                ResourceProfile,
-                                String,
-                                ResourceManagerId>>
-                requestFuture = new CompletableFuture<>();
-        final CompletableFuture<Acknowledge> responseFuture = new 
CompletableFuture<>();
-        final TestingTaskExecutorGateway taskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder()
-                        .setRequestSlotFunction(
-                                tuple6 -> {
-                                    requestFuture.complete(tuple6);
-                                    return responseFuture;
-                                })
-                        .createTestingTaskExecutorGateway();
-        final TaskExecutorConnection taskExecutorConnection =
-                new TaskExecutorConnection(ResourceID.generate(), 
taskExecutorGateway);
-        taskManagerTracker.addTaskManager(
-                taskExecutorConnection, ResourceProfile.ANY, 
ResourceProfile.ANY);
-        final ResourceTracker resourceTracker = new DefaultResourceTracker();
-        final JobID jobId = new JobID();
-        final SlotStatusSyncer slotStatusSyncer =
-                new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
-        slotStatusSyncer.initialize(
-                taskManagerTracker,
-                resourceTracker,
-                ResourceManagerId.generate(),
-                EXECUTOR_RESOURCE.getExecutor());
-
-        final CompletableFuture<Void> allocatedFuture =
-                slotStatusSyncer.allocateSlot(
-                        taskExecutorConnection.getInstanceID(),
-                        jobId,
-                        "address",
-                        ResourceProfile.ANY);
-        final AllocationID allocationId = requestFuture.get().f2;
-        assertThat(resourceTracker.getAcquiredResources(jobId))
-                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
-        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
-                .hasValueSatisfying(
-                        slot -> {
-                            assertThat(slot.getJobId()).isEqualTo(jobId);
-                            
assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
-                        });
-
-        responseFuture.complete(Acknowledge.get());
-        assertThat(allocatedFuture).isNotCompletedExceptionally();
+    void testSlotAllocationSucceeds() throws Exception {
+        testSlotAllocation((ignored0, ignored1, ignored2, ignored3) -> {});
     }
 
     @Test
     void testAllocationUpdatesIgnoredIfSlotFreed() throws Exception {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
-        final CompletableFuture<
-                        Tuple6<
-                                SlotID,
-                                JobID,
-                                AllocationID,
-                                ResourceProfile,
-                                String,
-                                ResourceManagerId>>
-                requestFuture = new CompletableFuture<>();
-        final CompletableFuture<Acknowledge> responseFuture = new 
CompletableFuture<>();
-        final TestingTaskExecutorGateway taskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder()
-                        .setRequestSlotFunction(
-                                tuple6 -> {
-                                    requestFuture.complete(tuple6);
-                                    return responseFuture;
-                                })
-                        .createTestingTaskExecutorGateway();
-        final TaskExecutorConnection taskExecutorConnection =
-                new TaskExecutorConnection(ResourceID.generate(), 
taskExecutorGateway);
-        taskManagerTracker.addTaskManager(
-                taskExecutorConnection, ResourceProfile.ANY, 
ResourceProfile.ANY);
-        final ResourceTracker resourceTracker = new DefaultResourceTracker();
-        final JobID jobId = new JobID();
-        final SlotStatusSyncer slotStatusSyncer =
-                new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
-        slotStatusSyncer.initialize(
-                taskManagerTracker,
-                resourceTracker,
-                ResourceManagerId.generate(),
-                EXECUTOR_RESOURCE.getExecutor());
-
-        final CompletableFuture<Void> allocatedFuture =
-                slotStatusSyncer.allocateSlot(
-                        taskExecutorConnection.getInstanceID(),
-                        jobId,
-                        "address",
-                        ResourceProfile.ANY);
-        final AllocationID allocationId = requestFuture.get().f2;
-        assertThat(resourceTracker.getAcquiredResources(jobId))
-                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
-        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
-                .hasValueSatisfying(
-                        slot -> {
-                            assertThat(slot.getJobId()).isEqualTo(jobId);
-                            
assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
-                        });
-
-        slotStatusSyncer.freeSlot(allocationId);
-        
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)).isEmpty();
-
-        responseFuture.complete(Acknowledge.get());
-        assertThat(allocatedFuture).isNotCompletedExceptionally();
+        testSlotAllocation(
+                (slotStatusSyncer, taskManagerTracker, ignored, allocationId) 
-> {
+                    slotStatusSyncer.freeSlot(allocationId);
+                    
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
+                            .isEmpty();
+                });
     }
 
     @Test
@@ -349,4 +252,61 @@ class DefaultSlotStatusSyncerTest {
                 .hasValueSatisfying(
                         slot -> 
assertThat(slot.getState()).isEqualTo(SlotState.PENDING));
     }
+
+    private static void testSlotAllocation(
+            QuadConsumer<SlotStatusSyncer, TaskManagerTracker, InstanceID, 
AllocationID>
+                    beforeCompletingSlotRequestCallback)
+            throws ExecutionException, InterruptedException {
+        final FineGrainedTaskManagerTracker taskManagerTracker =
+                new FineGrainedTaskManagerTracker();
+        final CompletableFuture<AllocationID> requestFuture = new 
CompletableFuture<>();
+
+        final CompletableFuture<Acknowledge> responseFuture = new 
CompletableFuture<>();
+        final TestingTaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    requestFuture.complete(tuple6.f2);
+                                    return responseFuture;
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection =
+                new TaskExecutorConnection(ResourceID.generate(), 
taskExecutorGateway);
+        taskManagerTracker.addTaskManager(
+                taskExecutorConnection, ResourceProfile.ANY, 
ResourceProfile.ANY);
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final JobID jobId = new JobID();
+        final SlotStatusSyncer slotStatusSyncer =
+                new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
+        slotStatusSyncer.initialize(
+                taskManagerTracker,
+                resourceTracker,
+                ResourceManagerId.generate(),
+                EXECUTOR_RESOURCE.getExecutor());
+
+        final CompletableFuture<Void> allocatedFuture =
+                slotStatusSyncer.allocateSlot(
+                        taskExecutorConnection.getInstanceID(),
+                        jobId,
+                        "address",
+                        ResourceProfile.ANY);
+        final AllocationID allocationId = requestFuture.get();
+        assertThat(resourceTracker.getAcquiredResources(jobId))
+                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
+                .hasValueSatisfying(
+                        slot -> {
+                            assertThat(slot.getJobId()).isEqualTo(jobId);
+                            
assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
+                        });
+
+        beforeCompletingSlotRequestCallback.accept(
+                slotStatusSyncer,
+                taskManagerTracker,
+                taskExecutorConnection.getInstanceID(),
+                allocationId);
+
+        responseFuture.complete(Acknowledge.get());
+        assertThatFuture(allocatedFuture).eventuallySucceeds();
+    }
 }

Reply via email to