This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8cf29969d9aec4943713f0a6096b703718ce0dd0 Author: Yangze Guo <karma...@gmail.com> AuthorDate: Mon Feb 19 17:58:06 2024 +0800 [hotfix][test] Assert the slot allocation eventually succeed in dedicated tests of DefaultSlotStatusSyncerTest Also deduplicate the code of these tests. --- .../slotmanager/DefaultSlotStatusSyncerTest.java | 178 ++++++++------------- 1 file changed, 69 insertions(+), 109 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java index cf60a7e5e5f..4c3d2d5a723 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; @@ -35,15 +35,18 @@ import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.QuadConsumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -60,118 +63,18 @@ class DefaultSlotStatusSyncerTest { TestingUtils.defaultExecutorExtension(); @Test - void testAllocateSlot() throws Exception { - final FineGrainedTaskManagerTracker taskManagerTracker = - new FineGrainedTaskManagerTracker(); - final CompletableFuture< - Tuple6< - SlotID, - JobID, - AllocationID, - ResourceProfile, - String, - ResourceManagerId>> - requestFuture = new CompletableFuture<>(); - final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>(); - final TestingTaskExecutorGateway taskExecutorGateway = - new TestingTaskExecutorGatewayBuilder() - .setRequestSlotFunction( - tuple6 -> { - requestFuture.complete(tuple6); - return responseFuture; - }) - .createTestingTaskExecutorGateway(); - final TaskExecutorConnection taskExecutorConnection = - new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); - taskManagerTracker.addTaskManager( - taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY); - final ResourceTracker resourceTracker = new DefaultResourceTracker(); - final JobID jobId = new JobID(); - final SlotStatusSyncer slotStatusSyncer = - new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT); - slotStatusSyncer.initialize( - taskManagerTracker, - resourceTracker, - ResourceManagerId.generate(), - EXECUTOR_RESOURCE.getExecutor()); - - final CompletableFuture<Void> allocatedFuture = - slotStatusSyncer.allocateSlot( - taskExecutorConnection.getInstanceID(), - jobId, - "address", - ResourceProfile.ANY); - final AllocationID allocationId = requestFuture.get().f2; - assertThat(resourceTracker.getAcquiredResources(jobId)) - .contains(ResourceRequirement.create(ResourceProfile.ANY, 1)); - assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)) - .hasValueSatisfying( - slot -> { - assertThat(slot.getJobId()).isEqualTo(jobId); - assertThat(slot.getState()).isEqualTo(SlotState.PENDING); - }); - - responseFuture.complete(Acknowledge.get()); - assertThat(allocatedFuture).isNotCompletedExceptionally(); + void testSlotAllocationSucceeds() throws Exception { + testSlotAllocation((ignored0, ignored1, ignored2, ignored3) -> {}); } @Test void testAllocationUpdatesIgnoredIfSlotFreed() throws Exception { - final FineGrainedTaskManagerTracker taskManagerTracker = - new FineGrainedTaskManagerTracker(); - final CompletableFuture< - Tuple6< - SlotID, - JobID, - AllocationID, - ResourceProfile, - String, - ResourceManagerId>> - requestFuture = new CompletableFuture<>(); - final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>(); - final TestingTaskExecutorGateway taskExecutorGateway = - new TestingTaskExecutorGatewayBuilder() - .setRequestSlotFunction( - tuple6 -> { - requestFuture.complete(tuple6); - return responseFuture; - }) - .createTestingTaskExecutorGateway(); - final TaskExecutorConnection taskExecutorConnection = - new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); - taskManagerTracker.addTaskManager( - taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY); - final ResourceTracker resourceTracker = new DefaultResourceTracker(); - final JobID jobId = new JobID(); - final SlotStatusSyncer slotStatusSyncer = - new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT); - slotStatusSyncer.initialize( - taskManagerTracker, - resourceTracker, - ResourceManagerId.generate(), - EXECUTOR_RESOURCE.getExecutor()); - - final CompletableFuture<Void> allocatedFuture = - slotStatusSyncer.allocateSlot( - taskExecutorConnection.getInstanceID(), - jobId, - "address", - ResourceProfile.ANY); - final AllocationID allocationId = requestFuture.get().f2; - assertThat(resourceTracker.getAcquiredResources(jobId)) - .contains(ResourceRequirement.create(ResourceProfile.ANY, 1)); - assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)) - .hasValueSatisfying( - slot -> { - assertThat(slot.getJobId()).isEqualTo(jobId); - assertThat(slot.getState()).isEqualTo(SlotState.PENDING); - }); - - slotStatusSyncer.freeSlot(allocationId); - assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)).isEmpty(); - - responseFuture.complete(Acknowledge.get()); - assertThat(allocatedFuture).isNotCompletedExceptionally(); + testSlotAllocation( + (slotStatusSyncer, taskManagerTracker, ignored, allocationId) -> { + slotStatusSyncer.freeSlot(allocationId); + assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)) + .isEmpty(); + }); } @Test @@ -349,4 +252,61 @@ class DefaultSlotStatusSyncerTest { .hasValueSatisfying( slot -> assertThat(slot.getState()).isEqualTo(SlotState.PENDING)); } + + private static void testSlotAllocation( + QuadConsumer<SlotStatusSyncer, TaskManagerTracker, InstanceID, AllocationID> + beforeCompletingSlotRequestCallback) + throws ExecutionException, InterruptedException { + final FineGrainedTaskManagerTracker taskManagerTracker = + new FineGrainedTaskManagerTracker(); + final CompletableFuture<AllocationID> requestFuture = new CompletableFuture<>(); + + final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway taskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction( + tuple6 -> { + requestFuture.complete(tuple6.f2); + return responseFuture; + }) + .createTestingTaskExecutorGateway(); + final TaskExecutorConnection taskExecutorConnection = + new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); + taskManagerTracker.addTaskManager( + taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY); + final ResourceTracker resourceTracker = new DefaultResourceTracker(); + final JobID jobId = new JobID(); + final SlotStatusSyncer slotStatusSyncer = + new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT); + slotStatusSyncer.initialize( + taskManagerTracker, + resourceTracker, + ResourceManagerId.generate(), + EXECUTOR_RESOURCE.getExecutor()); + + final CompletableFuture<Void> allocatedFuture = + slotStatusSyncer.allocateSlot( + taskExecutorConnection.getInstanceID(), + jobId, + "address", + ResourceProfile.ANY); + final AllocationID allocationId = requestFuture.get(); + assertThat(resourceTracker.getAcquiredResources(jobId)) + .contains(ResourceRequirement.create(ResourceProfile.ANY, 1)); + assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)) + .hasValueSatisfying( + slot -> { + assertThat(slot.getJobId()).isEqualTo(jobId); + assertThat(slot.getState()).isEqualTo(SlotState.PENDING); + }); + + beforeCompletingSlotRequestCallback.accept( + slotStatusSyncer, + taskManagerTracker, + taskExecutorConnection.getInstanceID(), + allocationId); + + responseFuture.complete(Acknowledge.get()); + assertThatFuture(allocatedFuture).eventuallySucceeds(); + } }