This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b32d80e6da102b450bed97d8b3ea59825a769686 Author: David Moravek <d...@apache.org> AuthorDate: Mon Mar 27 12:33:02 2023 +0200 [hotfix][tests] Remove timeout --- .../runtime/scheduler/DefaultSchedulerTest.java | 17 ++++++------- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 28 ++++++++++------------ 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 0f14e3c81f2..8ebe408d4b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -109,7 +109,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -132,7 +131,6 @@ import java.util.stream.StreamSupport; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements; -import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing; @@ -175,7 +173,7 @@ public class DefaultSchedulerTest extends TestLogger { @BeforeEach void setUp() { executor = Executors.newSingleThreadExecutor(); - scheduledExecutorService = new DirectScheduledExecutorService(); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); configuration = new Configuration(); @@ -1526,6 +1524,9 @@ public class DefaultSchedulerTest extends TestLogger { final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway = new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + assertThat(slotPool.registerTaskManager(taskManagerLocation.getResourceID())).isTrue(); + taskManagerGateway.setCancelConsumer( executionAttemptId -> { singleThreadMainThreadExecutor.execute( @@ -1539,15 +1540,15 @@ public class DefaultSchedulerTest extends TestLogger { () -> { scheduler.startScheduling(); - offerSlots( - slotPool, + slotPool.offerSlots( + taskManagerLocation, + taskManagerGateway, createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), - taskManagerGateway); + ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))); }); // wait for the first task submission - taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(1); // sleep a bit to ensure uptime is > 0 Thread.sleep(10L); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index afa1442ce7b..de5126c1175 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -312,7 +312,7 @@ public class AdaptiveSchedulerTest { }); // wait for all tasks to be submitted - taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(numAvailableSlots); final ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync( @@ -364,7 +364,7 @@ public class AdaptiveSchedulerTest { }); // Wait for just the first submission to indicate the execution graph is ready - taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(1); final ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync( @@ -501,7 +501,7 @@ public class AdaptiveSchedulerTest { }); // wait for the first task submission - taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(1); assertThat(numRestartsMetric.getValue()).isEqualTo(0L); @@ -517,7 +517,7 @@ public class AdaptiveSchedulerTest { }); // wait for the second task submissions - taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(PARALLELISM); assertThat(numRestartsMetric.getValue()).isEqualTo(1L); } @@ -589,7 +589,7 @@ public class AdaptiveSchedulerTest { }); // wait for the first task submission - taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(1); CommonTestUtils.waitUntilCondition(() -> upTimeGauge.getValue() > 0L); assertThat(downTimeGauge.getValue()).isEqualTo(0L); @@ -606,7 +606,7 @@ public class AdaptiveSchedulerTest { }); // wait for the second task submissions - taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(2); CommonTestUtils.waitUntilCondition(() -> upTimeGauge.getValue() > 0L); assertThat(downTimeGauge.getValue()).isEqualTo(0L); @@ -708,7 +708,7 @@ public class AdaptiveSchedulerTest { assertThat(startingStateFuture.get()).isInstanceOf(WaitingForResources.class); // Wait for all tasks to be submitted - taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(PARALLELISM); final ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync( @@ -915,7 +915,7 @@ public class AdaptiveSchedulerTest { }); // Wait for task to be submitted - taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(1); ArchivedExecutionGraph executionGraph = getArchivedExecutionGraphForRunningJob(scheduler).get(); @@ -938,7 +938,7 @@ public class AdaptiveSchedulerTest { }); // wait for the job to be re-submitted - taskManagerGateway.waitForSubmissions(parallelism, Duration.ofSeconds(5)); + taskManagerGateway.waitForSubmissions(parallelism); ArchivedExecutionGraph resubmittedExecutionGraph = getArchivedExecutionGraphForRunningJob(scheduler).get(); @@ -1064,7 +1064,7 @@ public class AdaptiveSchedulerTest { // wait for all tasks to be deployed // this is important because some tests trigger savepoints // these only properly work if the deployment has been started - taskManagerGateway.waitForSubmissions(PARALLELISM, TestingUtils.infiniteDuration()); + taskManagerGateway.waitForSubmissions(PARALLELISM); CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture = new CompletableFuture<>(); @@ -1570,16 +1570,14 @@ public class AdaptiveSchedulerTest { * Block until an arbitrary number of submissions have been received. * * @param numSubmissions The number of submissions to wait for - * @param perTaskTimeout The max amount of time to wait between each submission * @return the list of the waited-for submissions * @throws InterruptedException if a timeout is exceeded waiting for a submission */ - public List<TaskDeploymentDescriptor> waitForSubmissions( - int numSubmissions, Duration perTaskTimeout) throws InterruptedException { + public List<TaskDeploymentDescriptor> waitForSubmissions(int numSubmissions) + throws InterruptedException { List<TaskDeploymentDescriptor> descriptors = new ArrayList<>(); for (int i = 0; i < numSubmissions; i++) { - descriptors.add( - submittedTasks.poll(perTaskTimeout.toMillis(), TimeUnit.MILLISECONDS)); + descriptors.add(submittedTasks.take()); } return descriptors; }