1996fanrui commented on code in PR #25134:
URL: https://github.com/apache/flink/pull/25134#discussion_r1703679953


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java:
##########
@@ -205,12 +224,36 @@ void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
                 .isZero();
     }
 
-    private static final class RequirementListener {
+    /** Requirement listener for testing. */
+    public static final class RequirementListener {

Review Comment:
   ```suggestion
       static final class RequirementListener {
   ```
   
   The default (package-private) visibility is enough.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimplePhysicalSlot.java:
##########
@@ -21,12 +21,12 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
-/** Simple implementation of the {@link SlotContext} interface for the legacy code. */
-public class SimpleSlotContext implements SlotContext {
+/** Simple implementation of the {@link PhysicalSlot} interface for the legacy code. */
+public class SimplePhysicalSlot implements PhysicalSlot {

Review Comment:
   After this PR, `SimplePhysicalSlot` and `TestingPhysicalSlot` are similar (`TestingPhysicalSlot` only adds `Payload payload` related logic on top of `SimplePhysicalSlot`).
   
   I'm curious: is it possible to remove `SimplePhysicalSlot` and have all callers use `TestingPhysicalSlot`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +610,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) {
     boolean isBatchSlotRequestTimeoutCheckEnabled() {
         return !isBatchSlotRequestTimeoutCheckDisabled;
     }
+
+    @VisibleForTesting
+    public void tryWaitSlotRequestIsDone() {

Review Comment:
   ```suggestion
       void tryWaitSlotRequestIsDone() {
   ```
   
   `public` isn't needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +610,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) {
     boolean isBatchSlotRequestTimeoutCheckEnabled() {
         return !isBatchSlotRequestTimeoutCheckDisabled;
     }
+
+    @VisibleForTesting
+    public void tryWaitSlotRequestIsDone() {
+        if (getDeclarativeSlotPool() instanceof DefaultDeclarativeSlotPool) {
+            final DefaultDeclarativeSlotPool slotPool =
+                    (DefaultDeclarativeSlotPool) getDeclarativeSlotPool();
+            slotPool.tryWaitSlotRequestIsDone();
+        }
+    }

Review Comment:
   If the previous comment makes sense, could we move the `void tryWaitSlotRequestIsDone()` method to `DeclarativeSlotPoolBridgeTest`?
   
   Only `DeclarativeSlotPoolBridgeTest` calls `tryWaitSlotRequestIsDone()`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -234,6 +290,11 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
         }
     }
 
+    @VisibleForTesting
+    Collection<PhysicalSlot> getFreePhysicalSlots() {

Review Comment:
   ```suggestion
       Collection<PhysicalSlot> getFreeSlotsInformation() {
   ```
   
   nit: how about keeping the name the same as in the interface?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java:
##########
@@ -151,23 +151,23 @@ public Builder setReserveSlotConsumer(Consumer<AllocationID> reserveSlotConsumer
             return this;
         }
 
-        public Builder setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction(
-                Function<Set<AllocationID>, FreeSlotInfoTracker>
-                        createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
-            this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
-                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+        public Builder setCreateNewFreeSlotTrackerWithoutBlockedSlotsFunction(

Review Comment:
   It seems this method is not called.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java:
##########
@@ -55,19 +58,32 @@ void testSlotOffer() throws Exception {
         final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId);
 
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
-                new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+                new TestingDeclarativeSlotPoolFactory(

Review Comment:
   As I understand it, `slotBatchAllocatable` is the core feature of this PR. But I didn't find any test in `DeclarativeSlotPoolBridgeTest` that covers this case. For example, request 2 slots in total:
   
   - When slotBatchAllocatable is disabled
     - When the first slot is ready: we assign it directly
     - When the second slot is ready: we assign it directly
   - When slotBatchAllocatable is enabled
     - When the first slot is ready: we don't assign it
     - When the second slot is ready: we assign 2 slots together
   
   Please correct me if I'm missing something, thanks.
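
The scenario listed above could be sketched as a small self-contained model. This is not Flink's API: `SlotBatchAllocatableSketch`, `SlotAssigner`, `onSlotReady`, and the string slot IDs are hypothetical stand-ins. The sketch only illustrates the kind of assertions a `DeclarativeSlotPoolBridgeTest` case might make for both settings of `slotBatchAllocatable`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the expected behavior. All names are hypothetical, not Flink classes. */
public class SlotBatchAllocatableSketch {

    /** Assigns ready slots to requests, either one by one or only once all are ready. */
    static final class SlotAssigner {
        private final int requestedSlots;
        private final boolean slotBatchAllocatable;
        private final List<String> readySlots = new ArrayList<>();
        private final List<String> assignedSlots = new ArrayList<>();

        SlotAssigner(int requestedSlots, boolean slotBatchAllocatable) {
            this.requestedSlots = requestedSlots;
            this.slotBatchAllocatable = slotBatchAllocatable;
        }

        void onSlotReady(String slotId) {
            readySlots.add(slotId);
            // Without batching, assign immediately. With batching, hold slots back
            // until enough slots exist to fulfil every request at once.
            boolean allRequestsCovered =
                    assignedSlots.size() + readySlots.size() >= requestedSlots;
            if (!slotBatchAllocatable || allRequestsCovered) {
                assignedSlots.addAll(readySlots);
                readySlots.clear();
            }
        }

        int assignedCount() {
            return assignedSlots.size();
        }
    }

    /** Returns how many slots are assigned after {@code readyCount} slots became ready. */
    static int assignedAfter(boolean slotBatchAllocatable, int requested, int readyCount) {
        SlotAssigner assigner = new SlotAssigner(requested, slotBatchAllocatable);
        for (int i = 0; i < readyCount; i++) {
            assigner.onSlotReady("slot-" + i);
        }
        return assigner.assignedCount();
    }

    public static void main(String[] args) {
        for (boolean batch : new boolean[] {false, true}) {
            for (int ready = 1; ready <= 2; ready++) {
                System.out.println(
                        "slotBatchAllocatable=" + batch
                                + ", ready=" + ready
                                + ", assigned=" + assignedAfter(batch, 2, ready));
            }
        }
    }
}
```

A real test would drive the bridge through slot offers instead of this model, but the four checked states (disabled or enabled, after the first and after the second slot) would be the same.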



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##########
@@ -51,37 +52,53 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest {
     @Parameter(1)
     protected Duration slotRequestMaxInterval;
 
-    @Parameters(name = "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}")
+    @Parameter(2)
+    boolean slotBatchAllocatable;
+
+    @Parameters(
+            name =
+                    "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}, slotBatchAllocatable: {2}")
     private static Collection<Object[]> data() {
         return Arrays.asList(
-                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO},
-                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true},
+                new Object[] {
+                    SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), false
+                },
+                new Object[] {
+                    SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), true
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false
+                },
                 new Object[] {
-                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true
                 },
                 new Object[] {
-                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                    Duration.ofMillis(20),
+                    false
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                    Duration.ofMillis(20),
+                    true
                 });
     }
 
     @Nonnull
     DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
-            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
-            Duration slotRequestMaxInterval) {
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
         return createDeclarativeSlotPoolBridge(
-                declarativeSlotPoolFactory,
-                requestSlotMatchingStrategy,
-                slotRequestMaxInterval,
-                componentMainThreadExecutor);
+                declarativeSlotPoolFactory, componentMainThreadExecutor, null);
     }
 
     @Nonnull
     DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
-            Duration slotRequestMaxInterval,
-            ComponentMainThreadExecutor mainThreadExecutor) {
+            ComponentMainThreadExecutor componentMainThreadExecutor,
+            DeclarativeSlotPoolBridgeResourceDeclarationTest.RequirementListener

Review Comment:
   Is it possible to avoid passing `RequirementListener` here?
   
   An alternative solution: all tests in `DeclarativeSlotPoolBridgeResourceDeclarationTest` could call `requirementListener#tryWaitSlotRequestIsDone` directly.
   
   In general, a parent class shouldn't depend on a class nested inside its child class.
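
The dependency direction suggested above might look roughly like the following. This is a minimal sketch with hypothetical stand-in classes (`Bridge`, `AbstractBridgeTest`, `ResourceDeclarationTest`), not the actual Flink test code: the parent's factory method takes no listener, and the child owns the listener and calls it directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins only; none of these are the real Flink test classes. */
public class ParentChildDependencySketch {

    /** Stand-in for the object the tests create. */
    static final class Bridge {
        final String factoryName;

        Bridge(String factoryName) {
            this.factoryName = factoryName;
        }
    }

    /** Stand-in for the abstract parent test: it references no type from a subclass. */
    abstract static class AbstractBridgeTest {
        Bridge createBridge(String factoryName) {
            return new Bridge(factoryName);
        }
    }

    /** Stand-in for the child test that owns the listener. */
    static final class ResourceDeclarationTest extends AbstractBridgeTest {

        /** Package-private nested listener, used only by this test class. */
        static final class RequirementListener {
            final List<String> events = new ArrayList<>();

            void tryWaitSlotRequestIsDone() {
                events.add("waited");
            }
        }

        final RequirementListener requirementListener = new RequirementListener();

        List<String> runScenario() {
            Bridge bridge = createBridge("testing-factory");
            // The child calls its own listener directly instead of handing it
            // to the parent's factory method.
            requirementListener.tryWaitSlotRequestIsDone();
            requirementListener.events.add("used " + bridge.factoryName);
            return requirementListener.events;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ResourceDeclarationTest().runScenario());
    }
}
```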


