This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1fbd033ad994c51d7ca57c6e38ee814835a3dd5 Author: Roc Marshal <[email protected]> AuthorDate: Fri Aug 2 20:10:11 2024 +0800 [FLINK-33875][runtime] Support slots wait mechanism at DeclarativeSlotPoolBridge side for Default Scheduler --- .../DefaultSlotPoolServiceSchedulerFactory.java | 6 + .../slotpool/AbstractSlotPoolServiceFactory.java | 6 +- .../slotpool/DeclarativeSlotPoolBridge.java | 62 +++++++++ .../DeclarativeSlotPoolBridgeServiceFactory.java | 10 +- .../AbstractDeclarativeSlotPoolBridgeTest.java | 47 ++++--- .../slotpool/DeclarativeSlotPoolBridgeBuilder.java | 8 ++ ...tiveSlotPoolBridgePreferredAllocationsTest.java | 10 +- ...arativeSlotPoolBridgeRequestCompletionTest.java | 35 ++++- ...ativeSlotPoolBridgeResourceDeclarationTest.java | 83 ++++++++++-- .../slotpool/DeclarativeSlotPoolBridgeTest.java | 145 +++++++++++++++++---- 10 files changed, 351 insertions(+), 61 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index bd51b9049b1..f3865b7bcfb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -171,6 +171,10 @@ public final class DefaultSlotPoolServiceSchedulerFactory final Duration slotRequestMaxInterval = configuration.get(SLOT_REQUEST_MAX_INTERVAL); + // TODO: It will be assigned by the corresponding logic after + // https://issues.apache.org/jira/browse/FLINK-35966 + final boolean slotBatchAllocatable = false; + if (configuration .getOptional(JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT) .isPresent()) { @@ -190,6 +194,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory slotIdleTimeout, batchSlotTimeout, slotRequestMaxInterval, + slotBatchAllocatable, getRequestSlotMatchingStrategy(configuration, jobType)); break; case Adaptive: @@ -210,6 +215,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory slotIdleTimeout, batchSlotTimeout, slotRequestMaxInterval, + slotBatchAllocatable, getRequestSlotMatchingStrategy(configuration, jobType)); break; default: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java index 484e7fce3b3..36c2b2b7de2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java @@ -37,16 +37,20 @@ public abstract class AbstractSlotPoolServiceFactory implements SlotPoolServiceF @Nonnull protected final Duration slotRequestMaxInterval; + protected final boolean slotBatchAllocatable; + protected AbstractSlotPoolServiceFactory( @Nonnull Clock clock, @Nonnull Duration rpcTimeout, @Nonnull Duration slotIdleTimeout, @Nonnull Duration batchSlotTimeout, - @Nonnull Duration slotRequestMaxInterval) { + @Nonnull Duration slotRequestMaxInterval, + boolean slotBatchAllocatable) { this.clock = clock; this.rpcTimeout = rpcTimeout; this.slotIdleTimeout = slotIdleTimeout; this.batchSlotTimeout = batchSlotTimeout; this.slotRequestMaxInterval = slotRequestMaxInterval; + this.slotBatchAllocatable = slotBatchAllocatable; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index d7ea9a0cab6..8d3d6435e08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -70,6 +70,8 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem private boolean isJobRestarting = false; + private final boolean slotBatchAllocatable; + public DeclarativeSlotPoolBridge( JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, @@ -79,6 +81,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem Duration batchSlotTimeout, RequestSlotMatchingStrategy requestSlotMatchingStrategy, Duration slotRequestMaxInterval, + boolean slotBatchAllocatable, @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { super( jobId, @@ -96,6 +99,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem "Using the request slot matching strategy: {}", requestSlotMatchingStrategy.getClass().getSimpleName()); this.requestSlotMatchingStrategy = requestSlotMatchingStrategy; + this.slotBatchAllocatable = slotBatchAllocatable; this.isBatchSlotRequestTimeoutCheckDisabled = false; @@ -202,10 +206,63 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem @VisibleForTesting void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) { + if (pendingRequests.isEmpty()) { + return; + } + + if (slotBatchAllocatable) { + newSlotsAvailableForSlotBatchAllocatable(newSlots); + } else { + newSlotsAvailableForDirectlyAllocatable(newSlots); + } + } + + private void newSlotsAvailableForSlotBatchAllocatable( + Collection<? extends PhysicalSlot> newSlots) { + log.debug("Received new available slots: {}", newSlots); + + final FreeSlotTracker freeSlotInfoTracker = getDeclarativeSlotPool().getFreeSlotTracker(); + + final int slotsNum = freeSlotInfoTracker.getAvailableSlots().size(); + if (slotsNum < pendingRequests.size()) { + // Do nothing and waiting slots. + log.debug( + "The number of available slots: {}, the required number of slots: {}, waiting for more available slots.", + slotsNum, + pendingRequests.size()); + return; + } + + final Collection<PhysicalSlot> availableSlots = + freeSlotInfoTracker.getFreeSlotsInformation(); + final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches = + requestSlotMatchingStrategy.matchRequestsAndSlots( + availableSlots, pendingRequests.values()); + if (requestSlotMatches.size() == pendingRequests.size()) { + reserveAndFulfillMatchedFreeSlots(requestSlotMatches); + } else if (requestSlotMatches.size() < pendingRequests.size()) { + // Do nothing and waiting slots. + log.debug( + "Ignored the matched results: {}, pendingRequests: {}, waiting for more available slots.", + requestSlotMatches, + pendingRequests); + } else { + // For requestSlotMatches.size() > pendingRequests.size() + throw new IllegalStateException( + "The number of matched slots is not equals to the pendingRequests."); + } + } + + private void newSlotsAvailableForDirectlyAllocatable( + Collection<? extends PhysicalSlot> newSlots) { final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches = requestSlotMatchingStrategy.matchRequestsAndSlots( newSlots, pendingRequests.values()); + reserveAndFulfillMatchedFreeSlots(requestSlotMatches); + } + private void reserveAndFulfillMatchedFreeSlots( + Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches) { for (RequestSlotMatchingStrategy.RequestSlotMatch match : requestSlotMatches) { final PendingRequest pendingRequest = match.getPendingRequest(); final PhysicalSlot slot = match.getSlot(); @@ -234,6 +291,11 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem } } + @VisibleForTesting + Collection<PhysicalSlot> getFreeSlotsInformation() { + return getDeclarativeSlotPool().getFreeSlotTracker().getFreeSlotsInformation(); + } + private void reserveFreeSlot( SlotRequestId slotRequestId, AllocationID allocationId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java index c446c8727da..d3ac47875ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java @@ -37,8 +37,15 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer @Nonnull Duration slotIdleTimeout, @Nonnull Duration batchSlotTimeout, @Nonnull Duration slotRequestMaxInterval, + boolean slotBatchAllocatable, @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) { - super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout, slotRequestMaxInterval); + super( + clock, + rpcTimeout, + slotIdleTimeout, + batchSlotTimeout, + slotRequestMaxInterval, + slotBatchAllocatable); this.requestSlotMatchingStrategy = requestSlotMatchingStrategy; } @@ -57,6 +64,7 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer batchSlotTimeout, requestSlotMatchingStrategy, slotRequestMaxInterval, + slotBatchAllocatable, componentMainThreadExecutor); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java index 6d9fff147ab..5f0c29b36b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java @@ -51,37 +51,51 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest { @Parameter(1) protected Duration slotRequestMaxInterval; - @Parameters(name = "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}") + @Parameter(2) + boolean slotBatchAllocatable; + + @Parameters( + name = + "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}, slotBatchAllocatable: {2}") private static Collection<Object[]> data() { return Arrays.asList( - new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO}, - new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)}, + new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false}, + new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true}, + new Object[] { + SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), false + }, + new Object[] { + SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), true + }, + new Object[] { + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false + }, new Object[] { - PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true }, new Object[] { - PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50) + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, + Duration.ofMillis(20), + false + }, + new Object[] { + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, + Duration.ofMillis(20), + true }); } @Nonnull DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( - DeclarativeSlotPoolFactory declarativeSlotPoolFactory, - RequestSlotMatchingStrategy requestSlotMatchingStrategy, - Duration slotRequestMaxInterval) { + DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { return createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, - requestSlotMatchingStrategy, - slotRequestMaxInterval, - componentMainThreadExecutor); + declarativeSlotPoolFactory, componentMainThreadExecutor); } @Nonnull DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( DeclarativeSlotPoolFactory declarativeSlotPoolFactory, - RequestSlotMatchingStrategy requestSlotMatchingStrategy, - Duration slotRequestMaxInterval, - ComponentMainThreadExecutor mainThreadExecutor) { + ComponentMainThreadExecutor componentMainThreadExecutor) { return new DeclarativeSlotPoolBridge( JOB_ID, declarativeSlotPoolFactory, @@ -91,7 +105,8 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest { Duration.ofSeconds(20), requestSlotMatchingStrategy, slotRequestMaxInterval, - mainThreadExecutor); + slotBatchAllocatable, + componentMainThreadExecutor); } static PhysicalSlot createAllocatedSlot(AllocationID allocationID) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java index 9855ba8be84..0525f15b4c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java @@ -45,6 +45,7 @@ public class DeclarativeSlotPoolBridgeBuilder { private Clock clock = SystemClock.getInstance(); private Duration slotRequestMaxInterval = SLOT_REQUEST_MAX_INTERVAL.defaultValue(); private ComponentMainThreadExecutor mainThreadExecutor = forMainThread(); + private boolean slotBatchAllocatable = false; @Nullable private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); @@ -96,6 +97,11 @@ public class DeclarativeSlotPoolBridgeBuilder { return this; } + public DeclarativeSlotPoolBridgeBuilder setSlotBatchAllocatable(boolean slotBatchAllocatable) { + this.slotBatchAllocatable = slotBatchAllocatable; + return this; + } + public DeclarativeSlotPoolBridge build() { return new DeclarativeSlotPoolBridge( jobId, @@ -106,6 +112,7 @@ public class DeclarativeSlotPoolBridgeBuilder { batchSlotTimeout, requestSlotMatchingStrategy, slotRequestMaxInterval, + slotBatchAllocatable, mainThreadExecutor); } @@ -120,6 +127,7 @@ public class DeclarativeSlotPoolBridgeBuilder { batchSlotTimeout, requestSlotMatchingStrategy, slotRequestMaxInterval, + slotBatchAllocatable, mainThreadExecutor); slotPool.start(JobMasterId.generate(), "foobar"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java index ace4cf756c8..9107a9b67d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java @@ -29,7 +29,8 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.clock.SystemClock; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nonnull; @@ -45,8 +46,10 @@ import static org.assertj.core.api.Assertions.assertThat; class DeclarativeSlotPoolBridgePreferredAllocationsTest { - @Test - void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Exception { + @ValueSource(booleans = {true, false}) + @ParameterizedTest(name = "slotBatchAllocatable: {0}") + void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount(boolean slotBatchAllocatable) + throws Exception { final DeclarativeSlotPoolBridge declarativeSlotPoolBridge = new DeclarativeSlotPoolBridge( new JobID(), @@ -57,6 +60,7 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest { TestingUtils.infiniteDuration(), PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, + slotBatchAllocatable, forMainThread()); declarativeSlotPoolBridge.start(JobMasterId.generate(), "localhost"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java index 71909ca5aac..a1967dc920d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java @@ -25,11 +25,16 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.FlinkException; import org.apache.flink.util.function.CheckedSupplier; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; import java.util.Collection; @@ -44,29 +49,38 @@ import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; /** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */ +@ExtendWith(ParameterizedTestExtension.class) class DeclarativeSlotPoolBridgeRequestCompletionTest { private static final Duration TIMEOUT = SlotPoolUtils.TIMEOUT; private TestingResourceManagerGateway resourceManagerGateway; + @Parameter private boolean slotBatchAllocatable; + + @Parameters(name = "slotBatchAllocatable: {0}") + public static List<Boolean> getSlotBatchAllocatableParams() { + return Lists.newArrayList(false, true); + } + @BeforeEach void setUp() { resourceManagerGateway = new TestingResourceManagerGateway(); } /** Tests that the {@link DeclarativeSlotPoolBridge} completes slots in request order. */ - @Test + @TestTemplate void testRequestsAreCompletedInRequestOrder() { runSlotRequestCompletionTest( - CheckedSupplier.unchecked(this::createAndSetUpSlotPool), slotPool -> {}); + CheckedSupplier.unchecked(() -> createAndSetUpSlotPool(slotBatchAllocatable)), + slotPool -> {}); } /** * Tests that the {@link DeclarativeSlotPoolBridge} completes stashed slot requests in request * order. */ - @Test + @TestTemplate void testStashOrderMaintainsRequestOrder() { runSlotRequestCompletionTest( CheckedSupplier.unchecked(this::createAndSetUpSlotPoolWithoutResourceManager), @@ -112,17 +126,25 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest { final FlinkException testingReleaseException = new FlinkException("Testing release exception"); + DeclarativeSlotPoolBridge slotPoolBridge = (DeclarativeSlotPoolBridge) slotPool; + // check that the slot requests get completed in sequential order for (int i = 0; i < slotRequestIds.size(); i++) { final CompletableFuture<PhysicalSlot> slotRequestFuture = slotRequests.get(i); - assertThat(slotRequestFuture.getNow(null)).isNotNull(); + if (slotBatchAllocatable) { + assertThat(slotRequestFuture.getNow(null)).isNull(); + assertThat(slotPoolBridge.getFreeSlotsInformation()).hasSize(1); + } else { + assertThat(slotRequestFuture.getNow(null)).isNotNull(); + } slotPool.releaseSlot(slotRequestIds.get(i), testingReleaseException); } } } - private SlotPool createAndSetUpSlotPool() throws Exception { + private SlotPool createAndSetUpSlotPool(boolean slotBatchAllocatable) throws Exception { return new DeclarativeSlotPoolBridgeBuilder() + .setSlotBatchAllocatable(slotBatchAllocatable) .setResourceManagerGateway(resourceManagerGateway) .buildAndStart(); } @@ -133,6 +155,7 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest { private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws Exception { return new DeclarativeSlotPoolBridgeBuilder() + .setSlotBatchAllocatable(slotBatchAllocatable) .setResourceManagerGateway(null) .buildAndStart(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java index d830a6649e9..3778b4e503b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java @@ -31,11 +31,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; +import javax.annotation.Nonnull; + import java.time.Duration; import java.util.Collections; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; @@ -50,7 +56,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest @BeforeEach void setup() { - requirementListener = new RequirementListener(); + requirementListener = + new RequirementListener(componentMainThreadExecutor, slotRequestMaxInterval); constructDeclarativeSlotPoolBridge(componentMainThreadExecutor); } @@ -76,11 +83,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(slotPoolBuilder); declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, - requestSlotMatchingStrategy, - slotRequestMaxInterval, - mainThreadExecutor); + createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory, mainThreadExecutor); } @AfterEach @@ -97,6 +100,9 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest // requesting the allocation of a new slot should increase the requirements declarativeSlotPoolBridge.requestNewAllocatedSlot( new SlotRequestId(), ResourceProfile.UNKNOWN, Duration.ofMinutes(5)); + + requirementListener.tryWaitSlotRequestIsDone(); + assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)) .isOne(); } @@ -109,6 +115,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( scheduledExecutorService); + requirementListener = + new RequirementListener(mainThreadExecutor, slotRequestMaxInterval); constructDeclarativeSlotPoolBridge(mainThreadExecutor); declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); @@ -119,12 +127,15 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest declarativeSlotPoolBridge.requestNewAllocatedSlot( new SlotRequestId(), ResourceProfile.UNKNOWN, - Duration.ofMillis(5)), + Duration.ofMillis( + slotRequestMaxInterval.toMillis() * 2)), mainThreadExecutor) .get(); - + requirementListener.tryWaitSlotRequestIsDone(); // waiting for the timeout assertThatFuture(allocationFuture).failsWithin(Duration.ofMinutes(1)); + requirementListener.tryWaitSlotRequestIsDone(); + // when the allocation fails the requirements should be reduced (it is the users // responsibility to retry) CompletableFuture.runAsync( @@ -164,6 +175,9 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest final SlotRequestId slotRequestId = new SlotRequestId(); declarativeSlotPoolBridge.allocateAvailableSlot( slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN); + + requirementListener.tryWaitSlotRequestIsDone(); + assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)) .isOne(); } @@ -179,6 +193,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest declarativeSlotPoolBridge.allocateAvailableSlot( slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN); + requirementListener.tryWaitSlotRequestIsDone(); + // releasing (==freeing) a [reserved] slot should decrease the requirements declarativeSlotPoolBridge.releaseSlot( slotRequestId, new RuntimeException("Test exception")); @@ -196,6 +212,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest declarativeSlotPoolBridge.allocateAvailableSlot( new SlotRequestId(), newSlot.getAllocationId(), ResourceProfile.UNKNOWN); + requirementListener.tryWaitSlotRequestIsDone(); + // releasing (==freeing) a [reserved] slot should decrease the requirements declarativeSlotPoolBridge.failAllocation( newSlot.getTaskManagerLocation().getResourceID(), @@ -205,12 +223,36 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest .isZero(); } + /** Requirement listener for testing. */ private static final class RequirementListener { + ComponentMainThreadExecutor componentMainThreadExecutor; + Duration slotRequestMaxInterval; + ScheduledFuture<?> slotRequestFuture; + + RequirementListener( + ComponentMainThreadExecutor componentMainThreadExecutor, + @Nonnull Duration slotRequestMaxInterval) { + this.componentMainThreadExecutor = componentMainThreadExecutor; + this.slotRequestMaxInterval = slotRequestMaxInterval; + } + private ResourceCounter requirements = ResourceCounter.empty(); private void increaseRequirements(ResourceCounter requirements) { - this.requirements = this.requirements.add(requirements); + if (slotRequestMaxInterval.toMillis() <= 0L) { + this.requirements = this.requirements.add(requirements); + return; + } + + if (!slotSlotRequestFutureAssignable()) { + slotRequestFuture.cancel(true); + } + slotRequestFuture = + componentMainThreadExecutor.schedule( + () -> this.checkSlotRequestMaxInterval(requirements), + slotRequestMaxInterval.toMillis(), + TimeUnit.MILLISECONDS); } private void decreaseRequirements(ResourceCounter requirements) { @@ -220,5 +262,28 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest public ResourceCounter getRequirements() { return requirements; } + + public void tryWaitSlotRequestIsDone() { + if (Objects.nonNull(slotRequestFuture)) { + try { + slotRequestFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } + + private boolean slotSlotRequestFutureAssignable() { + return slotRequestFuture == null + || slotRequestFuture.isDone() + || slotRequestFuture.isCancelled(); + } + + private void checkSlotRequestMaxInterval(ResourceCounter requirements) { + if (slotRequestMaxInterval.toMillis() <= 0L) { + return; + } + this.requirements = this.requirements.add(requirements); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java index ee261fa76aa..2f0f3dabba0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java @@ -35,7 +35,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -55,12 +57,23 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId); final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = - new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder()); + new TestingDeclarativeSlotPoolFactory( + TestingDeclarativeSlotPool.builder() + .setGetFreeSlotTrackerSupplier( + () -> + TestingFreeSlotTracker.newBuilder() + .setGetFreeSlotsInformationSupplier( + () -> + Collections.singleton( + allocatedSlot)) + .setGetAvailableSlotsSupplier( + () -> + Collections.singleton( + allocatedSlot + .getAllocationId())) + .build())); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, - requestSlotMatchingStrategy, - slotRequestMaxInterval)) { + createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) { declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); @@ -68,6 +81,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes declarativeSlotPoolBridge.requestNewAllocatedSlot( slotRequestId, ResourceProfile.UNKNOWN, null); + tryWaitSlotRequestIsDone(declarativeSlotPoolBridge); + declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot)); slotAllocationFuture.join(); @@ -81,10 +96,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder()); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, - requestSlotMatchingStrategy, - slotRequestMaxInterval)) { + createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) { declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); @@ -98,6 +110,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes componentMainThreadExecutor) .get(); + tryWaitSlotRequestIsDone(declarativeSlotPoolBridge); + componentMainThreadExecutor.execute( () -> declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable( @@ -129,16 +143,16 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(builder); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, - requestSlotMatchingStrategy, - slotRequestMaxInterval)) { + createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) { declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final SlotRequestId slotRequestId = new SlotRequestId(); declarativeSlotPoolBridge.allocateAvailableSlot( slotRequestId, expectedAllocationId, allocatedSlot.getResourceProfile()); + + tryWaitSlotRequestIsDone(declarativeSlotPoolBridge); + declarativeSlotPoolBridge.releaseSlot(slotRequestId, null); assertThat(releaseSlotFuture.join()).isSameAs(expectedAllocationId); @@ -148,10 +162,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes @TestTemplate void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception { try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - new DefaultDeclarativeSlotPoolFactory(), - requestSlotMatchingStrategy, - slotRequestMaxInterval)) { + createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) { declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); @@ -178,6 +189,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes }) .collect(Collectors.toList()); + tryWaitSlotRequestIsDone(declarativeSlotPoolBridge); + declarativeSlotPoolBridge.close(); assertThatThrownBy(() -> FutureUtils.waitForAll(slotFutures).get()) @@ -189,10 +202,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes @TestTemplate void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception { try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - new DefaultDeclarativeSlotPoolFactory(), - requestSlotMatchingStrategy, - slotRequestMaxInterval)) { + createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) { declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); @@ -200,6 +210,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes declarativeSlotPoolBridge.requestNewAllocatedSlot( new SlotRequestId(), ResourceProfile.UNKNOWN, RPC_TIMEOUT); + tryWaitSlotRequestIsDone(declarativeSlotPoolBridge); + final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID()); @@ -230,10 +242,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes })); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = - createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, - requestSlotMatchingStrategy, - slotRequestMaxInterval)) { + createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) { declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); declarativeSlotPoolBridge.setIsJobRestarting(true); @@ -252,4 +261,90 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes registerSlotsCalledFuture.join(); } } + + @TestTemplate + void testSlotsBatchAllocatableLogic() throws Exception { + testSlotsBatchAllocatableLogic(1); + testSlotsBatchAllocatableLogic(2); + testSlotsBatchAllocatableLogic(4); + testSlotsBatchAllocatableLogic(7); + testSlotsBatchAllocatableLogic(10); + testSlotsBatchAllocatableLogic(32); + } + + private void testSlotsBatchAllocatableLogic(int requestSlotNum) throws Exception { + + final Set<AllocationID> availableSlotsIds = new HashSet<>(); + final Set<PhysicalSlot> freeSlotsInformation = new HashSet<>(); + + try (DeclarativeSlotPoolBridge slotPoolBridge = + createDeclarativeSlotPoolBridge(freeSlotsInformation, availableSlotsIds)) { + + slotPoolBridge.start(JOB_MASTER_ID, "localhost"); + + final List<CompletableFuture<PhysicalSlot>> futures = new ArrayList<>(requestSlotNum); + for (int i = 0; i < requestSlotNum; i++) { + futures.add( + slotPoolBridge.requestNewAllocatedSlot( + new SlotRequestId(), ResourceProfile.UNKNOWN, null)); + } + + tryWaitSlotRequestIsDone(slotPoolBridge); + + for (int i = 0; i < requestSlotNum; i++) { + final PhysicalSlot slot = createAllocatedSlot(new AllocationID()); + newSlotsAreAvailable(slotPoolBridge, freeSlotsInformation, availableSlotsIds, slot); + if (slotBatchAllocatable) { + checkForSlotBatchAllocating(requestSlotNum, i, futures); + } else { + // Check for allocating slots directly. + futures.get(i).join(); + } + } + } + } + + private void checkForSlotBatchAllocating( + int requestSlotNum, int requestIndex, List<CompletableFuture<PhysicalSlot>> futures) { + if (requestIndex < requestSlotNum - 1) { + assertThat(FutureUtils.waitForAll(futures).getNumFuturesCompleted()).isZero(); + } else { + FutureUtils.waitForAll(futures).join(); + } + } + + private void newSlotsAreAvailable( + DeclarativeSlotPoolBridge declarativeSlotPoolBridge, + Set<PhysicalSlot> freeSlotsInformation, + Set<AllocationID> availableSlotsIds, + PhysicalSlot slot) { + freeSlotsInformation.add(slot); + availableSlotsIds.add(slot.getAllocationId()); + declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(slot)); + } + + private void tryWaitSlotRequestIsDone(DeclarativeSlotPoolBridge declarativeSlotPoolBridge) { + if (declarativeSlotPoolBridge.getDeclarativeSlotPool() + instanceof DefaultDeclarativeSlotPool) { + final DefaultDeclarativeSlotPool slotPool = + (DefaultDeclarativeSlotPool) declarativeSlotPoolBridge.getDeclarativeSlotPool(); + slotPool.tryWaitSlotRequestIsDone(); + } + } + + private DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( + Set<PhysicalSlot> freeSlotsInformation, Set<AllocationID> availableSlotsIds) { + final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = + new TestingDeclarativeSlotPoolFactory( + TestingDeclarativeSlotPool.builder() + .setGetFreeSlotTrackerSupplier( + () -> + TestingFreeSlotTracker.newBuilder() + .setGetFreeSlotsInformationSupplier( + () -> freeSlotsInformation) + .setGetAvailableSlotsSupplier( + () -> availableSlotsIds) + .build())); + return createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory); + } }
