RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426510597
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ########## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); + if (slotRequestMaxInterval == null) { + declareResourceRequirements(); + return; + } + + Preconditions.checkNotNull(componentMainThreadExecutor); + if (slotRequestMaxIntervalTimeoutCheckFuture != null) { + slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); + } + slotRequestMaxIntervalTimeoutCheckFuture = + componentMainThreadExecutor.schedule( + this::checkSlotRequestMaxIntervalTimeout, + slotRequestMaxInterval.toMilliseconds(), + TimeUnit.MILLISECONDS); + } + + private void checkSlotRequestMaxIntervalTimeout() { + if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) { + return; + } declareResourceRequirements(); + slotRequestMaxIntervalTimeoutCheckFuture = null; Review Comment: Yes, here is an illegal assignment with risk. IIUC, we only need to close the current `future` that's cancelable when increasing requirements. Otherwise, we can use the result of scheduled task to make assignment, please correct me if i'm wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org