This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1a079f95e409d3a81ff5dd199ea3c0dc30470cc
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Thu Jun 29 19:20:45 2023 +0800

    [FLINK-31843][runtime] SlotSelectionStrategy use FreeSlotInfoTracker to 
select the best slot.
---
 .../jobmaster/slotpool/AllocatedSlotPool.java      |  7 +++
 .../jobmaster/slotpool/DeclarativeSlotPool.java    |  7 +++
 .../slotpool/DeclarativeSlotPoolBridge.java        |  7 +++
 .../slotpool/DefaultAllocatedSlotPool.java         | 19 ++++++++
 .../slotpool/DefaultDeclarativeSlotPool.java       |  5 +++
 ...ultLocationPreferenceSlotSelectionStrategy.java |  8 ++--
 ...OutLocationPreferenceSlotSelectionStrategy.java | 25 ++++++-----
 .../LocationPreferenceSlotSelectionStrategy.java   | 24 +++++-----
 .../slotpool/PhysicalSlotProviderImpl.java         | 17 +++++---
 .../PreviousAllocationSlotSelectionStrategy.java   | 42 +++++-------------
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  7 +++
 .../jobmaster/slotpool/SlotSelectionStrategy.java  |  8 ++--
 ...ocationPreferenceSlotSelectionStrategyTest.java | 15 +++----
 .../types/SlotSelectionStrategyTestBase.java       | 51 ++++++++++------------
 .../flink/runtime/jobmaster/JobMasterTest.java     | 11 +++++
 .../slotpool/TestingDeclarativeSlotPool.java       |  9 ++++
 .../TestingDeclarativeSlotPoolBuilder.java         |  9 ++++
 17 files changed, 166 insertions(+), 105 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
index 092963c00d1..03152e364cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
@@ -117,6 +117,13 @@ public interface AllocatedSlotPool {
      */
     Collection<FreeSlotInfo> getFreeSlotsInformation();
 
+    /**
+     * Returns information about all currently free slots.
+     *
+     * @return free slot information
+     */
+    FreeSlotInfoTracker getFreeSlotInfoTracker();
+
     /**
      * Returns information about all slots in this pool.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index 9cd89064275..b4a01c9d18b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -115,6 +115,13 @@ public interface DeclarativeSlotPool {
      */
     Collection<SlotInfoWithUtilization> getFreeSlotsInformation();
 
+    /**
+     * Returns the free slot tracker.
+     *
+     * @return free slot tracker
+     */
+    FreeSlotInfoTracker getFreeSlotInfoTracker();
+
     /**
      * Returns the slot information for all slots (free and allocated slots).
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 3cb0d04ed19..36fb9692726 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -439,6 +439,13 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         return getDeclarativeSlotPool().getFreeSlotsInformation();
     }
 
+    @Override
+    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
+        assertRunningInMainThread();
+
+        return getDeclarativeSlotPool().getFreeSlotInfoTracker();
+    }
+
     @Override
     public void disableBatchSlotRequestTimeoutCheck() {
         isBatchSlotRequestTimeoutCheckDisabled = true;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
index e83b84fdca2..c768054e3a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
@@ -205,6 +205,25 @@ public class DefaultAllocatedSlotPool implements 
AllocatedSlotPool {
         return freeSlotInfos;
     }
 
+    private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) {
+        final AllocatedSlot allocatedSlot =
+                Preconditions.checkNotNull(registeredSlots.get(allocationId));
+        final Long idleSince =
+                
Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId));
+        final SlotInfoWithUtilization slotInfoWithUtilization =
+                SlotInfoWithUtilization.from(allocatedSlot, 
this::getTaskExecutorUtilization);
+        return DefaultFreeSlotInfo.create(slotInfoWithUtilization, idleSince);
+    }
+
+    @Override
+    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
+        return new DefaultFreeSlotInfoTracker(
+                freeSlots.getFreeSlotsSince().keySet(),
+                registeredSlots::get,
+                this::getFreeSlotInfo,
+                this::getTaskExecutorUtilization);
+    }
+
     public double getTaskExecutorUtilization(ResourceID resourceId) {
         Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
         Preconditions.checkNotNull(slots, "There is no slots on %s", 
resourceId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index bd97f3aa7fc..82147530355 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -570,6 +570,11 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
+        return slotPool.getFreeSlotInfoTracker();
+    }
+
     @Override
     public Collection<? extends SlotInfo> getAllSlotsInformation() {
         return slotPool.getAllSlotsInformation();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
index a10e5609e68..62bb31518e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 
-import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Supplier;
 
@@ -33,9 +34,10 @@ class DefaultLocationPreferenceSlotSelectionStrategy
     @Nonnull
     @Override
     protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
             @Nonnull ResourceProfile resourceProfile) {
-        for (SlotInfoWithUtilization candidate : availableSlots) {
+        for (AllocationID allocationId : 
freeSlotInfoTracker.getAvailableSlots()) {
+            SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);
             if (candidate.getResourceProfile().isMatching(resourceProfile)) {
                 return Optional.of(SlotInfoAndLocality.of(candidate, 
Locality.UNCONSTRAINED));
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
index fdae956c95d..be8bd1cc1c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
 import javax.annotation.Nonnull;
 
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.Optional;
 import java.util.function.Supplier;
@@ -33,19 +33,22 @@ class EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
     @Nonnull
     @Override
     protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
             @Nonnull ResourceProfile resourceProfile) {
-        return availableSlots.stream()
-                .filter(
-                        slotInfoWithUtilization ->
-                                slotInfoWithUtilization
-                                        .getResourceProfile()
-                                        .isMatching(resourceProfile))
-                
.min(Comparator.comparing(SlotInfoWithUtilization::getTaskExecutorUtilization))
+        return freeSlotInfoTracker.getAvailableSlots().stream()
+                .map(freeSlotInfoTracker::getSlotInfo)
+                .filter(slotInfo -> 
slotInfo.getResourceProfile().isMatching(resourceProfile))
+                // calculate utilization first to avoid duplicated calculation 
in min()
                 .map(
-                        slotInfoWithUtilization ->
+                        slot ->
+                                new Tuple2<>(
+                                        slot, 
freeSlotInfoTracker.getTaskExecutorUtilization(slot)))
+                .min(Comparator.comparingDouble(tuple -> tuple.f1))
+                .map(
+                        slotInfoWithTaskExecutorUtilization ->
                                 SlotInfoAndLocality.of(
-                                        slotInfoWithUtilization, 
Locality.UNCONSTRAINED));
+                                        slotInfoWithTaskExecutorUtilization.f0,
+                                        Locality.UNCONSTRAINED));
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
index d5983ee6fa8..f811223d270 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.CollectionUtil;
 
@@ -41,12 +43,11 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
 
     @Override
     public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
-            @Nonnull SlotProfile slotProfile) {
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull 
SlotProfile slotProfile) {
 
         Collection<TaskManagerLocation> locationPreferences = 
slotProfile.getPreferredLocations();
 
-        if (availableSlots.isEmpty()) {
+        if (freeSlotInfoTracker.getAvailableSlots().isEmpty()) {
             return Optional.empty();
         }
 
@@ -54,14 +55,14 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
 
         // if we have no location preferences, we can only filter by the 
additional requirements.
         return locationPreferences.isEmpty()
-                ? selectWithoutLocationPreference(availableSlots, 
resourceProfile)
+                ? selectWithoutLocationPreference(freeSlotInfoTracker, 
resourceProfile)
                 : selectWithLocationPreference(
-                        availableSlots, locationPreferences, resourceProfile);
+                        freeSlotInfoTracker, locationPreferences, 
resourceProfile);
     }
 
     @Nonnull
     private Optional<SlotInfoAndLocality> selectWithLocationPreference(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
             @Nonnull Collection<TaskManagerLocation> locationPreferences,
             @Nonnull ResourceProfile resourceProfile) {
 
@@ -77,11 +78,12 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
             preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 
1, Integer::sum);
         }
 
-        SlotInfoWithUtilization bestCandidate = null;
+        SlotInfo bestCandidate = null;
         Locality bestCandidateLocality = Locality.UNKNOWN;
         double bestCandidateScore = Double.NEGATIVE_INFINITY;
 
-        for (SlotInfoWithUtilization candidate : availableSlots) {
+        for (AllocationID allocationId : 
freeSlotInfoTracker.getAvailableSlots()) {
+            SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);
 
             if (candidate.getResourceProfile().isMatching(resourceProfile)) {
 
@@ -97,7 +99,9 @@ public abstract class LocationPreferenceSlotSelectionStrategy 
implements SlotSel
 
                 double candidateScore =
                         calculateCandidateScore(
-                                localWeigh, hostLocalWeigh, 
candidate::getTaskExecutorUtilization);
+                                localWeigh,
+                                hostLocalWeigh,
+                                () -> 
freeSlotInfoTracker.getTaskExecutorUtilization(candidate));
                 if (candidateScore > bestCandidateScore) {
                     bestCandidateScore = candidateScore;
                     bestCandidate = candidate;
@@ -117,7 +121,7 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
 
     @Nonnull
     protected abstract Optional<SlotInfoAndLocality> 
selectWithoutLocationPreference(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
             @Nonnull ResourceProfile resourceProfile);
 
     protected abstract double calculateCandidateScore(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 507aeef3473..508f72bfad2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -85,17 +85,20 @@ public class PhysicalSlotProviderImpl implements 
PhysicalSlotProvider {
 
     private Optional<PhysicalSlot> tryAllocateFromAvailable(
             SlotRequestId slotRequestId, SlotProfile slotProfile) {
-        Collection<SlotInfoWithUtilization> slotInfoList = 
slotPool.getAvailableSlotsInformation();
+        FreeSlotInfoTracker freeSlotInfoTracker = 
slotPool.getFreeSlotInfoTracker();
 
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> 
selectedAvailableSlot =
-                slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, 
slotProfile);
+                
slotSelectionStrategy.selectBestSlotForProfile(freeSlotInfoTracker, 
slotProfile);
 
         return selectedAvailableSlot.flatMap(
-                slotInfoAndLocality ->
-                        slotPool.allocateAvailableSlot(
-                                slotRequestId,
-                                
slotInfoAndLocality.getSlotInfo().getAllocationId(),
-                                slotProfile.getPhysicalSlotResourceProfile()));
+                slotInfoAndLocality -> {
+                    freeSlotInfoTracker.reserveSlot(
+                            
slotInfoAndLocality.getSlotInfo().getAllocationId());
+                    return slotPool.allocateAvailableSlot(
+                            slotRequestId,
+                            
slotInfoAndLocality.getSlotInfo().getAllocationId(),
+                            slotProfile.getPhysicalSlotResourceProfile());
+                });
     }
 
     private CompletableFuture<PhysicalSlot> requestNewSlot(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
index 66e6b9da4fa..3d8f48e6eb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
@@ -27,11 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * This class implements a {@link SlotSelectionStrategy} that is based on 
previous allocations and
@@ -51,8 +48,7 @@ public class PreviousAllocationSlotSelectionStrategy 
implements SlotSelectionStr
 
     @Override
     public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
-            @Nonnull SlotProfile slotProfile) {
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull 
SlotProfile slotProfile) {
 
         LOG.debug("Select best slot for profile {}.", slotProfile);
 
@@ -60,39 +56,21 @@ public class PreviousAllocationSlotSelectionStrategy 
implements SlotSelectionStr
 
         // First, if there was a prior allocation try to schedule to the 
same/old slot
         if (!priorAllocations.isEmpty()) {
-            for (SlotInfoWithUtilization availableSlot : availableSlots) {
-                if 
(priorAllocations.contains(availableSlot.getAllocationId())) {
-                    return Optional.of(SlotInfoAndLocality.of(availableSlot, 
Locality.LOCAL));
+            for (AllocationID availableSlot : 
freeSlotInfoTracker.getAvailableSlots()) {
+                if (priorAllocations.contains(availableSlot)) {
+                    return Optional.of(
+                            SlotInfoAndLocality.of(
+                                    
freeSlotInfoTracker.getSlotInfo(availableSlot),
+                                    Locality.LOCAL));
                 }
             }
         }
 
         // Second, select based on location preference, excluding blacklisted 
allocations
-        Set<AllocationID> blackListedAllocations = 
slotProfile.getReservedAllocations();
-        Collection<SlotInfoWithUtilization> availableAndAllowedSlots =
-                computeWithoutBlacklistedSlots(availableSlots, 
blackListedAllocations);
         return fallbackSlotSelectionStrategy.selectBestSlotForProfile(
-                availableAndAllowedSlots, slotProfile);
-    }
-
-    @Nonnull
-    private Collection<SlotInfoWithUtilization> computeWithoutBlacklistedSlots(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
-            @Nonnull Set<AllocationID> blacklistedAllocations) {
-
-        if (blacklistedAllocations.isEmpty()) {
-            return Collections.unmodifiableCollection(availableSlots);
-        }
-
-        ArrayList<SlotInfoWithUtilization> availableAndAllowedSlots =
-                new ArrayList<>(availableSlots.size());
-        for (SlotInfoWithUtilization availableSlot : availableSlots) {
-            if 
(!blacklistedAllocations.contains(availableSlot.getAllocationId())) {
-                availableAndAllowedSlots.add(availableSlot);
-            }
-        }
-
-        return availableAndAllowedSlots;
+                
freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+                        slotProfile.getReservedAllocations()),
+                slotProfile);
     }
 
     public static PreviousAllocationSlotSelectionStrategy create() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 1bed443bef4..c9b01df298f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -124,6 +124,13 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
      */
     Collection<SlotInfoWithUtilization> getAvailableSlotsInformation();
 
+    /**
+     * Returns all free slot tracker.
+     *
+     * @return all free slot tracker
+     */
+    FreeSlotInfoTracker getFreeSlotInfoTracker();
+
     /**
      * Returns a list of {@link SlotInfo} objects about all slots that are 
currently allocated in
      * the slot pool.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
index 36b001016ae..968ec50791e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 
-import java.util.Collection;
 import java.util.Optional;
 
 /** Interface for slot selection strategies. */
@@ -35,14 +34,13 @@ public interface SlotSelectionStrategy {
      * of available slots and considering the given {@link SlotProfile} that 
describes the
      * requirements.
      *
-     * @param availableSlots a list of the available slots together with their 
remaining resources
-     *     to select from.
+     * @param freeSlotInfoTracker a list of the available slots together with 
their remaining
+     *     resources to select from.
      * @param slotProfile a slot profile, describing requirements for the slot 
selection.
      * @return the selected slot info with the corresponding locality hint.
      */
     Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-            @Nonnull Collection<SlotInfoWithUtilization> availableSlots,
-            @Nonnull SlotProfile slotProfile);
+            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull 
SlotProfile slotProfile);
 
     /** This class is a value type that combines a {@link SlotInfo} with a 
{@link Locality} hint. */
     final class SlotInfoAndLocality {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
index bbae2412d44..c9fc0cd9f79 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.clusterframework.types;
 
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker;
 import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -32,7 +33,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
-import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -166,8 +166,7 @@ class LocationPreferenceSlotSelectionStrategyTest extends 
SlotSelectionStrategyT
     }
 
     protected static void assertMatchingSlotEqualsToSlotInfo(
-            Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot,
-            SlotInfoWithUtilization slotInfo) {
+            Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot, 
SlotInfo slotInfo) {
         assertThat(matchingSlot)
                 .hasValueSatisfying(
                         slotInfoAndLocality ->
@@ -177,15 +176,15 @@ class LocationPreferenceSlotSelectionStrategyTest extends 
SlotSelectionStrategyT
     protected static void assertMatchingSlotLocalityAndInCandidates(
             Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot,
             Locality locality,
-            Set<SlotInfoWithUtilization> candidates) {
+            FreeSlotInfoTracker candidates) {
         assertThat(matchingSlot)
                 .hasValueSatisfying(
                         slotInfoAndLocality -> {
                             
assertThat(slotInfoAndLocality.getLocality()).isEqualTo(locality);
-                            assertThat(candidates)
+                            assertThat(candidates.getAvailableSlots())
                                     .anySatisfy(
-                                            slotInfoWithUtilization ->
-                                                    
assertThat(slotInfoWithUtilization)
+                                            allocationId ->
+                                                    
assertThat(candidates.getSlotInfo(allocationId))
                                                             .isEqualTo(
                                                                     
slotInfoAndLocality
                                                                             
.getSlotInfo()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
index 51cb6c96817..805209d4245 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
@@ -21,15 +21,16 @@ package org.apache.flink.runtime.clusterframework.types;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker;
+import 
org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTrackerTestUtils;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 /** Test base for {@link SlotSelectionStrategy}. */
 abstract class SlotSelectionStrategyTestBase {
@@ -56,35 +57,27 @@ abstract class SlotSelectionStrategyTestBase {
 
     protected final TaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
 
-    protected final SlotInfoWithUtilization slotInfo1 =
-            SlotInfoWithUtilization.from(
-                    new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, 
resourceProfile),
-                    ignored -> 0.0d);
-    protected final SlotInfoWithUtilization slotInfo2 =
-            SlotInfoWithUtilization.from(
-                    new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, 
biggerResourceProfile),
-                    ignored -> 0.0d);
-    protected final SlotInfoWithUtilization slotInfo3 =
-            SlotInfoWithUtilization.from(
-                    new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, 
resourceProfile),
-                    ignored -> 0.0d);
-    protected final SlotInfoWithUtilization slotInfo4 =
-            SlotInfoWithUtilization.from(
-                    new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, 
resourceProfile),
-                    ignored -> 0.0d);
+    protected final SlotInfo slotInfo1 =
+            new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, 
resourceProfile);
+    protected final SlotInfo slotInfo2 =
+            new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, 
biggerResourceProfile);
+    protected final SlotInfo slotInfo3 =
+            new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, 
resourceProfile);
+    protected final SlotInfo slotInfo4 =
+            new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, 
resourceProfile);
 
-    protected final Set<SlotInfoWithUtilization> candidates =
-            Collections.unmodifiableSet(createCandidates());
+    protected final FreeSlotInfoTracker candidates = createCandidates();
 
     protected SlotSelectionStrategy selectionStrategy;
 
-    private Set<SlotInfoWithUtilization> createCandidates() {
-        Set<SlotInfoWithUtilization> candidates = new HashSet<>(4);
-        candidates.add(slotInfo1);
-        candidates.add(slotInfo2);
-        candidates.add(slotInfo3);
-        candidates.add(slotInfo4);
-        return candidates;
+    private FreeSlotInfoTracker createCandidates() {
+        Map<AllocationID, SlotInfo> candidates = new HashMap<>(4);
+
+        candidates.put(slotInfo1.getAllocationId(), slotInfo1);
+        candidates.put(slotInfo2.getAllocationId(), slotInfo2);
+        candidates.put(slotInfo3.getAllocationId(), slotInfo3);
+        candidates.put(slotInfo4.getAllocationId(), slotInfo4);
+        return 
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(candidates);
     }
 
     protected Optional<SlotSelectionStrategy.SlotInfoAndLocality> runMatching(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 00ef90ac807..04dabcbb86e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -81,6 +81,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker;
+import 
org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTrackerTestUtils;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
@@ -583,6 +585,15 @@ class JobMasterTest {
             return Collections.unmodifiableCollection(allSlotInfos);
         }
 
+        @Override
+        public FreeSlotInfoTracker getFreeSlotInfoTracker() {
+            Map<AllocationID, SlotInfo> freeSlots =
+                    registeredSlots.values().stream()
+                            .flatMap(Collection::stream)
+                            
.collect(Collectors.toMap(SlotInfo::getAllocationId, s -> s));
+            return 
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(freeSlots);
+        }
+
         @Override
         public Collection<SlotInfo> getAllocatedSlotsInformation() {
             return Collections.emptyList();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index b3c49bc8cb7..4f12a42956b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -66,6 +66,8 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
 
     private final Supplier<Collection<SlotInfoWithUtilization>> 
getFreeSlotsInformationSupplier;
 
+    private final Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier;
+
     private final Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier;
 
     private final BiFunction<ResourceID, Exception, ResourceCounter> 
releaseSlotsFunction;
@@ -104,6 +106,7 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
                             Collection<SlotOffer>>
                     registerSlotsFunction,
             Supplier<Collection<SlotInfoWithUtilization>> 
getFreeSlotsInformationSupplier,
+            Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier,
             Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier,
             BiFunction<ResourceID, Exception, ResourceCounter> 
releaseSlotsFunction,
             BiFunction<AllocationID, Exception, ResourceCounter> 
releaseSlotFunction,
@@ -119,6 +122,7 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
         this.offerSlotsFunction = offerSlotsFunction;
         this.registerSlotsFunction = registerSlotsFunction;
         this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier;
+        this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier;
         this.getAllSlotsInformationSupplier = getAllSlotsInformationSupplier;
         this.releaseSlotsFunction = releaseSlotsFunction;
         this.releaseSlotFunction = releaseSlotFunction;
@@ -175,6 +179,11 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
         return getFreeSlotsInformationSupplier.get();
     }
 
+    @Override
+    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
+        return getFreeSlotInfoTrackerSupplier.get();
+    }
+
     @Override
     public Collection<? extends SlotInfo> getAllSlotsInformation() {
         return getAllSlotsInformationSupplier.get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index 99a02324b29..fb27a5797e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -58,6 +58,8 @@ public class TestingDeclarativeSlotPoolBuilder {
             Collections::emptyList;
     private Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier =
             Collections::emptyList;
+    private Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier =
+            () -> TestingFreeSlotInfoTracker.newBuilder().build();
     private BiFunction<ResourceID, Exception, ResourceCounter> 
releaseSlotsFunction =
             (ignoredA, ignoredB) -> ResourceCounter.empty();
     private BiFunction<AllocationID, Exception, ResourceCounter> 
releaseSlotFunction =
@@ -133,6 +135,12 @@ public class TestingDeclarativeSlotPoolBuilder {
         return this;
     }
 
+    public TestingDeclarativeSlotPoolBuilder setGetFreeSlotInfoTrackerSupplier(
+            Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier) {
+        this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier;
+        return this;
+    }
+
     public TestingDeclarativeSlotPoolBuilder setGetAllSlotsInformationSupplier(
             Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier) {
         this.getAllSlotsInformationSupplier = getAllSlotsInformationSupplier;
@@ -190,6 +198,7 @@ public class TestingDeclarativeSlotPoolBuilder {
                 offerSlotsFunction,
                 registerSlotsFunction,
                 getFreeSlotsInformationSupplier,
+                getFreeSlotInfoTrackerSupplier,
                 getAllSlotsInformationSupplier,
                 releaseSlotsFunction,
                 releaseSlotFunction,


Reply via email to