This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f25a1ae02d3a266f6bd062deb8938bacb825c97
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Mon Aug 23 11:06:22 2021 +0800

    [hotfix][runtime] Remove the lastResourceRequirementsCheck in FineGrainedSlotManager
---
 .../slotmanager/FineGrainedSlotManager.java        | 28 ++++++++++------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 65888b2..e93a699 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -109,8 +109,6 @@ public class FineGrainedSlotManager implements SlotManager {
 
     @Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck;
 
-    @Nullable private ScheduledFuture<?> lastResourceRequirementsCheck;
-
     @Nullable private CompletableFuture<Void> requirementsCheckFuture;
 
     /** True iff the component has been started. */
@@ -148,7 +146,6 @@ public class FineGrainedSlotManager implements SlotManager {
         resourceActions = null;
         mainThreadExecutor = null;
         taskManagerTimeoutsCheck = null;
-        lastResourceRequirementsCheck = null;
         requirementsCheckFuture = null;
 
         started = false;
@@ -229,11 +226,6 @@ public class FineGrainedSlotManager implements SlotManager {
             taskManagerTimeoutsCheck = null;
         }
 
-        if (lastResourceRequirementsCheck != null) {
-            lastResourceRequirementsCheck.cancel(false);
-            lastResourceRequirementsCheck = null;
-        }
-
         slotStatusSyncer.close();
         taskManagerTracker.clear();
         resourceTracker.clear();
@@ -495,11 +487,16 @@ public class FineGrainedSlotManager implements SlotManager {
         if (requirementsCheckFuture == null || 
requirementsCheckFuture.isDone()) {
             LOG.info("Scheduling the resource requirement check.");
             requirementsCheckFuture = new CompletableFuture<>();
-            lastResourceRequirementsCheck =
-                    scheduledExecutor.schedule(
-                            () -> 
mainThreadExecutor.execute(this::checkResourceRequirements),
-                            requirementsCheckDelay.toMilliseconds(),
-                            TimeUnit.MILLISECONDS);
+            scheduledExecutor.schedule(
+                    () ->
+                            mainThreadExecutor.execute(
+                                    () -> {
+                                        checkResourceRequirements();
+                                        
Preconditions.checkNotNull(requirementsCheckFuture)
+                                                .complete(null);
+                                    }),
+                    requirementsCheckDelay.toMilliseconds(),
+                    TimeUnit.MILLISECONDS);
         }
     }
 
@@ -507,11 +504,13 @@ public class FineGrainedSlotManager implements SlotManager {
      * DO NOT call this method directly. Use {@link 
#checkResourceRequirementsWithDelay()} instead.
      */
     private void checkResourceRequirements() {
+        if (!started) {
+            return;
+        }
         LOG.info("Matching resource requirements against available 
resources.");
         Map<JobID, Collection<ResourceRequirement>> missingResources =
                 resourceTracker.getMissingResources();
         if (missingResources.isEmpty()) {
-            requirementsCheckFuture.complete(null);
             return;
         }
 
@@ -555,7 +554,6 @@ public class FineGrainedSlotManager implements SlotManager {
                         jobId, resourceTracker.getAcquiredResources(jobId));
             }
         }
-        requirementsCheckFuture.complete(null);
     }
 
     private void allocateSlotsAccordingTo(Map<JobID, Map<InstanceID, 
ResourceCounter>> result) {

Reply via email to