This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b32d80e6da102b450bed97d8b3ea59825a769686
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Mar 27 12:33:02 2023 +0200

    [hotfix][tests] Remove timeout
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 17 ++++++-------
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 28 ++++++++++------------
 2 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 0f14e3c81f2..8ebe408d4b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -109,7 +109,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -132,7 +131,6 @@ import java.util.stream.StreamSupport;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
-import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
@@ -175,7 +173,7 @@ public class DefaultSchedulerTest extends TestLogger {
     @BeforeEach
     void setUp() {
         executor = Executors.newSingleThreadExecutor();
-        scheduledExecutorService = new DirectScheduledExecutorService();
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 
         configuration = new Configuration();
 
@@ -1526,6 +1524,9 @@ public class DefaultSchedulerTest extends TestLogger {
         final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1);
 
+        final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+        assertThat(slotPool.registerTaskManager(taskManagerLocation.getResourceID())).isTrue();
+
         taskManagerGateway.setCancelConsumer(
                 executionAttemptId -> {
                     singleThreadMainThreadExecutor.execute(
@@ -1539,15 +1540,15 @@ public class DefaultSchedulerTest extends TestLogger {
                 () -> {
                     scheduler.startScheduling();
 
-                    offerSlots(
-                            slotPool,
+                    slotPool.offerSlots(
+                            taskManagerLocation,
+                            taskManagerGateway,
                             createSlotOffersForResourceRequirements(
-                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
-                            taskManagerGateway);
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)));
                 });
 
         // wait for the first task submission
-        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(1);
 
         // sleep a bit to ensure uptime is > 0
         Thread.sleep(10L);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index afa1442ce7b..de5126c1175 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -312,7 +312,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // wait for all tasks to be submitted
-        taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(numAvailableSlots);
 
         final ArchivedExecutionGraph executionGraph =
                 CompletableFuture.supplyAsync(
@@ -364,7 +364,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // Wait for just the first submission to indicate the execution graph is ready
-        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(1);
 
         final ArchivedExecutionGraph executionGraph =
                 CompletableFuture.supplyAsync(
@@ -501,7 +501,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // wait for the first task submission
-        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(1);
 
         assertThat(numRestartsMetric.getValue()).isEqualTo(0L);
 
@@ -517,7 +517,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // wait for the second task submissions
-        taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(PARALLELISM);
 
         assertThat(numRestartsMetric.getValue()).isEqualTo(1L);
     }
@@ -589,7 +589,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // wait for the first task submission
-        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(1);
 
         CommonTestUtils.waitUntilCondition(() -> upTimeGauge.getValue() > 0L);
         assertThat(downTimeGauge.getValue()).isEqualTo(0L);
@@ -606,7 +606,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // wait for the second task submissions
-        taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(2);
 
         CommonTestUtils.waitUntilCondition(() -> upTimeGauge.getValue() > 0L);
         assertThat(downTimeGauge.getValue()).isEqualTo(0L);
@@ -708,7 +708,7 @@ public class AdaptiveSchedulerTest {
         assertThat(startingStateFuture.get()).isInstanceOf(WaitingForResources.class);
 
         // Wait for all tasks to be submitted
-        taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(PARALLELISM);
 
         final ArchivedExecutionGraph executionGraph =
                 CompletableFuture.supplyAsync(
@@ -915,7 +915,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // Wait for task to be submitted
-        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(1);
 
         ArchivedExecutionGraph executionGraph =
                 getArchivedExecutionGraphForRunningJob(scheduler).get();
@@ -938,7 +938,7 @@ public class AdaptiveSchedulerTest {
                 });
 
         // wait for the job to be re-submitted
-        taskManagerGateway.waitForSubmissions(parallelism, Duration.ofSeconds(5));
+        taskManagerGateway.waitForSubmissions(parallelism);
 
         ArchivedExecutionGraph resubmittedExecutionGraph =
                 getArchivedExecutionGraphForRunningJob(scheduler).get();
@@ -1064,7 +1064,7 @@ public class AdaptiveSchedulerTest {
         // wait for all tasks to be deployed
         // this is important because some tests trigger savepoints
         // these only properly work if the deployment has been started
-        taskManagerGateway.waitForSubmissions(PARALLELISM, TestingUtils.infiniteDuration());
+        taskManagerGateway.waitForSubmissions(PARALLELISM);
 
         CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture =
                 new CompletableFuture<>();
@@ -1570,16 +1570,14 @@ public class AdaptiveSchedulerTest {
          * Block until an arbitrary number of submissions have been received.
          *
          * @param numSubmissions The number of submissions to wait for
-         * @param perTaskTimeout The max amount of time to wait between each submission
          * @return the list of the waited-for submissions
          * @throws InterruptedException if a timeout is exceeded waiting for a submission
          */
-        public List<TaskDeploymentDescriptor> waitForSubmissions(
-                int numSubmissions, Duration perTaskTimeout) throws InterruptedException {
+        public List<TaskDeploymentDescriptor> waitForSubmissions(int numSubmissions)
+                throws InterruptedException {
             List<TaskDeploymentDescriptor> descriptors = new ArrayList<>();
             for (int i = 0; i < numSubmissions; i++) {
-                descriptors.add(
-                        submittedTasks.poll(perTaskTimeout.toMillis(), TimeUnit.MILLISECONDS));
+                descriptors.add(submittedTasks.take());
             }
             return descriptors;
         }

Reply via email to