1996fanrui commented on code in PR #25134: URL: https://github.com/apache/flink/pull/25134#discussion_r1703679953
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java:
##########
@@ -205,12 +224,36 @@ void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
                 .isZero();
     }
 
-    private static final class RequirementListener {
+    /** Requirement listener for testing. */
+    public static final class RequirementListener {

Review Comment:
   ```suggestion
       static final class RequirementListener {
   ```
   Default (package-private) visibility is enough.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimplePhysicalSlot.java:
##########
@@ -21,12 +21,12 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
-/** Simple implementation of the {@link SlotContext} interface for the legacy code. */
-public class SimpleSlotContext implements SlotContext {
+/** Simple implementation of the {@link PhysicalSlot} interface for the legacy code. */
+public class SimplePhysicalSlot implements PhysicalSlot {

Review Comment:
   After this PR, `SimplePhysicalSlot` and `TestingPhysicalSlot` are similar (`TestingPhysicalSlot` only has additional `Payload payload` related logic compared with `SimplePhysicalSlot`).

   Is it possible to remove `SimplePhysicalSlot` and have all callers use `TestingPhysicalSlot` instead?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +610,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) {
     boolean isBatchSlotRequestTimeoutCheckEnabled() {
         return !isBatchSlotRequestTimeoutCheckDisabled;
     }
+
+    @VisibleForTesting
+    public void tryWaitSlotRequestIsDone() {

Review Comment:
   ```suggestion
       void tryWaitSlotRequestIsDone() {
   ```
   `public` isn't needed.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +610,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) {
     boolean isBatchSlotRequestTimeoutCheckEnabled() {
         return !isBatchSlotRequestTimeoutCheckDisabled;
     }
+
+    @VisibleForTesting
+    public void tryWaitSlotRequestIsDone() {
+        if (getDeclarativeSlotPool() instanceof DefaultDeclarativeSlotPool) {
+            final DefaultDeclarativeSlotPool slotPool =
+                    (DefaultDeclarativeSlotPool) getDeclarativeSlotPool();
+            slotPool.tryWaitSlotRequestIsDone();
+        }
+    }

Review Comment:
   If the previous comment makes sense, could we move the `tryWaitSlotRequestIsDone()` method to `DeclarativeSlotPoolBridgeTest`?

   Only `DeclarativeSlotPoolBridgeTest` calls `tryWaitSlotRequestIsDone()`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -234,6 +290,11 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
         }
     }
 
+    @VisibleForTesting
+    Collection<PhysicalSlot> getFreePhysicalSlots() {

Review Comment:
   ```suggestion
       Collection<PhysicalSlot> getFreeSlotsInformation() {
   ```
   nit: how about keeping the name the same as in the interface?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java:
##########
@@ -151,23 +151,23 @@ public Builder setReserveSlotConsumer(Consumer<AllocationID> reserveSlotConsumer
             return this;
         }
 
-        public Builder setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction(
-                Function<Set<AllocationID>, FreeSlotInfoTracker>
-                        createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
-            this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
-                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+        public Builder setCreateNewFreeSlotTrackerWithoutBlockedSlotsFunction(

Review Comment:
   It seems this method is not called.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java:
##########
@@ -55,19 +58,32 @@ void testSlotOffer() throws Exception {
         final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId);
 
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
-                new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+                new TestingDeclarativeSlotPoolFactory(

Review Comment:
   As I understand it, `slotBatchAllocatable` is the core feature of this PR. But I didn't find any test in `DeclarativeSlotPoolBridgeTest` that covers this case. For example, request 2 slots in total:

   - When slotBatchAllocatable is disabled
     - When the first slot is ready: we assign it directly
     - When the second slot is ready: we assign it directly
   - When slotBatchAllocatable is enabled
     - When the first slot is ready: we don't assign it
     - When the second slot is ready: we assign 2 slots together

   Please correct me if I missed something, thanks.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##########
@@ -51,37 +52,53 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest {
     @Parameter(1)
     protected Duration slotRequestMaxInterval;
 
-    @Parameters(name = "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}")
+    @Parameter(2)
+    boolean slotBatchAllocatable;
+
+    @Parameters(
+            name =
+                    "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}, slotBatchAllocatable: {2}")
     private static Collection<Object[]> data() {
         return Arrays.asList(
-                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO},
-                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true},
+                new Object[] {
+                    SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), false
+                },
+                new Object[] {
+                    SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), true
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false
+                },
                 new Object[] {
-                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true
                 },
                 new Object[] {
-                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                    Duration.ofMillis(20),
+                    false
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                    Duration.ofMillis(20),
+                    true
                 });
     }
 
     @Nonnull
     DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
-            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
-            Duration slotRequestMaxInterval) {
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
         return createDeclarativeSlotPoolBridge(
-                declarativeSlotPoolFactory,
-                requestSlotMatchingStrategy,
-                slotRequestMaxInterval,
-                componentMainThreadExecutor);
+                declarativeSlotPoolFactory, componentMainThreadExecutor, null);
     }
 
     @Nonnull
     DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
-            Duration slotRequestMaxInterval,
-            ComponentMainThreadExecutor mainThreadExecutor) {
+            ComponentMainThreadExecutor componentMainThreadExecutor,
+            DeclarativeSlotPoolBridgeResourceDeclarationTest.RequirementListener

Review Comment:
   Is it possible to avoid passing `RequirementListener` here?

   An alternative solution: all tests in `DeclarativeSlotPoolBridgeResourceDeclarationTest` could call `requirementListener#tryWaitSlotRequestIsDone` directly.

   In general, a parent class shouldn't depend on a class nested inside one of its child classes.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
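
For reference, the two-slot scenario described in the `DeclarativeSlotPoolBridgeTest` comment can be sketched as a small toy model. This is not Flink code and does not use any Flink API: `ToySlotAssigner`, `offerSlot`, and the string slot ids are hypothetical names invented for illustration, and the "wait until all required slots are ready" behavior is only the reviewer's stated understanding of `slotBatchAllocatable`, which the PR author may correct.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of the scenario in the review comment; hypothetical, not Flink code. */
public class ToySlotAssigner {
    private final int requiredSlots;
    private final boolean slotBatchAllocatable;
    // Slots that are ready but have not been assigned yet.
    private final List<String> buffered = new ArrayList<>();
    private int assigned;

    public ToySlotAssigner(int requiredSlots, boolean slotBatchAllocatable) {
        this.requiredSlots = requiredSlots;
        this.slotBatchAllocatable = slotBatchAllocatable;
    }

    /** Offers one ready slot and returns the slots assigned as a result of this offer. */
    public List<String> offerSlot(String slotId) {
        buffered.add(slotId);
        // Assumption: in batch mode nothing is assigned until every required slot is ready.
        if (slotBatchAllocatable && assigned + buffered.size() < requiredSlots) {
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<>(buffered);
        buffered.clear();
        assigned += result.size();
        return result;
    }

    public static void main(String[] args) {
        // slotBatchAllocatable disabled: each slot is assigned as soon as it is ready.
        ToySlotAssigner immediate = new ToySlotAssigner(2, false);
        System.out.println(immediate.offerSlot("slot-1")); // [slot-1]
        System.out.println(immediate.offerSlot("slot-2")); // [slot-2]

        // slotBatchAllocatable enabled: the first slot is held back, then both are assigned.
        ToySlotAssigner batched = new ToySlotAssigner(2, true);
        System.out.println(batched.offerSlot("slot-1")); // []
        System.out.println(batched.offerSlot("slot-2")); // [slot-1, slot-2]
    }
}
```

A real test would make the same four assertions against the bridge itself, once per value of the `slotBatchAllocatable` parameter.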