This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f25a1ae02d3a266f6bd062deb8938bacb825c97
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Mon Aug 23 11:06:22 2021 +0800

    [hotfix][runtime] Remove the lastResourceRequirementsCheck in FineGrainedSlotManager
---
 .../slotmanager/FineGrainedSlotManager.java        | 28 ++++++++++------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 65888b2..e93a699 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -109,8 +109,6 @@ public class FineGrainedSlotManager implements SlotManager {
 
     @Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck;
 
-    @Nullable private ScheduledFuture<?> lastResourceRequirementsCheck;
-
     @Nullable private CompletableFuture<Void> requirementsCheckFuture;
 
     /** True iff the component has been started. */
@@ -148,7 +146,6 @@ public class FineGrainedSlotManager implements SlotManager {
         resourceActions = null;
         mainThreadExecutor = null;
         taskManagerTimeoutsCheck = null;
-        lastResourceRequirementsCheck = null;
         requirementsCheckFuture = null;
 
         started = false;
@@ -229,11 +226,6 @@ public class FineGrainedSlotManager implements SlotManager {
             taskManagerTimeoutsCheck = null;
         }
 
-        if (lastResourceRequirementsCheck != null) {
-            lastResourceRequirementsCheck.cancel(false);
-            lastResourceRequirementsCheck = null;
-        }
-
         slotStatusSyncer.close();
         taskManagerTracker.clear();
         resourceTracker.clear();
@@ -495,11 +487,16 @@ public class FineGrainedSlotManager implements SlotManager {
         if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) {
             LOG.info("Scheduling the resource requirement check.");
             requirementsCheckFuture = new CompletableFuture<>();
-            lastResourceRequirementsCheck =
-                    scheduledExecutor.schedule(
-                            () -> mainThreadExecutor.execute(this::checkResourceRequirements),
-                            requirementsCheckDelay.toMilliseconds(),
-                            TimeUnit.MILLISECONDS);
+            scheduledExecutor.schedule(
+                    () ->
+                            mainThreadExecutor.execute(
+                                    () -> {
+                                        checkResourceRequirements();
+                                        Preconditions.checkNotNull(requirementsCheckFuture)
+                                                .complete(null);
+                                    }),
+                    requirementsCheckDelay.toMilliseconds(),
+                    TimeUnit.MILLISECONDS);
         }
     }
 
@@ -507,11 +504,13 @@ public class FineGrainedSlotManager implements SlotManager {
      * DO NOT call this method directly. Use {@link #checkResourceRequirementsWithDelay()} instead.
      */
     private void checkResourceRequirements() {
+        if (!started) {
+            return;
+        }
         LOG.info("Matching resource requirements against available resources.");
         Map<JobID, Collection<ResourceRequirement>> missingResources =
                 resourceTracker.getMissingResources();
         if (missingResources.isEmpty()) {
-            requirementsCheckFuture.complete(null);
             return;
         }
 
@@ -555,7 +554,6 @@ public class FineGrainedSlotManager implements SlotManager {
                         jobId, resourceTracker.getAcquiredResources(jobId));
             }
         }
-        requirementsCheckFuture.complete(null);
     }
 
     private void allocateSlotsAccordingTo(Map<JobID, Map<InstanceID, ResourceCounter>> result) {