RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
         this.totalResourceRequirements = ResourceCounter.empty();
         this.fulfilledResourceRequirements = ResourceCounter.empty();
         this.slotToRequirementProfileMappings = new HashMap<>();
+        this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+        this.slotRequestMaxInterval = slotRequestMaxInterval;
     }
 
     @Override
     public void increaseResourceRequirementsBy(ResourceCounter increment) {
-        if (increment.isEmpty()) {
+        updateResourceRequirementsBy(
+                increment,
+                () -> totalResourceRequirements = totalResourceRequirements.add(increment));

Review Comment:
   @KarmaGYZ Thanks a lot for the comments.
   
   Let me try to explain.
   
   My initial intention was to reduce code duplication: there are currently two callers, and the only difference between them is whether they increase or decrease the resource requirements.
   
   If the extracted common part is too broad, I am happy to narrow it. For example:
   ```
       @Override
       public void increaseResourceRequirementsBy(ResourceCounter increment) {
           if (increment.isEmpty()) {
               return;
           }
   
           totalResourceRequirements = totalResourceRequirements.add(increment);
           doDeclareResourceRequirements();
       }
   
       @Override
       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
           if (decrement.isEmpty()) {
               return;
           }
   
           totalResourceRequirements = totalResourceRequirements.subtract(decrement);
           doDeclareResourceRequirements();
       }
   
       private void doDeclareResourceRequirements() {
           // No interval configured: declare immediately.
           if (slotRequestMaxInterval == null) {
               declareResourceRequirements();
               return;
           }
   
           // Cancel the pending declaration, if any, and restart the interval.
           // isDone() already returns true for a cancelled future.
           if (slotRequestMaxIntervalTimeoutFuture != null
                   && !slotRequestMaxIntervalTimeoutFuture.isDone()) {
               slotRequestMaxIntervalTimeoutFuture.cancel(true);
           }
           slotRequestMaxIntervalTimeoutFuture =
                   componentMainThreadExecutor.schedule(
                           this::declareResourceRequirements,
                           slotRequestMaxInterval.toMilliseconds(),
                           TimeUnit.MILLISECONDS);
       }
   
   ```
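   
   For illustration only, the cancel-and-reschedule behaviour of `doDeclareResourceRequirements` can be tried outside Flink with a plain `ScheduledExecutorService`. This is a rough sketch under assumptions, not the code in this PR: `DebouncedDeclaration`, `requestDeclaration` and `runDemo` are made-up names, `java.util.concurrent` stands in for `ComponentMainThreadExecutor`, and a counter stands in for `declareResourceRequirements()`.
   
   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical stand-alone sketch of the debounce idea; not the Flink class.
   final class DebouncedDeclaration {
       private final ScheduledExecutorService executor;
       private final Long maxIntervalMillis; // null means "declare immediately"
       private final Runnable declareAction;
       private ScheduledFuture<?> pending; // touched only by the calling thread
   
       DebouncedDeclaration(
               ScheduledExecutorService executor, Long maxIntervalMillis, Runnable declareAction) {
           this.executor = executor;
           this.maxIntervalMillis = maxIntervalMillis;
           this.declareAction = declareAction;
       }
   
       void requestDeclaration() {
           if (maxIntervalMillis == null) {
               declareAction.run();
               return;
           }
           // Drop the previously scheduled declaration and restart the interval.
           if (pending != null && !pending.isDone()) {
               pending.cancel(false);
           }
           pending = executor.schedule(declareAction, maxIntervalMillis, TimeUnit.MILLISECONDS);
       }
   
       // Issues `calls` back-to-back requests and returns how many declarations ran.
       static int runDemo(Long maxIntervalMillis, int calls) {
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           AtomicInteger declarations = new AtomicInteger();
           DebouncedDeclaration debouncer =
                   new DebouncedDeclaration(
                           executor, maxIntervalMillis, declarations::incrementAndGet);
           for (int i = 0; i < calls; i++) {
               debouncer.requestDeclaration();
           }
           executor.shutdown(); // already-scheduled delayed tasks still run by default
           try {
               executor.awaitTermination(5, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
           return declarations.get();
       }
   
       public static void main(String[] args) {
           System.out.println("with interval:    " + runDemo(300L, 5));
           System.out.println("without interval: " + runDemo(null, 5));
       }
   }
   ```
   
   With a 300 ms interval, five back-to-back calls should collapse into a single declaration, while a `null` interval declares on every call. That is the only behavioural difference I intend between the two paths.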
   
   Please let me know what you think. :)


