KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426214331


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter 
previouslyFulfilledRequireme
 
     @VisibleForTesting
     void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+        if (!globalSlotsViewable) {
+            final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
+                    requestSlotMatchingStrategy.matchRequestsAndSlots(
+                            newSlots, pendingRequests.values());
+            reserveMatchedFreeSlots(requestSlotMatches);
+            fulfillMatchedSlots(requestSlotMatches);
+            return;
+        }
+
+        receivedSlots.addAll(newSlots);
+        if (receivedSlots.size() < pendingRequests.size()) {
+            return;
+        }
         final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
                 requestSlotMatchingStrategy.matchRequestsAndSlots(
                         newSlots, pendingRequests.values());
+        if (requestSlotMatches.size() == pendingRequests.size()) {
+            reserveMatchedFreeSlots(requestSlotMatches);
+            fulfillMatchedSlots(requestSlotMatches);
+        }
+        receivedSlots.clear();

Review Comment:
   Should we only clear the pending slots after all the requirements are 
fulfilled?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter 
increment) {
         }
         totalResourceRequirements = totalResourceRequirements.add(increment);
 
+        if (slotRequestMaxInterval == null) {
+            declareResourceRequirements();
+            return;
+        }
+
+        Preconditions.checkNotNull(componentMainThreadExecutor);
+        if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+            slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);

Review Comment:
   In current implementation, the resource requirement will be sent if there is 
no further requriement change within the `slotRequestMaxInterval`. Another 
choice is to declare requirement at most once every `slotRequestMaxInterval`. 
TBH I'm not sure which one would be better. But I think that deserves a 
discussion.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -110,7 +110,7 @@ public <T> Optional<T> castInto(Class<T> clazz) {
     }
 
     @Override
-    public final void start(

Review Comment:
   There is no need to do this. Just override the onStart.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter 
increment) {
         }
         totalResourceRequirements = totalResourceRequirements.add(increment);
 
+        if (slotRequestMaxInterval == null) {
+            declareResourceRequirements();
+            return;
+        }
+
+        Preconditions.checkNotNull(componentMainThreadExecutor);
+        if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+            slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+        }
+        slotRequestMaxIntervalTimeoutCheckFuture =
+                componentMainThreadExecutor.schedule(
+                        this::checkSlotRequestMaxIntervalTimeout,
+                        slotRequestMaxInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void checkSlotRequestMaxIntervalTimeout() {
+        if (componentMainThreadExecutor == null || slotRequestMaxInterval == 
null) {

Review Comment:
   I think we need to assert something went wrong instead of ignore this 
illegal statement.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter 
increment) {
         }
         totalResourceRequirements = totalResourceRequirements.add(increment);
 
+        if (slotRequestMaxInterval == null) {
+            declareResourceRequirements();
+            return;
+        }
+
+        Preconditions.checkNotNull(componentMainThreadExecutor);
+        if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+            slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+        }
+        slotRequestMaxIntervalTimeoutCheckFuture =
+                componentMainThreadExecutor.schedule(
+                        this::checkSlotRequestMaxIntervalTimeout,
+                        slotRequestMaxInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void checkSlotRequestMaxIntervalTimeout() {
+        if (componentMainThreadExecutor == null || slotRequestMaxInterval == 
null) {
+            return;
+        }
         declareResourceRequirements();
+        slotRequestMaxIntervalTimeoutCheckFuture = null;

Review Comment:
   Do we ensure that all the modification of this variable are happened in the 
same thread?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -142,7 +142,7 @@ protected void assertHasBeenStarted() {
     }
 
     @Override
-    public final void close() {

Review Comment:
   ditto. Override the onClose



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter 
previouslyFulfilledRequireme
 
     @VisibleForTesting
     void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+        if (!globalSlotsViewable) {
+            final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
+                    requestSlotMatchingStrategy.matchRequestsAndSlots(
+                            newSlots, pendingRequests.values());
+            reserveMatchedFreeSlots(requestSlotMatches);
+            fulfillMatchedSlots(requestSlotMatches);
+            return;
+        }
+
+        receivedSlots.addAll(newSlots);
+        if (receivedSlots.size() < pendingRequests.size()) {
+            return;
+        }
         final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
                 requestSlotMatchingStrategy.matchRequestsAndSlots(
                         newSlots, pendingRequests.values());
+        if (requestSlotMatches.size() == pendingRequests.size()) {

Review Comment:
   If there are redundant useless slots, should we reserve them or release them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to