This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1fbd033ad994c51d7ca57c6e38ee814835a3dd5
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Aug 2 20:10:11 2024 +0800

    [FLINK-33875][runtime] Support slots wait mechanism at DeclarativeSlotPoolBridge side for Default Scheduler
---
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   6 +
 .../slotpool/AbstractSlotPoolServiceFactory.java   |   6 +-
 .../slotpool/DeclarativeSlotPoolBridge.java        |  62 +++++++++
 .../DeclarativeSlotPoolBridgeServiceFactory.java   |  10 +-
 .../AbstractDeclarativeSlotPoolBridgeTest.java     |  47 ++++---
 .../slotpool/DeclarativeSlotPoolBridgeBuilder.java |   8 ++
 ...tiveSlotPoolBridgePreferredAllocationsTest.java |  10 +-
 ...arativeSlotPoolBridgeRequestCompletionTest.java |  35 ++++-
 ...ativeSlotPoolBridgeResourceDeclarationTest.java |  83 ++++++++++--
 .../slotpool/DeclarativeSlotPoolBridgeTest.java    | 145 +++++++++++++++++----
 10 files changed, 351 insertions(+), 61 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index bd51b9049b1..f3865b7bcfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -171,6 +171,10 @@ public final class DefaultSlotPoolServiceSchedulerFactory
 
         final Duration slotRequestMaxInterval = 
configuration.get(SLOT_REQUEST_MAX_INTERVAL);
 
+        // TODO: It will be assigned by the corresponding logic after
+        //  https://issues.apache.org/jira/browse/FLINK-35966
+        final boolean slotBatchAllocatable = false;
+
         if (configuration
                 
.getOptional(JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT)
                 .isPresent()) {
@@ -190,6 +194,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                                 slotIdleTimeout,
                                 batchSlotTimeout,
                                 slotRequestMaxInterval,
+                                slotBatchAllocatable,
                                 getRequestSlotMatchingStrategy(configuration, 
jobType));
                 break;
             case Adaptive:
@@ -210,6 +215,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                                 slotIdleTimeout,
                                 batchSlotTimeout,
                                 slotRequestMaxInterval,
+                                slotBatchAllocatable,
                                 getRequestSlotMatchingStrategy(configuration, 
jobType));
                 break;
             default:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
index 484e7fce3b3..36c2b2b7de2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
@@ -37,16 +37,20 @@ public abstract class AbstractSlotPoolServiceFactory 
implements SlotPoolServiceF
 
     @Nonnull protected final Duration slotRequestMaxInterval;
 
+    protected final boolean slotBatchAllocatable;
+
     protected AbstractSlotPoolServiceFactory(
             @Nonnull Clock clock,
             @Nonnull Duration rpcTimeout,
             @Nonnull Duration slotIdleTimeout,
             @Nonnull Duration batchSlotTimeout,
-            @Nonnull Duration slotRequestMaxInterval) {
+            @Nonnull Duration slotRequestMaxInterval,
+            boolean slotBatchAllocatable) {
         this.clock = clock;
         this.rpcTimeout = rpcTimeout;
         this.slotIdleTimeout = slotIdleTimeout;
         this.batchSlotTimeout = batchSlotTimeout;
         this.slotRequestMaxInterval = slotRequestMaxInterval;
+        this.slotBatchAllocatable = slotBatchAllocatable;
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index d7ea9a0cab6..8d3d6435e08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -70,6 +70,8 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 
     private boolean isJobRestarting = false;
 
+    private final boolean slotBatchAllocatable;
+
     public DeclarativeSlotPoolBridge(
             JobID jobId,
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
@@ -79,6 +81,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             Duration batchSlotTimeout,
             RequestSlotMatchingStrategy requestSlotMatchingStrategy,
             Duration slotRequestMaxInterval,
+            boolean slotBatchAllocatable,
             @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         super(
                 jobId,
@@ -96,6 +99,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
                 "Using the request slot matching strategy: {}",
                 requestSlotMatchingStrategy.getClass().getSimpleName());
         this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
+        this.slotBatchAllocatable = slotBatchAllocatable;
 
         this.isBatchSlotRequestTimeoutCheckDisabled = false;
 
@@ -202,10 +206,63 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 
     @VisibleForTesting
     void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+        if (pendingRequests.isEmpty()) {
+            return;
+        }
+
+        if (slotBatchAllocatable) {
+            newSlotsAvailableForSlotBatchAllocatable(newSlots);
+        } else {
+            newSlotsAvailableForDirectlyAllocatable(newSlots);
+        }
+    }
+
+    private void newSlotsAvailableForSlotBatchAllocatable(
+            Collection<? extends PhysicalSlot> newSlots) {
+        log.debug("Received new available slots: {}", newSlots);
+
+        final FreeSlotTracker freeSlotInfoTracker = 
getDeclarativeSlotPool().getFreeSlotTracker();
+
+        final int slotsNum = freeSlotInfoTracker.getAvailableSlots().size();
+        if (slotsNum < pendingRequests.size()) {
+            // Do nothing and waiting slots.
+            log.debug(
+                    "The number of available slots: {}, the required number of slots: {}, waiting for more available slots.",
+                    slotsNum,
+                    pendingRequests.size());
+            return;
+        }
+
+        final Collection<PhysicalSlot> availableSlots =
+                freeSlotInfoTracker.getFreeSlotsInformation();
+        final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
+                requestSlotMatchingStrategy.matchRequestsAndSlots(
+                        availableSlots, pendingRequests.values());
+        if (requestSlotMatches.size() == pendingRequests.size()) {
+            reserveAndFulfillMatchedFreeSlots(requestSlotMatches);
+        } else if (requestSlotMatches.size() < pendingRequests.size()) {
+            // Do nothing and waiting slots.
+            log.debug(
+                    "Ignored the matched results: {}, pendingRequests: {}, waiting for more available slots.",
+                    requestSlotMatches,
+                    pendingRequests);
+        } else {
+            // For requestSlotMatches.size() > pendingRequests.size()
+            throw new IllegalStateException(
+                    "The number of matched slots is not equals to the pendingRequests.");
+        }
+    }
+
+    private void newSlotsAvailableForDirectlyAllocatable(
+            Collection<? extends PhysicalSlot> newSlots) {
         final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
                 requestSlotMatchingStrategy.matchRequestsAndSlots(
                         newSlots, pendingRequests.values());
+        reserveAndFulfillMatchedFreeSlots(requestSlotMatches);
+    }
 
+    private void reserveAndFulfillMatchedFreeSlots(
+            Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches) {
         for (RequestSlotMatchingStrategy.RequestSlotMatch match : 
requestSlotMatches) {
             final PendingRequest pendingRequest = match.getPendingRequest();
             final PhysicalSlot slot = match.getSlot();
@@ -234,6 +291,11 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         }
     }
 
+    @VisibleForTesting
+    Collection<PhysicalSlot> getFreeSlotsInformation() {
+        return 
getDeclarativeSlotPool().getFreeSlotTracker().getFreeSlotsInformation();
+    }
+
     private void reserveFreeSlot(
             SlotRequestId slotRequestId,
             AllocationID allocationId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
index c446c8727da..d3ac47875ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
@@ -37,8 +37,15 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends 
AbstractSlotPoolSer
             @Nonnull Duration slotIdleTimeout,
             @Nonnull Duration batchSlotTimeout,
             @Nonnull Duration slotRequestMaxInterval,
+            boolean slotBatchAllocatable,
             @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
-        super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout, 
slotRequestMaxInterval);
+        super(
+                clock,
+                rpcTimeout,
+                slotIdleTimeout,
+                batchSlotTimeout,
+                slotRequestMaxInterval,
+                slotBatchAllocatable);
         this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
     }
 
@@ -57,6 +64,7 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends 
AbstractSlotPoolSer
                 batchSlotTimeout,
                 requestSlotMatchingStrategy,
                 slotRequestMaxInterval,
+                slotBatchAllocatable,
                 componentMainThreadExecutor);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java
index 6d9fff147ab..5f0c29b36b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java
@@ -51,37 +51,51 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest {
     @Parameter(1)
     protected Duration slotRequestMaxInterval;
 
-    @Parameters(name = "requestSlotMatchingStrategy: {0}, 
slotRequestMaxInterval: {1}")
+    @Parameter(2)
+    boolean slotBatchAllocatable;
+
+    @Parameters(
+            name =
+                    "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: 
{1}, slotBatchAllocatable: {2}")
     private static Collection<Object[]> data() {
         return Arrays.asList(
-                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO},
-                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(50)},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO, false},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO, true},
+                new Object[] {
+                    SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(20), false
+                },
+                new Object[] {
+                    SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(20), true
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO, false
+                },
                 new Object[] {
-                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO, true
                 },
                 new Object[] {
-                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(50)
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                    Duration.ofMillis(20),
+                    false
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                    Duration.ofMillis(20),
+                    true
                 });
     }
 
     @Nonnull
     DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
-            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
-            Duration slotRequestMaxInterval) {
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
         return createDeclarativeSlotPoolBridge(
-                declarativeSlotPoolFactory,
-                requestSlotMatchingStrategy,
-                slotRequestMaxInterval,
-                componentMainThreadExecutor);
+                declarativeSlotPoolFactory, componentMainThreadExecutor);
     }
 
     @Nonnull
     DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
-            Duration slotRequestMaxInterval,
-            ComponentMainThreadExecutor mainThreadExecutor) {
+            ComponentMainThreadExecutor componentMainThreadExecutor) {
         return new DeclarativeSlotPoolBridge(
                 JOB_ID,
                 declarativeSlotPoolFactory,
@@ -91,7 +105,8 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest {
                 Duration.ofSeconds(20),
                 requestSlotMatchingStrategy,
                 slotRequestMaxInterval,
-                mainThreadExecutor);
+                slotBatchAllocatable,
+                componentMainThreadExecutor);
     }
 
     static PhysicalSlot createAllocatedSlot(AllocationID allocationID) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
index 9855ba8be84..0525f15b4c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
@@ -45,6 +45,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
     private Clock clock = SystemClock.getInstance();
     private Duration slotRequestMaxInterval = 
SLOT_REQUEST_MAX_INTERVAL.defaultValue();
     private ComponentMainThreadExecutor mainThreadExecutor = forMainThread();
+    private boolean slotBatchAllocatable = false;
 
     @Nullable
     private ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
@@ -96,6 +97,11 @@ public class DeclarativeSlotPoolBridgeBuilder {
         return this;
     }
 
+    public DeclarativeSlotPoolBridgeBuilder setSlotBatchAllocatable(boolean 
slotBatchAllocatable) {
+        this.slotBatchAllocatable = slotBatchAllocatable;
+        return this;
+    }
+
     public DeclarativeSlotPoolBridge build() {
         return new DeclarativeSlotPoolBridge(
                 jobId,
@@ -106,6 +112,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
                 batchSlotTimeout,
                 requestSlotMatchingStrategy,
                 slotRequestMaxInterval,
+                slotBatchAllocatable,
                 mainThreadExecutor);
     }
 
@@ -120,6 +127,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
                         batchSlotTimeout,
                         requestSlotMatchingStrategy,
                         slotRequestMaxInterval,
+                        slotBatchAllocatable,
                         mainThreadExecutor);
 
         slotPool.start(JobMasterId.generate(), "foobar");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
index ace4cf756c8..9107a9b67d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -29,7 +29,8 @@ import 
org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.clock.SystemClock;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nonnull;
 
@@ -45,8 +46,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class DeclarativeSlotPoolBridgePreferredAllocationsTest {
 
-    @Test
-    void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws 
Exception {
+    @ValueSource(booleans = {true, false})
+    @ParameterizedTest(name = "slotBatchAllocatable: {0}")
+    void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount(boolean 
slotBatchAllocatable)
+            throws Exception {
         final DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 new DeclarativeSlotPoolBridge(
                         new JobID(),
@@ -57,6 +60,7 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest {
                         TestingUtils.infiniteDuration(),
                         
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
                         Duration.ZERO,
+                        slotBatchAllocatable,
                         forMainThread());
 
         declarativeSlotPoolBridge.start(JobMasterId.generate(), "localhost");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
index 71909ca5aac..a1967dc920d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
@@ -25,11 +25,16 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.function.CheckedSupplier;
 
+import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -44,29 +49,38 @@ import java.util.stream.IntStream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */
+@ExtendWith(ParameterizedTestExtension.class)
 class DeclarativeSlotPoolBridgeRequestCompletionTest {
 
     private static final Duration TIMEOUT = SlotPoolUtils.TIMEOUT;
 
     private TestingResourceManagerGateway resourceManagerGateway;
 
+    @Parameter private boolean slotBatchAllocatable;
+
+    @Parameters(name = "slotBatchAllocatable: {0}")
+    public static List<Boolean> getSlotBatchAllocatableParams() {
+        return Lists.newArrayList(false, true);
+    }
+
     @BeforeEach
     void setUp() {
         resourceManagerGateway = new TestingResourceManagerGateway();
     }
 
     /** Tests that the {@link DeclarativeSlotPoolBridge} completes slots in 
request order. */
-    @Test
+    @TestTemplate
     void testRequestsAreCompletedInRequestOrder() {
         runSlotRequestCompletionTest(
-                CheckedSupplier.unchecked(this::createAndSetUpSlotPool), 
slotPool -> {});
+                CheckedSupplier.unchecked(() -> 
createAndSetUpSlotPool(slotBatchAllocatable)),
+                slotPool -> {});
     }
 
     /**
      * Tests that the {@link DeclarativeSlotPoolBridge} completes stashed slot 
requests in request
      * order.
      */
-    @Test
+    @TestTemplate
     void testStashOrderMaintainsRequestOrder() {
         runSlotRequestCompletionTest(
                 
CheckedSupplier.unchecked(this::createAndSetUpSlotPoolWithoutResourceManager),
@@ -112,17 +126,25 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest {
             final FlinkException testingReleaseException =
                     new FlinkException("Testing release exception");
 
+            DeclarativeSlotPoolBridge slotPoolBridge = 
(DeclarativeSlotPoolBridge) slotPool;
+
             // check that the slot requests get completed in sequential order
             for (int i = 0; i < slotRequestIds.size(); i++) {
                 final CompletableFuture<PhysicalSlot> slotRequestFuture = 
slotRequests.get(i);
-                assertThat(slotRequestFuture.getNow(null)).isNotNull();
+                if (slotBatchAllocatable) {
+                    assertThat(slotRequestFuture.getNow(null)).isNull();
+                    
assertThat(slotPoolBridge.getFreeSlotsInformation()).hasSize(1);
+                } else {
+                    assertThat(slotRequestFuture.getNow(null)).isNotNull();
+                }
                 slotPool.releaseSlot(slotRequestIds.get(i), 
testingReleaseException);
             }
         }
     }
 
-    private SlotPool createAndSetUpSlotPool() throws Exception {
+    private SlotPool createAndSetUpSlotPool(boolean slotBatchAllocatable) 
throws Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
+                .setSlotBatchAllocatable(slotBatchAllocatable)
                 .setResourceManagerGateway(resourceManagerGateway)
                 .buildAndStart();
     }
@@ -133,6 +155,7 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest {
 
     private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws 
Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
+                .setSlotBatchAllocatable(slotBatchAllocatable)
                 .setResourceManagerGateway(null)
                 .buildAndStart();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
index d830a6649e9..3778b4e503b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
@@ -31,11 +31,17 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import javax.annotation.Nonnull;
+
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -50,7 +56,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
 
     @BeforeEach
     void setup() {
-        requirementListener = new RequirementListener();
+        requirementListener =
+                new RequirementListener(componentMainThreadExecutor, 
slotRequestMaxInterval);
 
         constructDeclarativeSlotPoolBridge(componentMainThreadExecutor);
     }
@@ -76,11 +83,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new TestingDeclarativeSlotPoolFactory(slotPoolBuilder);
         declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory,
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval,
-                        mainThreadExecutor);
+                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory, 
mainThreadExecutor);
     }
 
     @AfterEach
@@ -97,6 +100,9 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
         // requesting the allocation of a new slot should increase the 
requirements
         declarativeSlotPoolBridge.requestNewAllocatedSlot(
                 new SlotRequestId(), ResourceProfile.UNKNOWN, 
Duration.ofMinutes(5));
+
+        requirementListener.tryWaitSlotRequestIsDone();
+
         
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                 .isOne();
     }
@@ -109,6 +115,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
             ComponentMainThreadExecutor mainThreadExecutor =
                     
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                             scheduledExecutorService);
+            requirementListener =
+                    new RequirementListener(mainThreadExecutor, 
slotRequestMaxInterval);
             constructDeclarativeSlotPoolBridge(mainThreadExecutor);
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
@@ -119,12 +127,15 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
                                             
declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                     new SlotRequestId(),
                                                     ResourceProfile.UNKNOWN,
-                                                    Duration.ofMillis(5)),
+                                                    Duration.ofMillis(
+                                                            
slotRequestMaxInterval.toMillis() * 2)),
                                     mainThreadExecutor)
                             .get();
-
+            requirementListener.tryWaitSlotRequestIsDone();
             // waiting for the timeout
             
assertThatFuture(allocationFuture).failsWithin(Duration.ofMinutes(1));
+            requirementListener.tryWaitSlotRequestIsDone();
+
             // when the allocation fails the requirements should be reduced 
(it is the users
             // responsibility to retry)
             CompletableFuture.runAsync(
@@ -164,6 +175,9 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
         final SlotRequestId slotRequestId = new SlotRequestId();
         declarativeSlotPoolBridge.allocateAvailableSlot(
                 slotRequestId, newSlot.getAllocationId(), 
ResourceProfile.UNKNOWN);
+
+        requirementListener.tryWaitSlotRequestIsDone();
+
         
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                 .isOne();
     }
@@ -179,6 +193,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
         declarativeSlotPoolBridge.allocateAvailableSlot(
                 slotRequestId, newSlot.getAllocationId(), 
ResourceProfile.UNKNOWN);
 
+        requirementListener.tryWaitSlotRequestIsDone();
+
         // releasing (==freeing) a [reserved] slot should decrease the 
requirements
         declarativeSlotPoolBridge.releaseSlot(
                 slotRequestId, new RuntimeException("Test exception"));
@@ -196,6 +212,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
         declarativeSlotPoolBridge.allocateAvailableSlot(
                 new SlotRequestId(), newSlot.getAllocationId(), 
ResourceProfile.UNKNOWN);
 
+        requirementListener.tryWaitSlotRequestIsDone();
+
         // releasing (==freeing) a [reserved] slot should decrease the 
requirements
         declarativeSlotPoolBridge.failAllocation(
                 newSlot.getTaskManagerLocation().getResourceID(),
@@ -205,12 +223,36 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
                 .isZero();
     }
 
+    /** Requirement listener for testing. */
     private static final class RequirementListener {
 
+        ComponentMainThreadExecutor componentMainThreadExecutor;
+        Duration slotRequestMaxInterval;
+        ScheduledFuture<?> slotRequestFuture;
+
+        RequirementListener(
+                ComponentMainThreadExecutor componentMainThreadExecutor,
+                @Nonnull Duration slotRequestMaxInterval) {
+            this.componentMainThreadExecutor = componentMainThreadExecutor;
+            this.slotRequestMaxInterval = slotRequestMaxInterval;
+        }
+
         private ResourceCounter requirements = ResourceCounter.empty();
 
         private void increaseRequirements(ResourceCounter requirements) {
-            this.requirements = this.requirements.add(requirements);
+            if (slotRequestMaxInterval.toMillis() <= 0L) {
+                this.requirements = this.requirements.add(requirements);
+                return;
+            }
+
+            if (!slotSlotRequestFutureAssignable()) {
+                slotRequestFuture.cancel(true);
+            }
+            slotRequestFuture =
+                    componentMainThreadExecutor.schedule(
+                            () -> 
this.checkSlotRequestMaxInterval(requirements),
+                            slotRequestMaxInterval.toMillis(),
+                            TimeUnit.MILLISECONDS);
         }
 
         private void decreaseRequirements(ResourceCounter requirements) {
@@ -220,5 +262,28 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
         public ResourceCounter getRequirements() {
             return requirements;
         }
+
+        public void tryWaitSlotRequestIsDone() {
+            if (Objects.nonNull(slotRequestFuture)) {
+                try {
+                    slotRequestFuture.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        private boolean slotSlotRequestFutureAssignable() {
+            return slotRequestFuture == null
+                    || slotRequestFuture.isDone()
+                    || slotRequestFuture.isCancelled();
+        }
+
+        private void checkSlotRequestMaxInterval(ResourceCounter requirements) 
{
+            if (slotRequestMaxInterval.toMillis() <= 0L) {
+                return;
+            }
+            this.requirements = this.requirements.add(requirements);
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index ee261fa76aa..2f0f3dabba0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -35,7 +35,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -55,12 +57,23 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
         final PhysicalSlot allocatedSlot = 
createAllocatedSlot(expectedAllocationId);
 
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
-                new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+                new TestingDeclarativeSlotPoolFactory(
+                        TestingDeclarativeSlotPool.builder()
+                                .setGetFreeSlotTrackerSupplier(
+                                        () ->
+                                                
TestingFreeSlotTracker.newBuilder()
+                                                        
.setGetFreeSlotsInformationSupplier(
+                                                                () ->
+                                                                        
Collections.singleton(
+                                                                               
 allocatedSlot))
+                                                        
.setGetAvailableSlotsSupplier(
+                                                                () ->
+                                                                        
Collections.singleton(
+                                                                               
 allocatedSlot
+                                                                               
         .getAllocationId()))
+                                                        .build()));
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory,
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval)) {
+                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
 
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
@@ -68,6 +81,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
                     declarativeSlotPoolBridge.requestNewAllocatedSlot(
                             slotRequestId, ResourceProfile.UNKNOWN, null);
 
+            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);
+
             
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));
 
             slotAllocationFuture.join();
@@ -81,10 +96,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory,
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval)) {
+                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
 
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
@@ -98,6 +110,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
                                     componentMainThreadExecutor)
                             .get();
 
+            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);
+
             componentMainThreadExecutor.execute(
                     () ->
                             
declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(
@@ -129,16 +143,16 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new TestingDeclarativeSlotPoolFactory(builder);
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory,
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval)) {
+                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             final SlotRequestId slotRequestId = new SlotRequestId();
 
             declarativeSlotPoolBridge.allocateAvailableSlot(
                     slotRequestId, expectedAllocationId, 
allocatedSlot.getResourceProfile());
+
+            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);
+
             declarativeSlotPoolBridge.releaseSlot(slotRequestId, null);
 
             
assertThat(releaseSlotFuture.join()).isSameAs(expectedAllocationId);
@@ -148,10 +162,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
     @TestTemplate
     void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws 
Exception {
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        new DefaultDeclarativeSlotPoolFactory(),
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval)) {
+                createDeclarativeSlotPoolBridge(new 
DefaultDeclarativeSlotPoolFactory())) {
 
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
@@ -178,6 +189,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
                                     })
                             .collect(Collectors.toList());
 
+            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);
+
             declarativeSlotPoolBridge.close();
 
             assertThatThrownBy(() -> FutureUtils.waitForAll(slotFutures).get())
@@ -189,10 +202,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
     @TestTemplate
     void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws 
Exception {
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        new DefaultDeclarativeSlotPoolFactory(),
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval)) {
+                createDeclarativeSlotPoolBridge(new 
DefaultDeclarativeSlotPoolFactory())) {
 
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
@@ -200,6 +210,8 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
                     declarativeSlotPoolBridge.requestNewAllocatedSlot(
                             new SlotRequestId(), ResourceProfile.UNKNOWN, 
RPC_TIMEOUT);
 
+            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);
+
             final LocalTaskManagerLocation localTaskManagerLocation =
                     new LocalTaskManagerLocation();
             
declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID());
@@ -230,10 +242,7 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
                                         }));
 
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory,
-                        requestSlotMatchingStrategy,
-                        slotRequestMaxInterval)) {
+                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
             declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             declarativeSlotPoolBridge.setIsJobRestarting(true);
@@ -252,4 +261,90 @@ class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTes
             registerSlotsCalledFuture.join();
         }
     }
+
+    @TestTemplate
+    void testSlotsBatchAllocatableLogic() throws Exception {
+        testSlotsBatchAllocatableLogic(1);
+        testSlotsBatchAllocatableLogic(2);
+        testSlotsBatchAllocatableLogic(4);
+        testSlotsBatchAllocatableLogic(7);
+        testSlotsBatchAllocatableLogic(10);
+        testSlotsBatchAllocatableLogic(32);
+    }
+
+    private void testSlotsBatchAllocatableLogic(int requestSlotNum) throws 
Exception {
+
+        final Set<AllocationID> availableSlotsIds = new HashSet<>();
+        final Set<PhysicalSlot> freeSlotsInformation = new HashSet<>();
+
+        try (DeclarativeSlotPoolBridge slotPoolBridge =
+                createDeclarativeSlotPoolBridge(freeSlotsInformation, 
availableSlotsIds)) {
+
+            slotPoolBridge.start(JOB_MASTER_ID, "localhost");
+
+            final List<CompletableFuture<PhysicalSlot>> futures = new 
ArrayList<>(requestSlotNum);
+            for (int i = 0; i < requestSlotNum; i++) {
+                futures.add(
+                        slotPoolBridge.requestNewAllocatedSlot(
+                                new SlotRequestId(), ResourceProfile.UNKNOWN, 
null));
+            }
+
+            tryWaitSlotRequestIsDone(slotPoolBridge);
+
+            for (int i = 0; i < requestSlotNum; i++) {
+                final PhysicalSlot slot = createAllocatedSlot(new 
AllocationID());
+                newSlotsAreAvailable(slotPoolBridge, freeSlotsInformation, 
availableSlotsIds, slot);
+                if (slotBatchAllocatable) {
+                    checkForSlotBatchAllocating(requestSlotNum, i, futures);
+                } else {
+                    // Check for allocating slots directly.
+                    futures.get(i).join();
+                }
+            }
+        }
+    }
+
+    private void checkForSlotBatchAllocating(
+            int requestSlotNum, int requestIndex, 
List<CompletableFuture<PhysicalSlot>> futures) {
+        if (requestIndex < requestSlotNum - 1) {
+            
assertThat(FutureUtils.waitForAll(futures).getNumFuturesCompleted()).isZero();
+        } else {
+            FutureUtils.waitForAll(futures).join();
+        }
+    }
+
+    private void newSlotsAreAvailable(
+            DeclarativeSlotPoolBridge declarativeSlotPoolBridge,
+            Set<PhysicalSlot> freeSlotsInformation,
+            Set<AllocationID> availableSlotsIds,
+            PhysicalSlot slot) {
+        freeSlotsInformation.add(slot);
+        availableSlotsIds.add(slot.getAllocationId());
+        
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(slot));
+    }
+
+    private void tryWaitSlotRequestIsDone(DeclarativeSlotPoolBridge 
declarativeSlotPoolBridge) {
+        if (declarativeSlotPoolBridge.getDeclarativeSlotPool()
+                instanceof DefaultDeclarativeSlotPool) {
+            final DefaultDeclarativeSlotPool slotPool =
+                    (DefaultDeclarativeSlotPool) 
declarativeSlotPoolBridge.getDeclarativeSlotPool();
+            slotPool.tryWaitSlotRequestIsDone();
+        }
+    }
+
+    private DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
+            Set<PhysicalSlot> freeSlotsInformation, Set<AllocationID> 
availableSlotsIds) {
+        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
+                new TestingDeclarativeSlotPoolFactory(
+                        TestingDeclarativeSlotPool.builder()
+                                .setGetFreeSlotTrackerSupplier(
+                                        () ->
+                                                
TestingFreeSlotTracker.newBuilder()
+                                                        
.setGetFreeSlotsInformationSupplier(
+                                                                () -> 
freeSlotsInformation)
+                                                        
.setGetAvailableSlotsSupplier(
+                                                                () -> 
availableSlotsIds)
+                                                        .build()));
+        return createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory);
+    }
 }


Reply via email to