RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
this.totalResourceRequirements = ResourceCounter.empty();
this.fulfilledResourceRequirements = ResourceCounter.empty();
this.slotToRequirementProfileMappings = new HashMap<>();
+ this.componentMainThreadExecutor =
Preconditions.checkNotNull(componentMainThreadExecutor);
+ this.slotRequestMaxInterval = slotRequestMaxInterval;
}
@Override
public void increaseResourceRequirementsBy(ResourceCounter increment) {
- if (increment.isEmpty()) {
+ updateResourceRequirementsBy(
+ increment,
+ () -> totalResourceRequirements =
totalResourceRequirements.add(increment));
Review Comment:
@KarmaGYZ Thanks a lot for the comments.
Let me try to explain.
My initial intention was to reduce code duplication: there are currently
two calling methods, and the only difference between them is whether they
increase or decrease the resource requirements.
If the extracted common part is too broad, I am happy to narrow it.
For example:
```
@Override
public void increaseResourceRequirementsBy(ResourceCounter increment) {
    if (increment.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.add(increment);
    updateResourceRequirementsBy();
}

@Override
public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
    if (decrement.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.subtract(decrement);
    updateResourceRequirementsBy();
}

private void updateResourceRequirementsBy() {
    if (slotRequestMaxInterval == null) {
        declareResourceRequirements();
        return;
    }
    if (slotRequestMaxIntervalTimeoutFuture != null
            && !slotRequestMaxIntervalTimeoutFuture.isDone()
            && !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
        slotRequestMaxIntervalTimeoutFuture.cancel(true);
    }
    slotRequestMaxIntervalTimeoutFuture =
            componentMainThreadExecutor.schedule(
                    this::declareResourceRequirements,
                    slotRequestMaxInterval.toMilliseconds(),
                    TimeUnit.MILLISECONDS);
}
```
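To make the intended behavior of `updateResourceRequirementsBy()` easier to discuss, here is a minimal standalone sketch of the same cancel-and-reschedule (debounce) idea. It is only an illustration under assumptions: `DebouncedAction` is a hypothetical class that does not exist in Flink, a plain `ScheduledExecutorService` stands in for `componentMainThreadExecutor`, and `java.time.Duration` stands in for the type of `slotRequestMaxInterval`. The `synchronized` keyword is there only because this sketch may be called from arbitrary threads; the proposal above relies on the main thread executor instead.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of Flink. */
final class DebouncedAction {
    private final ScheduledExecutorService executor;
    // A null interval means "run the action immediately", mirroring the null check above.
    private final Duration maxInterval;
    private final Runnable action;
    // The most recently scheduled, possibly still pending, run of the action.
    private ScheduledFuture<?> pending;

    DebouncedAction(ScheduledExecutorService executor, Duration maxInterval, Runnable action) {
        this.executor = executor;
        this.maxInterval = maxInterval;
        this.action = action;
    }

    synchronized void trigger() {
        if (maxInterval == null) {
            action.run();
            return;
        }
        // Drop the previously scheduled run if it has not completed yet,
        // so that a burst of triggers results in a single run.
        if (pending != null && !pending.isDone()) {
            pending.cancel(false);
        }
        pending = executor.schedule(action, maxInterval.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

With a non-null interval, several `trigger()` calls in quick succession should lead to one run of the action after the last call; with a null interval, every call runs the action right away.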
Please let me know what you think. :)