This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6caf1472750f1c64e5010f4676bd8a0dacf7b8b
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Tue Aug 31 13:28:14 2021 +0200

    [FLINK-24058][coordination][tests] Harden TaskSlotTableImplTest
    
    Instead of checking that the the timeout is not being triggered, instead 
check that the timeout is being cancelled.
---
 .../taskexecutor/slot/TaskSlotTableImplTest.java   | 33 ++++++++++++----------
 .../runtime/taskexecutor/slot/TaskSlotUtils.java   |  2 +-
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
index 578bf81..23e692e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
@@ -41,15 +41,12 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 /** Tests for the {@link TaskSlotTable}. */
 public class TaskSlotTableImplTest extends TestLogger {
@@ -487,15 +484,15 @@ public class TaskSlotTableImplTest extends TestLogger {
                             SlotNotFoundException>
                     taskSlotTableAction)
             throws Exception {
-        final CompletableFuture<AllocationID> timeoutFuture = new 
CompletableFuture<>();
-        final TestingSlotActions testingSlotActions =
-                new TestingSlotActionsBuilder()
-                        .setTimeoutSlotConsumer(
-                                (allocationID, uuid) -> 
timeoutFuture.complete(allocationID))
-                        .build();
+        final CompletableFuture<AllocationID> timeoutCancellationFuture = new 
CompletableFuture<>();
+
+        final TimerService<AllocationID> testingTimerService =
+                new TestingTimerServiceBuilder<AllocationID>()
+                        
.setUnregisterTimeoutConsumer(timeoutCancellationFuture::complete)
+                        .createTestingTimerService();
 
         try (final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable =
-                createTaskSlotTableAndStart(1, testingSlotActions)) {
+                createTaskSlotTableAndStart(1, testingTimerService)) {
             final AllocationID allocationId = new AllocationID();
             final long timeout = 50L;
             final JobID jobId = new JobID();
@@ -504,11 +501,7 @@ public class TaskSlotTableImplTest extends TestLogger {
                     is(true));
             assertThat(taskSlotTableAction.apply(taskSlotTable, jobId, 
allocationId), is(true));
 
-            try {
-                timeoutFuture.get(timeout, TimeUnit.MILLISECONDS);
-                fail("The slot timeout should have been deactivated.");
-            } catch (TimeoutException expected) {
-            }
+            timeoutCancellationFuture.get();
         }
     }
 
@@ -548,4 +541,14 @@ public class TaskSlotTableImplTest extends TestLogger {
         taskSlotTable.start(slotActions, 
ComponentMainThreadExecutorServiceAdapter.forMainThread());
         return taskSlotTable;
     }
+
+    private static TaskSlotTableImpl<TaskSlotPayload> 
createTaskSlotTableAndStart(
+            final int numberOfSlots, TimerService<AllocationID> timerService) {
+        final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable =
+                TaskSlotUtils.createTaskSlotTable(numberOfSlots, timerService);
+        taskSlotTable.start(
+                new TestingSlotActionsBuilder().build(),
+                ComponentMainThreadExecutorServiceAdapter.forMainThread());
+        return taskSlotTable;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
index 3ad63b4..23c01e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
@@ -52,7 +52,7 @@ public enum TaskSlotUtils {
                 numberOfSlots, 
createDefaultTimerService(timeout.toMilliseconds()));
     }
 
-    private static <T extends TaskSlotPayload> TaskSlotTableImpl<T> 
createTaskSlotTable(
+    public static <T extends TaskSlotPayload> TaskSlotTableImpl<T> 
createTaskSlotTable(
             int numberOfSlots, TimerService<AllocationID> timerService) {
         return new TaskSlotTableImpl<>(
                 numberOfSlots,

Reply via email to