This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6caf1472750f1c64e5010f4676bd8a0dacf7b8b Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Tue Aug 31 13:28:14 2021 +0200 [FLINK-24058][coordination][tests] Harden TaskSlotTableImplTest Instead of checking that the the timeout is not being triggered, instead check that the timeout is being cancelled. --- .../taskexecutor/slot/TaskSlotTableImplTest.java | 33 ++++++++++++---------- .../runtime/taskexecutor/slot/TaskSlotUtils.java | 2 +- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java index 578bf81..23e692e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java @@ -41,15 +41,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; /** Tests for the {@link TaskSlotTable}. */ public class TaskSlotTableImplTest extends TestLogger { @@ -487,15 +484,15 @@ public class TaskSlotTableImplTest extends TestLogger { SlotNotFoundException> taskSlotTableAction) throws Exception { - final CompletableFuture<AllocationID> timeoutFuture = new CompletableFuture<>(); - final TestingSlotActions testingSlotActions = - new TestingSlotActionsBuilder() - .setTimeoutSlotConsumer( - (allocationID, uuid) -> timeoutFuture.complete(allocationID)) - .build(); + final CompletableFuture<AllocationID> timeoutCancellationFuture = new CompletableFuture<>(); + + final TimerService<AllocationID> testingTimerService = + new TestingTimerServiceBuilder<AllocationID>() + .setUnregisterTimeoutConsumer(timeoutCancellationFuture::complete) + .createTestingTimerService(); try (final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable = - createTaskSlotTableAndStart(1, testingSlotActions)) { + createTaskSlotTableAndStart(1, testingTimerService)) { final AllocationID allocationId = new AllocationID(); final long timeout = 50L; final JobID jobId = new JobID(); @@ -504,11 +501,7 @@ public class TaskSlotTableImplTest extends TestLogger { is(true)); assertThat(taskSlotTableAction.apply(taskSlotTable, jobId, allocationId), is(true)); - try { - timeoutFuture.get(timeout, TimeUnit.MILLISECONDS); - fail("The slot timeout should have been deactivated."); - } catch (TimeoutException expected) { - } + timeoutCancellationFuture.get(); } } @@ -548,4 +541,14 @@ public class TaskSlotTableImplTest extends TestLogger { taskSlotTable.start(slotActions, ComponentMainThreadExecutorServiceAdapter.forMainThread()); return taskSlotTable; } + + private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart( + final int numberOfSlots, TimerService<AllocationID> timerService) { + final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable = + TaskSlotUtils.createTaskSlotTable(numberOfSlots, timerService); + taskSlotTable.start( + new TestingSlotActionsBuilder().build(), + ComponentMainThreadExecutorServiceAdapter.forMainThread()); + return taskSlotTable; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java index 3ad63b4..23c01e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java @@ -52,7 +52,7 @@ public enum TaskSlotUtils { numberOfSlots, createDefaultTimerService(timeout.toMilliseconds())); } - private static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTable( + public static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTable( int numberOfSlots, TimerService<AllocationID> timerService) { return new TaskSlotTableImpl<>( numberOfSlots,