This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d676a355e910966fc878bde62d73c1721c79942 Author: David Moravek <d...@apache.org> AuthorDate: Mon Mar 27 13:21:15 2023 +0200 [FLINK-31399] Move desiredResources out of WaitingForResources --- .../scheduler/adaptive/AdaptiveScheduler.java | 27 ++++++--- .../scheduler/adaptive/WaitingForResources.java | 18 +----- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 67 +++++++++------------- .../adaptive/WaitingForResourcesTest.java | 49 ++++------------ .../scheduler/adaptive/allocator/TestSlotInfo.java | 16 +++++- 5 files changed, 74 insertions(+), 103 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index c028fd05ecf..e3499f231bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -217,6 +217,7 @@ public class AdaptiveScheduler private final DeploymentStateTimeMetrics deploymentTimeMetrics; private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory; + private ResourceCounter desiredResources = ResourceCounter.empty(); private final JobManagerJobMetricGroup jobManagerJobMetricGroup; @@ -734,12 +735,17 @@ public class AdaptiveScheduler // ---------------------------------------------------------------- @Override - public boolean hasDesiredResources(ResourceCounter desiredResources) { - final Collection<? extends SlotInfo> allSlots = + public boolean hasDesiredResources() { + final Collection<? extends SlotInfo> freeSlots = declarativeSlotPool.getFreeSlotsInformation(); - ResourceCounter outstandingResources = desiredResources; + return hasDesiredResources(desiredResources, freeSlots); + } - final Iterator<? extends SlotInfo> slotIterator = allSlots.iterator(); + @VisibleForTesting + static boolean hasDesiredResources( + ResourceCounter desiredResources, Collection<? extends SlotInfo> freeSlots) { + ResourceCounter outstandingResources = desiredResources; + final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator(); while (!outstandingResources.isEmpty() && slotIterator.hasNext()) { final SlotInfo slotInfo = slotIterator.next(); @@ -791,19 +797,26 @@ public class AdaptiveScheduler @Override public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { - final ResourceCounter desiredResources = calculateDesiredResources(); - declarativeSlotPool.setResourceRequirements(desiredResources); + declareDesiredResources(); transitionToState( new WaitingForResources.Factory( this, LOG, - desiredResources, this.initialResourceAllocationTimeout, this.resourceStabilizationTimeout, previousExecutionGraph)); } + private void declareDesiredResources() { + final ResourceCounter newDesiredResources = calculateDesiredResources(); + + if (!newDesiredResources.equals(this.desiredResources)) { + this.desiredResources = newDesiredResources; + declarativeSlotPool.setResourceRequirements(this.desiredResources); + } + } + private ResourceCounter calculateDesiredResources() { return slotAllocator.calculateRequiredSlots(jobInformation.getVertices()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index 62b20919ee1..b15fc25776b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.Preconditions; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; @@ -44,7 +43,6 @@ class WaitingForResources implements State, ResourceListener { private final Logger log; - private final ResourceCounter desiredResources; private final Clock clock; /** If set, there's an ongoing deadline waiting for a resource stabilization. */ @@ -60,13 +58,11 @@ class WaitingForResources implements State, ResourceListener { WaitingForResources( Context context, Logger log, - ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout) { this( context, log, - desiredResources, initialResourceAllocationTimeout, resourceStabilizationTimeout, SystemClock.getInstance(), @@ -76,22 +72,17 @@ class WaitingForResources implements State, ResourceListener { WaitingForResources( Context context, Logger log, - ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, Clock clock, @Nullable ExecutionGraph previousExecutionGraph) { this.context = Preconditions.checkNotNull(context); this.log = Preconditions.checkNotNull(log); - this.desiredResources = Preconditions.checkNotNull(desiredResources); this.resourceStabilizationTimeout = Preconditions.checkNotNull(resourceStabilizationTimeout); this.clock = clock; Preconditions.checkNotNull(initialResourceAllocationTimeout); - Preconditions.checkArgument( - !desiredResources.isEmpty(), "Desired resources must not be empty"); - Preconditions.checkArgument( !resourceStabilizationTimeout.isNegative(), "Resource stabilization timeout must not be negative"); @@ -154,7 +145,7 @@ class WaitingForResources implements State, ResourceListener { } private void checkDesiredOrSufficientResourcesAvailable() { - if (context.hasDesiredResources(desiredResources)) { + if (context.hasDesiredResources()) { createExecutionGraphWithAvailableResources(); return; } @@ -207,10 +198,9 @@ class WaitingForResources implements State, ResourceListener { /** * Checks whether we have the desired resources. * - * @param desiredResources desiredResources describing the desired resources * @return {@code true} if we have enough resources; otherwise {@code false} */ - boolean hasDesiredResources(ResourceCounter desiredResources); + boolean hasDesiredResources(); /** * Checks if we currently have sufficient resources for executing the job. @@ -235,7 +225,6 @@ class WaitingForResources implements State, ResourceListener { private final Context context; private final Logger log; - private final ResourceCounter desiredResources; private final Duration initialResourceAllocationTimeout; private final Duration resourceStabilizationTimeout; @Nullable private final ExecutionGraph previousExecutionGraph; @@ -243,13 +232,11 @@ class WaitingForResources implements State, ResourceListener { public Factory( Context context, Logger log, - ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, ExecutionGraph previousExecutionGraph) { this.context = context; this.log = log; - this.desiredResources = desiredResources; this.initialResourceAllocationTimeout = initialResourceAllocationTimeout; this.resourceStabilizationTimeout = resourceStabilizationTimeout; this.previousExecutionGraph = previousExecutionGraph; @@ -263,7 +250,6 @@ class WaitingForResources implements State, ResourceListener { return new WaitingForResources( context, log, - desiredResources, initialResourceAllocationTimeout, resourceStabilizationTimeout, SystemClock.getInstance(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index de5126c1175..e9fa605e09b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -80,6 +80,7 @@ import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; @@ -108,7 +109,10 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -217,56 +221,40 @@ public class AdaptiveSchedulerTest { } @Test - void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception { - final AdaptiveScheduler scheduler = - new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor) - .build(EXECUTOR_RESOURCE.getExecutor()); - - scheduler.startScheduling(); - + void testHasEnoughResourcesReturnsFalseIfUnsatisfied() { final ResourceCounter resourceRequirement = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1); - - assertThat(scheduler.hasDesiredResources(resourceRequirement)).isFalse(); + assertThat( + AdaptiveScheduler.hasDesiredResources( + resourceRequirement, Collections.emptyList())) + .isFalse(); } @Test - void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception { - final JobGraph jobGraph = createJobGraph(); - - final DefaultDeclarativeSlotPool declarativeSlotPool = - createDeclarativeSlotPool(jobGraph.getJobID()); - - final AdaptiveScheduler scheduler = - new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor) - .setDeclarativeSlotPool(declarativeSlotPool) - .build(EXECUTOR_RESOURCE.getExecutor()); - - scheduler.startScheduling(); - + void testHasEnoughResourcesReturnsTrueIfSatisfied() { final ResourceCounter resourceRequirement = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1); + final Collection<TestSlotInfo> freeSlots = + createSlotInfosForResourceRequirements(resourceRequirement); + assertThat(AdaptiveScheduler.hasDesiredResources(resourceRequirement, freeSlots)).isTrue(); + } - offerSlots( - declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement)); + private Collection<TestSlotInfo> createSlotInfosForResourceRequirements( + ResourceCounter resourceRequirements) { + final Collection<TestSlotInfo> slotInfos = new ArrayList<>(); - assertThat(scheduler.hasDesiredResources(resourceRequirement)).isTrue(); + for (Map.Entry<ResourceProfile, Integer> resourceProfileCount : + resourceRequirements.getResourcesWithCount()) { + for (int i = 0; i < resourceProfileCount.getValue(); i++) { + slotInfos.add(new TestSlotInfo(resourceProfileCount.getKey())); + } + } + + return slotInfos; } @Test void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception { - final JobGraph jobGraph = createJobGraph(); - - final DefaultDeclarativeSlotPool declarativeSlotPool = - createDeclarativeSlotPool(jobGraph.getJobID()); - - final AdaptiveScheduler scheduler = - new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor) - .setDeclarativeSlotPool(declarativeSlotPool) - .build(EXECUTOR_RESOURCE.getExecutor()); - - scheduler.startScheduling(); - final int numRequiredSlots = 1; final ResourceCounter requiredResources = ResourceCounter.withResource(ResourceProfile.UNKNOWN, numRequiredSlots); @@ -274,9 +262,10 @@ public class AdaptiveSchedulerTest { ResourceCounter.withResource( ResourceProfile.newBuilder().setCpuCores(1).build(), numRequiredSlots); - offerSlots(declarativeSlotPool, createSlotOffersForResourceRequirements(providedResources)); + final Collection<TestSlotInfo> freeSlots = + createSlotInfosForResourceRequirements(providedResources); - assertThat(scheduler.hasDesiredResources(requiredResources)).isTrue(); + assertThat(AdaptiveScheduler.hasDesiredResources(requiredResources, freeSlots)).isTrue(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index 202d0fdbd5f..536d4b4a6e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -20,12 +20,10 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.testutils.ScheduledTask; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.TestLogger; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.ManualClock; @@ -55,9 +53,6 @@ import static org.junit.Assert.fail; /** Tests for the WaitingForResources state. */ public class WaitingForResourcesTest extends TestLogger { - private static final ResourceCounter RESOURCE_COUNTER = - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1); - private static final Duration STABILIZATION_TIMEOUT = Duration.ofSeconds(1); /** WaitingForResources is transitioning to Executing if there are enough resources. */ @@ -68,8 +63,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx.setExpectCreatingExecutionGraph(); - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.runScheduledTasks(); } @@ -80,8 +74,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); WaitingForResources wfr = - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); // we expect no state transition. wfr.onNewResourcesAvailable(); @@ -93,8 +86,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); // initially, not enough resources WaitingForResources wfr = - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.setHasDesiredResources(() -> true); // make resources available ctx.setExpectCreatingExecutionGraph(); wfr.onNewResourcesAvailable(); // .. and notify @@ -108,11 +100,7 @@ public class WaitingForResourcesTest extends TestLogger { Duration noStabilizationTimeout = Duration.ofMillis(0); WaitingForResources wfr = new WaitingForResources( - ctx, - log, - RESOURCE_COUNTER, - Duration.ofSeconds(1000), - noStabilizationTimeout); + ctx, log, Duration.ofSeconds(1000), noStabilizationTimeout); ctx.setHasDesiredResources(() -> false); ctx.setHasSufficientResources(() -> true); @@ -129,11 +117,7 @@ public class WaitingForResourcesTest extends TestLogger { WaitingForResources wfr = new WaitingForResources( - ctx, - log, - RESOURCE_COUNTER, - Duration.ofSeconds(1000), - stabilizationTimeout); + ctx, log, Duration.ofSeconds(1000), stabilizationTimeout); ctx.setHasDesiredResources(() -> false); ctx.setHasSufficientResources(() -> true); @@ -155,7 +139,6 @@ public class WaitingForResourcesTest extends TestLogger { new WaitingForResources( ctx, log, - RESOURCE_COUNTER, initialResourceTimeout, stabilizationTimeout, ctx.getClock(), @@ -190,7 +173,6 @@ public class WaitingForResourcesTest extends TestLogger { new WaitingForResources( ctx, log, - RESOURCE_COUNTER, initialResourceTimeout, stabilizationTimeout, ctx.getClock(), @@ -232,12 +214,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); WaitingForResources wfr = - new WaitingForResources( - ctx, - log, - RESOURCE_COUNTER, - Duration.ofMillis(-1), - STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ofMillis(-1), STABILIZATION_TIMEOUT); ctx.runScheduledTasks(); assertThat(ctx.hasStateTransition(), is(false)); @@ -249,8 +226,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); WaitingForResources wfr = - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.setExpectCreatingExecutionGraph(); @@ -264,8 +240,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); WaitingForResources wfr = - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.setExpectFinished( archivedExecutionGraph -> { @@ -287,8 +262,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); WaitingForResources wfr = - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.setExpectFinished( (archivedExecutionGraph -> { @@ -303,8 +277,7 @@ public class WaitingForResourcesTest extends TestLogger { try (MockContext ctx = new MockContext()) { ctx.setHasDesiredResources(() -> false); WaitingForResources wfr = - new WaitingForResources( - ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.setExpectFinished( (archivedExecutionGraph -> { @@ -480,7 +453,7 @@ public class WaitingForResourcesTest extends TestLogger { } @Override - public boolean hasDesiredResources(ResourceCounter desiredResources) { + public boolean hasDesiredResources() { return hasDesiredResourcesSupplier.get(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java index e467554ddfa..9f3573a0705 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java @@ -24,16 +24,26 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; /** Test {@link SlotInfo} implementation. */ -class TestSlotInfo implements SlotInfo { +public class TestSlotInfo implements SlotInfo { private final AllocationID allocationId; + private final ResourceProfile resourceProfile; public TestSlotInfo() { - this(new AllocationID()); + this(new AllocationID(), ResourceProfile.ANY); } public TestSlotInfo(AllocationID allocationId) { + this(allocationId, ResourceProfile.ANY); + } + + public TestSlotInfo(ResourceProfile resourceProfile) { + this(new AllocationID(), resourceProfile); + } + + public TestSlotInfo(AllocationID allocationId, ResourceProfile resourceProfile) { this.allocationId = allocationId; + this.resourceProfile = resourceProfile; } @Override @@ -53,7 +63,7 @@ class TestSlotInfo implements SlotInfo { @Override public ResourceProfile getResourceProfile() { - return ResourceProfile.ANY; + return resourceProfile; } @Override