This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d033f64b41f1d11edd22b2d70699dac7822abfe5
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Aug 2 19:15:07 2024 +0800

    [FLINK-33875][refactor] Refactor FreeSlotInfoTracker to FreeSlotTracker for better semantics
---
 .../jobmaster/slotpool/AllocatedSlotPool.java      |  2 +-
 .../jobmaster/slotpool/DeclarativeSlotPool.java    |  2 +-
 .../slotpool/DeclarativeSlotPoolBridge.java        |  8 +-
 .../slotpool/DeclarativeSlotPoolService.java       |  4 +-
 .../slotpool/DefaultAllocatedSlotPool.java         |  4 +-
 .../slotpool/DefaultDeclarativeSlotPool.java       |  6 +-
 ...nfoTracker.java => DefaultFreeSlotTracker.java} | 28 +++----
 ...ultLocationPreferenceSlotSelectionStrategy.java |  7 +-
 ...OutLocationPreferenceSlotSelectionStrategy.java | 12 +--
 ...eeSlotInfoTracker.java => FreeSlotTracker.java} | 17 ++--
 .../LocationPreferenceSlotSelectionStrategy.java   | 19 ++---
 .../slotpool/PhysicalSlotProviderImpl.java         |  6 +-
 .../PhysicalSlotRequestBulkCheckerImpl.java        |  2 +-
 .../PreviousAllocationSlotSelectionStrategy.java   |  9 +-
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  2 +-
 .../jobmaster/slotpool/SlotSelectionStrategy.java  |  6 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  4 +-
 .../flink/runtime/instance/SimpleSlotContext.java  | 96 ----------------------
 .../flink/runtime/jobmaster/JobMasterTest.java     | 29 +++----
 .../slotpool/DeclarativeSlotPoolServiceTest.java   | 34 ++++----
 .../slotpool/DefaultAllocatedSlotPoolTest.java     | 23 +++---
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   | 10 +--
 ...erTest.java => DefaultFreeSlotTrackerTest.java} | 55 ++++++-------
 ...estUtils.java => FreeSlotTrackerTestUtils.java} | 17 ++--
 ...ocationPreferenceSlotSelectionStrategyTest.java | 10 +--
 ...reviousAllocationSlotSelectionStrategyTest.java |  8 +-
 .../jobmaster/slotpool/SingleLogicalSlotTest.java  | 15 ++--
 .../slotpool/SlotSelectionStrategyTestBase.java    | 59 +++++++++----
 .../slotpool/TestingDeclarativeSlotPool.java       | 14 ++--
 .../TestingDeclarativeSlotPoolBuilder.java         | 17 ++--
 ...nfoTracker.java => TestingFreeSlotTracker.java} | 52 +++++-------
 .../runtime/scheduler/TestingPhysicalSlot.java     | 70 +++++++++++-----
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 24 +++---
 .../allocator/SlotSharingSlotAllocatorTest.java    |  8 +-
 .../allocator/StateLocalitySlotAssignerTest.java   |  2 +-
 .../{TestSlotInfo.java => TestingSlot.java}        | 25 ++++--
 36 files changed, 332 insertions(+), 374 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
index 9e2c29d68da..e0a9ef365d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
@@ -115,7 +115,7 @@ public interface AllocatedSlotPool {
      *
      * @return free slot information
      */
-    FreeSlotInfoTracker getFreeSlotInfoTracker();
+    FreeSlotTracker getFreeSlotTracker();
 
     /**
      * Returns information about all slots in this pool.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index 5b8921aa12f..28a6cd9dea2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -112,7 +112,7 @@ public interface DeclarativeSlotPool {
      *
      * @return free slot tracker
      */
-    FreeSlotInfoTracker getFreeSlotInfoTracker();
+    FreeSlotTracker getFreeSlotTracker();
 
     /**
      * Returns the slot information for all slots (free and allocated slots).
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 38a9e1312f5..d7ea9a0cab6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -424,7 +424,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         final Collection<? extends SlotInfo> allSlotsInformation =
                 getDeclarativeSlotPool().getAllSlotsInformation();
         final Set<AllocationID> freeSlots =
-                
getDeclarativeSlotPool().getFreeSlotInfoTracker().getAvailableSlots();
+                
getDeclarativeSlotPool().getFreeSlotTracker().getAvailableSlots();
 
         return allSlotsInformation.stream()
                 .filter(slotInfo -> 
!freeSlots.contains(slotInfo.getAllocationId()))
@@ -432,10 +432,10 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
     }
 
     @Override
-    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
+    public FreeSlotTracker getFreeSlotTracker() {
         assertRunningInMainThread();
 
-        return getDeclarativeSlotPool().getFreeSlotInfoTracker();
+        return getDeclarativeSlotPool().getFreeSlotTracker();
     }
 
     @Override
@@ -509,7 +509,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 
     private Set<ResourceProfile> getResourceProfilesFromAllSlots() {
         return Stream.concat(
-                        
getFreeSlotInfoTracker().getFreeSlotsInformation().stream(),
+                        
getFreeSlotTracker().getFreeSlotsInformation().stream(),
                         getAllocatedSlotsInformation().stream())
                 .map(SlotInfo::getResourceProfile)
                 .collect(Collectors.toSet());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 2345a553eb7..7ced0b70189 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -247,7 +247,7 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
         if (isTaskManagerRegistered(taskManagerId)) {
 
             Collection<AllocationID> freeSlots =
-                    
declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
+                    
declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation().stream()
                             .filter(
                                     slotInfo ->
                                             slotInfo.getTaskManagerLocation()
@@ -344,6 +344,6 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
                 "Registered TMs: %d, registered slots: %d free slots: %d",
                 registeredTaskManagers.size(),
                 declarativeSlotPool.getAllSlotsInformation().size(),
-                
declarativeSlotPool.getFreeSlotInfoTracker().getAvailableSlots().size());
+                
declarativeSlotPool.getFreeSlotTracker().getAvailableSlots().size());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
index d3a7b21c6ee..edcae550a5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
@@ -188,8 +188,8 @@ public class DefaultAllocatedSlotPool implements 
AllocatedSlotPool {
     }
 
     @Override
-    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
-        return new DefaultFreeSlotInfoTracker(
+    public FreeSlotTracker getFreeSlotTracker() {
+        return new DefaultFreeSlotTracker(
                 freeSlots.getFreeSlotsSince().keySet(),
                 registeredSlots::get,
                 this::getFreeSlotInfo,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index 2689aaa606f..f4db5974276 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -519,7 +519,7 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
     @Override
     public void releaseIdleSlots(long currentTimeMillis) {
         final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
-                
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation();
+                
slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation();
 
         ResourceCounter excessResources =
                 
fulfilledResourceRequirements.subtract(totalResourceRequirements);
@@ -599,8 +599,8 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
-    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
-        return slotPool.getFreeSlotInfoTracker();
+    public FreeSlotTracker getFreeSlotTracker() {
+        return slotPool.getFreeSlotTracker();
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTracker.java
similarity index 78%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTracker.java
index 0a394254992..8f19866a0ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTracker.java
@@ -30,20 +30,20 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/** Default implements of {@link FreeSlotInfoTracker}. */
-public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker {
+/** Default implements of {@link FreeSlotTracker}. */
+public class DefaultFreeSlotTracker implements FreeSlotTracker {
     private final Set<AllocationID> freeSlots;
-    private final Function<AllocationID, SlotInfo> slotInfoLookup;
+    private final Function<AllocationID, PhysicalSlot> physicalSlotLookup;
     private final Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> 
freeSlotInfoLookup;
     private final Function<ResourceID, Double> taskExecutorUtilizationLookup;
 
-    public DefaultFreeSlotInfoTracker(
+    public DefaultFreeSlotTracker(
             Set<AllocationID> freeSlots,
-            Function<AllocationID, SlotInfo> slotInfoLookup,
+            Function<AllocationID, PhysicalSlot> physicalSlotLookup,
             Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> 
freeSlotInfoLookup,
             Function<ResourceID, Double> taskExecutorUtilizationLookup) {
         this.freeSlots = new HashSet<>(freeSlots);
-        this.slotInfoLookup = slotInfoLookup;
+        this.physicalSlotLookup = physicalSlotLookup;
         this.freeSlotInfoLookup = freeSlotInfoLookup;
         this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup;
     }
@@ -55,7 +55,7 @@ public class DefaultFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
 
     @Override
     public SlotInfo getSlotInfo(AllocationID allocationId) {
-        return Preconditions.checkNotNull(slotInfoLookup.apply(allocationId));
+        return 
Preconditions.checkNotNull(physicalSlotLookup.apply(allocationId));
     }
 
     @Override
@@ -64,8 +64,8 @@ public class DefaultFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
     }
 
     @Override
-    public Collection<SlotInfo> getFreeSlotsInformation() {
-        return 
freeSlots.stream().map(slotInfoLookup).collect(Collectors.toList());
+    public Collection<PhysicalSlot> getFreeSlotsInformation() {
+        return 
freeSlots.stream().map(physicalSlotLookup).collect(Collectors.toList());
     }
 
     @Override
@@ -83,17 +83,17 @@ public class DefaultFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
     }
 
     @Override
-    public DefaultFreeSlotInfoTracker 
createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+    public DefaultFreeSlotTracker createNewFreeSlotTrackerWithoutBlockedSlots(
             Set<AllocationID> blockedSlots) {
 
-        Set<AllocationID> freeSlotInfoTrackerWithoutBlockedSlots =
+        Set<AllocationID> freeSlotTrackerWithoutBlockedSlots =
                 freeSlots.stream()
                         .filter(slot -> !blockedSlots.contains(slot))
                         .collect(Collectors.toSet());
 
-        return new DefaultFreeSlotInfoTracker(
-                freeSlotInfoTrackerWithoutBlockedSlots,
-                slotInfoLookup,
+        return new DefaultFreeSlotTracker(
+                freeSlotTrackerWithoutBlockedSlots,
+                physicalSlotLookup,
                 freeSlotInfoLookup,
                 taskExecutorUtilizationLookup);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
index 62bb31518e9..511cdbbcf44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java
@@ -34,10 +34,9 @@ class DefaultLocationPreferenceSlotSelectionStrategy
     @Nonnull
     @Override
     protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
-            @Nonnull ResourceProfile resourceProfile) {
-        for (AllocationID allocationId : 
freeSlotInfoTracker.getAvailableSlots()) {
-            SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);
+            @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull ResourceProfile 
resourceProfile) {
+        for (AllocationID allocationId : freeSlotTracker.getAvailableSlots()) {
+            SlotInfo candidate = freeSlotTracker.getSlotInfo(allocationId);
             if (candidate.getResourceProfile().isMatching(resourceProfile)) {
                 return Optional.of(SlotInfoAndLocality.of(candidate, 
Locality.UNCONSTRAINED));
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
index be8bd1cc1c7..df834bc8953 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java
@@ -33,16 +33,12 @@ class EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
     @Nonnull
     @Override
     protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
-            @Nonnull ResourceProfile resourceProfile) {
-        return freeSlotInfoTracker.getAvailableSlots().stream()
-                .map(freeSlotInfoTracker::getSlotInfo)
+            @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull ResourceProfile 
resourceProfile) {
+        return freeSlotTracker.getAvailableSlots().stream()
+                .map(freeSlotTracker::getSlotInfo)
                 .filter(slotInfo -> 
slotInfo.getResourceProfile().isMatching(resourceProfile))
                 // calculate utilization first to avoid duplicated calculation 
in min()
-                .map(
-                        slot ->
-                                new Tuple2<>(
-                                        slot, 
freeSlotInfoTracker.getTaskExecutorUtilization(slot)))
+                .map(slot -> new Tuple2<>(slot, 
freeSlotTracker.getTaskExecutorUtilization(slot)))
                 .min(Comparator.comparingDouble(tuple -> tuple.f1))
                 .map(
                         slotInfoWithTaskExecutorUtilization ->
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTracker.java
similarity index 82%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTracker.java
index b685311fbd9..5800227116a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTracker.java
@@ -24,8 +24,8 @@ import org.apache.flink.runtime.jobmaster.SlotInfo;
 import java.util.Collection;
 import java.util.Set;
 
-/** Track all free slots, support bookkeeping slot for {@link 
SlotSelectionStrategy}. */
-public interface FreeSlotInfoTracker {
+/** Track all free slots. */
+public interface FreeSlotTracker {
 
     /**
      * Get allocation id of all available slots.
@@ -52,13 +52,13 @@ public interface FreeSlotInfoTracker {
     Collection<AllocatedSlotPool.FreeSlotInfo> 
getFreeSlotsWithIdleSinceInformation();
 
     /**
-     * Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in
-     * the slot pool.
+     * Returns a list of {@link PhysicalSlot} objects about all slots that are 
currently available
+     * in the slot pool.
      *
-     * @return a list of {@link SlotInfo} objects about all slots that are 
currently available in
-     *     the slot pool.
+     * @return a list of {@link PhysicalSlot} objects about all slots that are 
currently available
+     *     in the slot pool.
      */
-    Collection<SlotInfo> getFreeSlotsInformation();
+    Collection<PhysicalSlot> getFreeSlotsInformation();
 
     /**
      * Get task executor utilization of this slot.
@@ -81,6 +81,5 @@ public interface FreeSlotInfoTracker {
      * @param blockedSlots slots that should not be used
      * @return the new free slot tracker
      */
-    FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots(
-            Set<AllocationID> blockedSlots);
+    FreeSlotTracker 
createNewFreeSlotTrackerWithoutBlockedSlots(Set<AllocationID> blockedSlots);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
index f811223d270..6cc51590755 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
@@ -43,11 +43,11 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
 
     @Override
     public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull 
SlotProfile slotProfile) {
+            @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull SlotProfile 
slotProfile) {
 
         Collection<TaskManagerLocation> locationPreferences = 
slotProfile.getPreferredLocations();
 
-        if (freeSlotInfoTracker.getAvailableSlots().isEmpty()) {
+        if (freeSlotTracker.getAvailableSlots().isEmpty()) {
             return Optional.empty();
         }
 
@@ -55,14 +55,14 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
 
         // if we have no location preferences, we can only filter by the 
additional requirements.
         return locationPreferences.isEmpty()
-                ? selectWithoutLocationPreference(freeSlotInfoTracker, 
resourceProfile)
+                ? selectWithoutLocationPreference(freeSlotTracker, 
resourceProfile)
                 : selectWithLocationPreference(
-                        freeSlotInfoTracker, locationPreferences, 
resourceProfile);
+                        freeSlotTracker, locationPreferences, resourceProfile);
     }
 
     @Nonnull
     private Optional<SlotInfoAndLocality> selectWithLocationPreference(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
+            @Nonnull FreeSlotTracker freeSlotTracker,
             @Nonnull Collection<TaskManagerLocation> locationPreferences,
             @Nonnull ResourceProfile resourceProfile) {
 
@@ -82,8 +82,8 @@ public abstract class LocationPreferenceSlotSelectionStrategy 
implements SlotSel
         Locality bestCandidateLocality = Locality.UNKNOWN;
         double bestCandidateScore = Double.NEGATIVE_INFINITY;
 
-        for (AllocationID allocationId : 
freeSlotInfoTracker.getAvailableSlots()) {
-            SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);
+        for (AllocationID allocationId : freeSlotTracker.getAvailableSlots()) {
+            SlotInfo candidate = freeSlotTracker.getSlotInfo(allocationId);
 
             if (candidate.getResourceProfile().isMatching(resourceProfile)) {
 
@@ -101,7 +101,7 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
                         calculateCandidateScore(
                                 localWeigh,
                                 hostLocalWeigh,
-                                () -> 
freeSlotInfoTracker.getTaskExecutorUtilization(candidate));
+                                () -> 
freeSlotTracker.getTaskExecutorUtilization(candidate));
                 if (candidateScore > bestCandidateScore) {
                     bestCandidateScore = candidateScore;
                     bestCandidate = candidate;
@@ -121,8 +121,7 @@ public abstract class 
LocationPreferenceSlotSelectionStrategy implements SlotSel
 
     @Nonnull
     protected abstract Optional<SlotInfoAndLocality> 
selectWithoutLocationPreference(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,
-            @Nonnull ResourceProfile resourceProfile);
+            @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull ResourceProfile 
resourceProfile);
 
     protected abstract double calculateCandidateScore(
             int localWeigh, int hostLocalWeigh, Supplier<Double> 
taskExecutorUtilizationSupplier);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index b6c80e2138c..c2d9ce19db7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -110,18 +110,18 @@ public class PhysicalSlotProviderImpl implements 
PhysicalSlotProvider {
 
     private Map<SlotRequestId, Optional<PhysicalSlot>> 
tryAllocateFromAvailable(
             Collection<PhysicalSlotRequest> slotRequests) {
-        FreeSlotInfoTracker freeSlotInfoTracker = 
slotPool.getFreeSlotInfoTracker();
+        FreeSlotTracker freeSlotTracker = slotPool.getFreeSlotTracker();
 
         Map<SlotRequestId, Optional<PhysicalSlot>> allocateResult = new 
HashMap<>();
         for (PhysicalSlotRequest request : slotRequests) {
             Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot =
                     slotSelectionStrategy.selectBestSlotForProfile(
-                            freeSlotInfoTracker, request.getSlotProfile());
+                            freeSlotTracker, request.getSlotProfile());
             allocateResult.put(
                     request.getSlotRequestId(),
                     slot.flatMap(
                             slotInfoAndLocality -> {
-                                freeSlotInfoTracker.reserveSlot(
+                                freeSlotTracker.reserveSlot(
                                         
slotInfoAndLocality.getSlotInfo().getAllocationId());
                                 return slotPool.allocateAvailableSlot(
                                         request.getSlotRequestId(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
index 28a2a906767..5cac029fad6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
@@ -200,7 +200,7 @@ public class PhysicalSlotRequestBulkCheckerImpl implements 
PhysicalSlotRequestBu
 
     private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
         return Stream.concat(
-                        
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream(),
+                        
slotPool.getFreeSlotTracker().getFreeSlotsInformation().stream(),
                         slotPool.getAllocatedSlotsInformation().stream())
                 .collect(Collectors.toSet());
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
index 3d8f48e6eb8..41c606dac17 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
@@ -48,7 +48,7 @@ public class PreviousAllocationSlotSelectionStrategy 
implements SlotSelectionStr
 
     @Override
     public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull 
SlotProfile slotProfile) {
+            @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull SlotProfile 
slotProfile) {
 
         LOG.debug("Select best slot for profile {}.", slotProfile);
 
@@ -56,19 +56,18 @@ public class PreviousAllocationSlotSelectionStrategy 
implements SlotSelectionStr
 
         // First, if there was a prior allocation try to schedule to the 
same/old slot
         if (!priorAllocations.isEmpty()) {
-            for (AllocationID availableSlot : 
freeSlotInfoTracker.getAvailableSlots()) {
+            for (AllocationID availableSlot : 
freeSlotTracker.getAvailableSlots()) {
                 if (priorAllocations.contains(availableSlot)) {
                     return Optional.of(
                             SlotInfoAndLocality.of(
-                                    
freeSlotInfoTracker.getSlotInfo(availableSlot),
-                                    Locality.LOCAL));
+                                    
freeSlotTracker.getSlotInfo(availableSlot), Locality.LOCAL));
                 }
             }
         }
 
         // Second, select based on location preference, excluding blacklisted 
allocations
         return fallbackSlotSelectionStrategy.selectBestSlotForProfile(
-                
freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+                freeSlotTracker.createNewFreeSlotTrackerWithoutBlockedSlots(
                         slotProfile.getReservedAllocations()),
                 slotProfile);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 1b5cba7fcf8..3073ab0c0e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -115,7 +115,7 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
      *
      * @return all free slot tracker
      */
-    FreeSlotInfoTracker getFreeSlotInfoTracker();
+    FreeSlotTracker getFreeSlotTracker();
 
     /**
      * Returns a list of {@link SlotInfo} objects about all slots that are 
currently allocated in
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
index 968ec50791e..136e5f2a931 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
@@ -34,13 +34,13 @@ public interface SlotSelectionStrategy {
      * of available slots and considering the given {@link SlotProfile} that 
describes the
      * requirements.
      *
-     * @param freeSlotInfoTracker a list of the available slots together with 
their remaining
-     *     resources to select from.
+     * @param freeSlotTracker a list of the available slots together with 
their remaining resources
+     *     to select from.
      * @param slotProfile a slot profile, describing requirements for the slot 
selection.
      * @return the selected slot info with the corresponding locality hint.
      */
     Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull 
SlotProfile slotProfile);
+            @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull SlotProfile 
slotProfile);
 
     /** This class is a value type that combines a {@link SlotInfo} with a 
{@link Locality} hint. */
     final class SlotInfoAndLocality {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index fb6344b31df..2fd7b694b73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1073,7 +1073,7 @@ public class AdaptiveScheduler
     @Override
     public boolean hasDesiredResources() {
         final Collection<? extends SlotInfo> freeSlots =
-                
declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation();
+                
declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation();
         return hasDesiredResources(desiredResources, freeSlots);
     }
 
@@ -1111,7 +1111,7 @@ public class AdaptiveScheduler
         return slotAllocator
                 .determineParallelismAndCalculateAssignment(
                         jobInformation,
-                        
declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(),
+                        
declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation(),
                         
JobAllocationsInformation.fromGraph(previousExecutionGraph))
                 .orElseThrow(
                         () ->
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
deleted file mode 100644
index 9c477028a4e..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-/** Simple implementation of the {@link SlotContext} interface for the legacy 
code. */
-public class SimpleSlotContext implements SlotContext {
-
-    private final AllocationID allocationId;
-
-    private final TaskManagerLocation taskManagerLocation;
-
-    private final int physicalSlotNumber;
-
-    private final TaskManagerGateway taskManagerGateway;
-
-    private final ResourceProfile resourceProfile;
-
-    public SimpleSlotContext(
-            AllocationID allocationId,
-            TaskManagerLocation taskManagerLocation,
-            int physicalSlotNumber,
-            TaskManagerGateway taskManagerGateway) {
-        this(
-                allocationId,
-                taskManagerLocation,
-                physicalSlotNumber,
-                taskManagerGateway,
-                ResourceProfile.ANY);
-    }
-
-    public SimpleSlotContext(
-            AllocationID allocationId,
-            TaskManagerLocation taskManagerLocation,
-            int physicalSlotNumber,
-            TaskManagerGateway taskManagerGateway,
-            ResourceProfile resourceProfile) {
-        this.allocationId = Preconditions.checkNotNull(allocationId);
-        this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
-        this.physicalSlotNumber = physicalSlotNumber;
-        this.taskManagerGateway = 
Preconditions.checkNotNull(taskManagerGateway);
-        this.resourceProfile = resourceProfile;
-    }
-
-    @Override
-    public AllocationID getAllocationId() {
-        return allocationId;
-    }
-
-    @Override
-    public TaskManagerLocation getTaskManagerLocation() {
-        return taskManagerLocation;
-    }
-
-    @Override
-    public int getPhysicalSlotNumber() {
-        return physicalSlotNumber;
-    }
-
-    @Override
-    public TaskManagerGateway getTaskManagerGateway() {
-        return taskManagerGateway;
-    }
-
-    @Override
-    public ResourceProfile getResourceProfile() {
-        return resourceProfile;
-    }
-
-    @Override
-    public boolean willBeOccupiedIndefinitely() {
-        return true;
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 4e724fb9cb0..7bfc1cbf54a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -66,7 +66,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
@@ -83,8 +82,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker;
-import 
org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTrackerTestUtils;
+import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTrackerTestUtils;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
@@ -103,6 +102,7 @@ import 
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
 import org.apache.flink.runtime.shuffle.DefaultPartitionWithMetrics;
@@ -498,7 +498,7 @@ class JobMasterTest {
 
         private final OneShotLatch hasReceivedSlotOffers;
 
-        private final Map<ResourceID, Collection<SlotInfo>> registeredSlots;
+        private final Map<ResourceID, Collection<PhysicalSlot>> 
registeredSlots;
 
         private TestingSlotPool(JobID jobId, OneShotLatch 
hasReceivedSlotOffers) {
             this.jobId = jobId;
@@ -554,7 +554,7 @@ class JobMasterTest {
                 TaskManagerGateway taskManagerGateway,
                 Collection<SlotOffer> offers) {
             hasReceivedSlotOffers.trigger();
-            final Collection<SlotInfo> slotInfos =
+            final Collection<PhysicalSlot> slotInfos =
                     
Optional.ofNullable(registeredSlots.get(taskManagerLocation.getResourceID()))
                             .orElseThrow(
                                     () -> new 
FlinkRuntimeException("TaskManager not registered."));
@@ -563,11 +563,12 @@ class JobMasterTest {
 
             for (SlotOffer offer : offers) {
                 slotInfos.add(
-                        new SimpleSlotContext(
-                                offer.getAllocationId(),
-                                taskManagerLocation,
-                                slotIndex,
-                                taskManagerGateway));
+                        TestingPhysicalSlot.builder()
+                                .withAllocationID(offer.getAllocationId())
+                                .withTaskManagerLocation(taskManagerLocation)
+                                .withTaskManagerGateway(taskManagerGateway)
+                                .withPhysicalSlotNumber(slotIndex)
+                                .build());
                 slotIndex++;
             }
 
@@ -582,12 +583,12 @@ class JobMasterTest {
         }
 
         @Override
-        public FreeSlotInfoTracker getFreeSlotInfoTracker() {
-            Map<AllocationID, SlotInfo> freeSlots =
+        public FreeSlotTracker getFreeSlotTracker() {
+            Map<AllocationID, PhysicalSlot> freeSlots =
                     registeredSlots.values().stream()
                             .flatMap(Collection::stream)
                             
.collect(Collectors.toMap(SlotInfo::getAllocationId, s -> s));
-            return 
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(freeSlots);
+            return 
FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(freeSlots);
         }
 
         @Override
@@ -630,7 +631,7 @@ class JobMasterTest {
 
         @Override
         public AllocatedSlotReport createAllocatedSlotReport(ResourceID 
taskManagerId) {
-            final Collection<SlotInfo> slotInfos =
+            final Collection<PhysicalSlot> slotInfos =
                     registeredSlots.getOrDefault(taskManagerId, 
Collections.emptyList());
 
             final List<AllocatedSlotInfo> allocatedSlotInfos =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 7542f045f4d..bea131c6c10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -23,13 +23,13 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
@@ -209,9 +209,11 @@ class DeclarativeSlotPoolServiceTest {
     void testCreateAllocatedSlotReport() throws Exception {
         final LocalTaskManagerLocation taskManagerLocation1 = new 
LocalTaskManagerLocation();
         final LocalTaskManagerLocation taskManagerLocation2 = new 
LocalTaskManagerLocation();
-        final SimpleSlotContext simpleSlotContext2 = 
createSimpleSlotContext(taskManagerLocation2);
+        final TestingPhysicalSlot testingPhysicalSlot2 =
+                createTestingPhysicalSlot(taskManagerLocation2);
         final Collection<SlotInfo> slotInfos =
-                Arrays.asList(createSimpleSlotContext(taskManagerLocation1), 
simpleSlotContext2);
+                Arrays.asList(
+                        createTestingPhysicalSlot(taskManagerLocation1), 
testingPhysicalSlot2);
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService(
                         new TestingDeclarativeSlotPoolFactory(
@@ -226,9 +228,10 @@ class DeclarativeSlotPoolServiceTest {
                     .allMatch(
                             context ->
                                     context.getAllocationId()
-                                                    
.equals(simpleSlotContext2.getAllocationId())
+                                                    
.equals(testingPhysicalSlot2.getAllocationId())
                                             && context.getSlotIndex()
-                                                    == 
simpleSlotContext2.getPhysicalSlotNumber());
+                                                    == testingPhysicalSlot2
+                                                            
.getPhysicalSlotNumber());
         }
     }
 
@@ -337,7 +340,7 @@ class DeclarativeSlotPoolServiceTest {
             slotPoolService.releaseFreeSlotsOnTaskManager(
                     taskManagerLocation.getResourceID(), new 
FlinkException("Test cause"));
 
-            
assertThat(slotPool.getFreeSlotInfoTracker().getAvailableSlots()).isEmpty();
+            
assertThat(slotPool.getFreeSlotTracker().getAvailableSlots()).isEmpty();
             assertThat(
                             
Iterables.getOnlyElement(slotPool.getAllSlotsInformation())
                                     .getAllocationId())
@@ -369,14 +372,17 @@ class DeclarativeSlotPoolServiceTest {
     }
 
     @Nonnull
-    private SimpleSlotContext createSimpleSlotContext(
+    private TestingPhysicalSlot createTestingPhysicalSlot(
             LocalTaskManagerLocation taskManagerLocation1) {
-        return new SimpleSlotContext(
-                new AllocationID(),
-                taskManagerLocation1,
-                0,
-                new RpcTaskManagerGateway(
-                        new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(),
-                        jobMasterId));
+        return TestingPhysicalSlot.builder()
+                .withAllocationID(new AllocationID())
+                .withTaskManagerLocation(taskManagerLocation1)
+                .withPhysicalSlotNumber(0)
+                .withTaskManagerGateway(
+                        new RpcTaskManagerGateway(
+                                new TestingTaskExecutorGatewayBuilder()
+                                        .createTestingTaskExecutorGateway(),
+                                jobMasterId))
+                .build();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
index 0832aa983b0..49ae727f65d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
@@ -187,7 +187,7 @@ class DefaultAllocatedSlotPoolTest {
         assertSlotPoolContainsFreeSlots(slotPool, slots);
 
         for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo :
-                
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()) {
+                
slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation()) {
             final long time;
             if (freeSlotInfo.getAllocationId().equals(slot.getAllocationId())) 
{
                 time = releaseTime;
@@ -210,7 +210,7 @@ class DefaultAllocatedSlotPoolTest {
 
         final AllocatedSlotPool.FreeSlotInfo freeSlotInfo =
                 Iterables.getOnlyElement(
-                        
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation());
+                        
slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation());
 
         assertThat(freeSlotInfo.getFreeSince()).isEqualTo(0L);
     }
@@ -223,30 +223,29 @@ class DefaultAllocatedSlotPoolTest {
 
         slotPool.addSlots(slots, 0);
 
-        FreeSlotInfoTracker freeSlotInfoTracker = 
slotPool.getFreeSlotInfoTracker();
+        FreeSlotTracker freeSlotTracker = slotPool.getFreeSlotTracker();
 
-        assertThat(freeSlotInfoTracker.getAvailableSlots())
+        assertThat(freeSlotTracker.getAvailableSlots())
                 .allSatisfy(
                         allocationId ->
                                 assertThat(
-                                                
freeSlotInfoTracker.getTaskExecutorUtilization(
-                                                        
freeSlotInfoTracker.getSlotInfo(
-                                                                allocationId)))
+                                                
freeSlotTracker.getTaskExecutorUtilization(
+                                                        
freeSlotTracker.getSlotInfo(allocationId)))
                                         .isCloseTo(0, offset(0.1)));
 
         int numAllocatedSlots = 0;
         for (AllocatedSlot slot : slots) {
             
assertThat(slotPool.reserveFreeSlot(slot.getAllocationId())).isEqualTo(slot);
-            freeSlotInfoTracker.reserveSlot(slot.getAllocationId());
+            freeSlotTracker.reserveSlot(slot.getAllocationId());
             numAllocatedSlots++;
             final double utilization = (double) numAllocatedSlots / 
slots.size();
 
-            assertThat(freeSlotInfoTracker.getAvailableSlots())
+            assertThat(freeSlotTracker.getAvailableSlots())
                     .allSatisfy(
                             allocationId ->
                                     assertThat(
-                                                    
freeSlotInfoTracker.getTaskExecutorUtilization(
-                                                            
freeSlotInfoTracker.getSlotInfo(
+                                                    
freeSlotTracker.getTaskExecutorUtilization(
+                                                            
freeSlotTracker.getSlotInfo(
                                                                     
allocationId)))
                                             .isCloseTo(utilization, 
offset(0.1)));
         }
@@ -307,7 +306,7 @@ class DefaultAllocatedSlotPoolTest {
     private void assertSlotPoolContainsFreeSlots(
             DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> 
allocatedSlots) {
         final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
-                
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation();
+                
slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation();
 
         assertThat(freeSlotsInformation).hasSize(allocatedSlots.size());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index af3e9b66695..7ab4a5d5948 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -605,8 +605,8 @@ class DefaultDeclarativeSlotPoolTest extends 
DefaultDeclarativeSlotPoolTestBase
                 createSlotOffersForResourceRequirements(
                         ResourceCounter.withResource(ResourceProfile.ANY, 1)));
 
-        final SlotInfo slot =
-                
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next();
+        final PhysicalSlot slot =
+                
slotPool.getFreeSlotTracker().getFreeSlotsInformation().iterator().next();
 
         slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile);
         assertThat(
@@ -659,8 +659,8 @@ class DefaultDeclarativeSlotPoolTest extends 
DefaultDeclarativeSlotPoolTestBase
 
         slotPool.tryWaitSlotRequestIsDone();
 
-        final SlotInfo largeSlot =
-                
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
+        final PhysicalSlot largeSlot =
+                
slotPool.getFreeSlotTracker().getFreeSlotsInformation().stream()
                         .filter(slot -> 
slot.getResourceProfile().equals(largeResourceProfile))
                         .findFirst()
                         .get();
@@ -722,7 +722,7 @@ class DefaultDeclarativeSlotPoolTest extends 
DefaultDeclarativeSlotPoolTestBase
                 0L);
 
         final AllocationID allocationId =
-                
slotPool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next();
+                
slotPool.getFreeSlotTracker().getAvailableSlots().iterator().next();
 
         assertThat(slotPool.getResourceRequirements()).isEmpty();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTrackerTest.java
similarity index 56%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTrackerTest.java
index 58218e318f6..a30a8dc87be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTrackerTest.java
@@ -37,53 +37,52 @@ import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link FreeSlotInfoTracker}. */
-class DefaultFreeSlotInfoTrackerTest {
+/** Tests for {@link DefaultFreeSlotTracker}. */
+class DefaultFreeSlotTrackerTest {
 
     @Test
     void testReserveSlot() {
         final ResourceID resourceId = ResourceID.generate();
-        final SlotInfo slotInfo1 = createAllocatedSlot(resourceId);
-        final SlotInfo slotInfo2 = createAllocatedSlot(resourceId);
-        final Map<AllocationID, SlotInfo> slots = new HashMap<>();
+        final PhysicalSlot slot1 = createAllocatedSlot(resourceId);
+        final PhysicalSlot slot2 = createAllocatedSlot(resourceId);
+        final Map<AllocationID, PhysicalSlot> slots = new HashMap<>();
 
-        slots.put(slotInfo1.getAllocationId(), slotInfo1);
-        slots.put(slotInfo2.getAllocationId(), slotInfo2);
+        slots.put(slot1.getAllocationId(), slot1);
+        slots.put(slot2.getAllocationId(), slot2);
 
-        final FreeSlotInfoTracker freeSlotInfoTracker =
-                
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots);
-        for (AllocationID candidate : freeSlotInfoTracker.getAvailableSlots()) 
{
-            SlotInfo selectSlot = freeSlotInfoTracker.getSlotInfo(candidate);
+        final FreeSlotTracker freeSlotTracker =
+                FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(slots);
+        for (AllocationID candidate : freeSlotTracker.getAvailableSlots()) {
+            SlotInfo selectSlot = freeSlotTracker.getSlotInfo(candidate);
             
assertThat(slots.get(selectSlot.getAllocationId())).isEqualTo(selectSlot);
-            freeSlotInfoTracker.reserveSlot(selectSlot.getAllocationId());
+            freeSlotTracker.reserveSlot(selectSlot.getAllocationId());
             break;
         }
 
-        assertThat(freeSlotInfoTracker.getAvailableSlots())
+        assertThat(freeSlotTracker.getAvailableSlots())
                 .hasSize(1)
-                .containsAnyOf(slotInfo1.getAllocationId(), 
slotInfo2.getAllocationId());
+                .containsAnyOf(slot1.getAllocationId(), 
slot2.getAllocationId());
     }
 
     @Test
-    void testCreatedFreeSlotInfoTrackerWithoutBlockedSlots() {
+    void testCreatedFreeSlotTrackerWithoutBlockedSlots() {
         final ResourceID resourceId = ResourceID.generate();
-        final SlotInfo slotInfo1 = createAllocatedSlot(resourceId);
-        final SlotInfo slotInfo2 = createAllocatedSlot(resourceId);
-        final Map<AllocationID, SlotInfo> slots = new HashMap<>();
+        final PhysicalSlot slot1 = createAllocatedSlot(resourceId);
+        final PhysicalSlot slot2 = createAllocatedSlot(resourceId);
+        final Map<AllocationID, PhysicalSlot> slots = new HashMap<>();
 
-        slots.put(slotInfo1.getAllocationId(), slotInfo1);
-        slots.put(slotInfo2.getAllocationId(), slotInfo2);
+        slots.put(slot1.getAllocationId(), slot1);
+        slots.put(slot2.getAllocationId(), slot2);
 
-        final FreeSlotInfoTracker freeSlotInfoTracker =
-                
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots);
-        assertThat(freeSlotInfoTracker.getAvailableSlots()).hasSize(2);
+        final FreeSlotTracker freeSlotTracker =
+                FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(slots);
+        assertThat(freeSlotTracker.getAvailableSlots()).hasSize(2);
 
-        final FreeSlotInfoTracker freeSlotInfoTrackerWithoutBlockedSlots =
-                
freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+        final FreeSlotTracker freeSlotTrackerWithoutBlockedSlots =
+                freeSlotTracker.createNewFreeSlotTrackerWithoutBlockedSlots(
                         new HashSet<>(
-                                Arrays.asList(
-                                        slotInfo1.getAllocationId(), 
slotInfo2.getAllocationId())));
-        
assertThat(freeSlotInfoTrackerWithoutBlockedSlots.getAvailableSlots()).isEmpty();
+                                Arrays.asList(slot1.getAllocationId(), 
slot2.getAllocationId())));
+        
assertThat(freeSlotTrackerWithoutBlockedSlots.getAvailableSlots()).isEmpty();
     }
 
     private AllocatedSlot createAllocatedSlot(ResourceID owner) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTrackerTestUtils.java
similarity index 67%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTrackerTestUtils.java
index e13835ba687..d2f9ab6b23f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTrackerTestUtils.java
@@ -19,24 +19,23 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import java.util.Map;
 
-/** Utils to create testing {@link FreeSlotInfoTracker}. */
-public class FreeSlotInfoTrackerTestUtils {
+/** Utils to create testing {@link FreeSlotTracker}. */
+public class FreeSlotTrackerTestUtils {
     /**
-     * Create default free slot info tracker for provided slots.
+     * Create default free slot tracker for provided slots.
      *
      * @param freeSlots slots to track
-     * @return default free slot info tracker
+     * @return default free slot tracker
      */
-    public static DefaultFreeSlotInfoTracker createDefaultFreeSlotInfoTracker(
-            Map<AllocationID, SlotInfo> freeSlots) {
-        return new DefaultFreeSlotInfoTracker(
+    public static DefaultFreeSlotTracker createDefaultFreeSlotTracker(
+            Map<AllocationID, PhysicalSlot> freeSlots) {
+        return new DefaultFreeSlotTracker(
                 freeSlots.keySet(),
                 freeSlots::get,
-                id -> new 
TestingFreeSlotInfoTracker.TestingFreeSlotInfo(freeSlots.get(id)),
+                id -> new 
TestingFreeSlotTracker.TestingFreeSlotInfo(freeSlots.get(id)),
                 ignored -> 0d);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java
index cd79adec7a9..cab14c55f36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java
@@ -130,21 +130,21 @@ class LocationPreferenceSlotSelectionStrategyTest extends 
SlotSelectionStrategyT
                         biggerResourceProfile, 
Collections.singletonList(tml2));
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = 
runMatching(slotProfile);
 
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo2);
+        assertMatchingSlotEqualsToSlotInfo(match, slot2);
 
         slotProfile =
                 SlotProfileTestingUtils.preferredLocality(
                         resourceProfile, Arrays.asList(tmlX, tml4));
         match = runMatching(slotProfile);
 
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo4);
+        assertMatchingSlotEqualsToSlotInfo(match, slot4);
 
         slotProfile =
                 SlotProfileTestingUtils.preferredLocality(
                         resourceProfile, Arrays.asList(tml3, tml1, tml3, 
tmlX));
         match = runMatching(slotProfile);
 
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo3);
+        assertMatchingSlotEqualsToSlotInfo(match, slot3);
     }
 
     @Test
@@ -164,7 +164,7 @@ class LocationPreferenceSlotSelectionStrategyTest extends 
SlotSelectionStrategyT
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = 
runMatching(slotProfile);
 
         // available previous allocation should override blacklisting
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo3);
+        assertMatchingSlotEqualsToSlotInfo(match, slot3);
     }
 
     protected static void assertMatchingSlotEqualsToSlotInfo(
@@ -178,7 +178,7 @@ class LocationPreferenceSlotSelectionStrategyTest extends 
SlotSelectionStrategyT
     protected static void assertMatchingSlotLocalityAndInCandidates(
             Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot,
             Locality locality,
-            FreeSlotInfoTracker candidates) {
+            FreeSlotTracker candidates) {
         assertThat(matchingSlot)
                 .hasValueSatisfying(
                         slotInfoAndLocality -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java
index 034ce7abdd9..7a6ff8c008c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java
@@ -52,7 +52,7 @@ class PreviousAllocationSlotSelectionStrategyTest
                         Collections.singleton(aid3),
                         Collections.emptySet());
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = 
runMatching(slotProfile);
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo3);
+        assertMatchingSlotEqualsToSlotInfo(match, slot3);
 
         slotProfile =
                 SlotProfile.priorAllocation(
@@ -62,7 +62,7 @@ class PreviousAllocationSlotSelectionStrategyTest
                         new HashSet<>(Arrays.asList(aidX, aid2)),
                         Collections.emptySet());
         match = runMatching(slotProfile);
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo2);
+        assertMatchingSlotEqualsToSlotInfo(match, slot2);
     }
 
     @Test
@@ -76,7 +76,7 @@ class PreviousAllocationSlotSelectionStrategyTest
                         Collections.singleton(aidX),
                         Collections.emptySet());
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = 
runMatching(slotProfile);
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo4);
+        assertMatchingSlotEqualsToSlotInfo(match, slot4);
     }
 
     @Test
@@ -115,6 +115,6 @@ class PreviousAllocationSlotSelectionStrategyTest
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = 
runMatching(slotProfile);
 
         // we expect that the candidate that is not blacklisted is returned
-        assertMatchingSlotEqualsToSlotInfo(match, slotInfo1);
+        assertMatchingSlotEqualsToSlotInfo(match, slot1);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
index cdced79e907..6122fb5d224 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -73,12 +73,13 @@ class SingleLogicalSlotTest {
     }
 
     private static SlotContext createSlotContext() {
-        return new SimpleSlotContext(
-                new AllocationID(),
-                new LocalTaskManagerLocation(),
-                0,
-                new SimpleAckingTaskManagerGateway(),
-                ResourceProfile.ANY);
+        return TestingPhysicalSlot.builder()
+                .withAllocationID(new AllocationID())
+                .withTaskManagerLocation(new LocalTaskManagerLocation())
+                .withPhysicalSlotNumber(0)
+                .withTaskManagerGateway(new SimpleAckingTaskManagerGateway())
+                .withResourceProfile(ResourceProfile.ANY)
+                .build();
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java
index 99a0253bb89..3bfbf18ca95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java
@@ -23,9 +23,8 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
@@ -58,27 +57,51 @@ abstract class SlotSelectionStrategyTestBase {
 
     protected final TaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
 
-    protected final SlotInfo slotInfo1 =
-            new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, 
resourceProfile);
-    protected final SlotInfo slotInfo2 =
-            new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, 
biggerResourceProfile);
-    protected final SlotInfo slotInfo3 =
-            new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, 
resourceProfile);
-    protected final SlotInfo slotInfo4 =
-            new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, 
resourceProfile);
+    protected final PhysicalSlot slot1 =
+            TestingPhysicalSlot.builder()
+                    .withAllocationID(aid1)
+                    .withTaskManagerLocation(tml1)
+                    .withPhysicalSlotNumber(1)
+                    .withTaskManagerGateway(taskManagerGateway)
+                    .withResourceProfile(resourceProfile)
+                    .build();
+    protected final PhysicalSlot slot2 =
+            TestingPhysicalSlot.builder()
+                    .withAllocationID(aid2)
+                    .withTaskManagerLocation(tml2)
+                    .withPhysicalSlotNumber(2)
+                    .withTaskManagerGateway(taskManagerGateway)
+                    .withResourceProfile(biggerResourceProfile)
+                    .build();
+    protected final PhysicalSlot slot3 =
+            TestingPhysicalSlot.builder()
+                    .withAllocationID(aid3)
+                    .withTaskManagerLocation(tml3)
+                    .withPhysicalSlotNumber(3)
+                    .withTaskManagerGateway(taskManagerGateway)
+                    .withResourceProfile(resourceProfile)
+                    .build();
+    protected final PhysicalSlot slot4 =
+            TestingPhysicalSlot.builder()
+                    .withAllocationID(aid4)
+                    .withTaskManagerLocation(tml4)
+                    .withPhysicalSlotNumber(4)
+                    .withTaskManagerGateway(taskManagerGateway)
+                    .withResourceProfile(resourceProfile)
+                    .build();
 
-    protected final FreeSlotInfoTracker candidates = createCandidates();
+    protected final FreeSlotTracker candidates = createCandidates();
 
     protected SlotSelectionStrategy selectionStrategy;
 
-    private FreeSlotInfoTracker createCandidates() {
-        Map<AllocationID, SlotInfo> candidates = new HashMap<>(4);
+    private FreeSlotTracker createCandidates() {
+        Map<AllocationID, PhysicalSlot> candidates = new HashMap<>(4);
 
-        candidates.put(slotInfo1.getAllocationId(), slotInfo1);
-        candidates.put(slotInfo2.getAllocationId(), slotInfo2);
-        candidates.put(slotInfo3.getAllocationId(), slotInfo3);
-        candidates.put(slotInfo4.getAllocationId(), slotInfo4);
-        return 
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(candidates);
+        candidates.put(slot1.getAllocationId(), slot1);
+        candidates.put(slot2.getAllocationId(), slot2);
+        candidates.put(slot3.getAllocationId(), slot3);
+        candidates.put(slot4.getAllocationId(), slot4);
+        return 
FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(candidates);
     }
 
     protected Optional<SlotSelectionStrategy.SlotInfoAndLocality> runMatching(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index a713a4b2b04..32a7f518f3c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -64,9 +64,9 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
                     Collection<SlotOffer>>
             registerSlotsFunction;
 
-    private final Supplier<Collection<SlotInfo>> 
getFreeSlotsInformationSupplier;
+    private final Supplier<Collection<PhysicalSlot>> 
getFreeSlotsInformationSupplier;
 
-    private final Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier;
+    private final Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier;
 
     private final Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier;
 
@@ -105,8 +105,8 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
                             Long,
                             Collection<SlotOffer>>
                     registerSlotsFunction,
-            Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier,
-            Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier,
+            Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier,
+            Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier,
             Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier,
             BiFunction<ResourceID, Exception, ResourceCounter> 
releaseSlotsFunction,
             BiFunction<AllocationID, Exception, ResourceCounter> 
releaseSlotFunction,
@@ -122,7 +122,7 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
         this.offerSlotsFunction = offerSlotsFunction;
         this.registerSlotsFunction = registerSlotsFunction;
         this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier;
-        this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier;
+        this.getFreeSlotTrackerSupplier = getFreeSlotTrackerSupplier;
         this.getAllSlotsInformationSupplier = getAllSlotsInformationSupplier;
         this.releaseSlotsFunction = releaseSlotsFunction;
         this.releaseSlotFunction = releaseSlotFunction;
@@ -175,8 +175,8 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
-    public FreeSlotInfoTracker getFreeSlotInfoTracker() {
-        return getFreeSlotInfoTrackerSupplier.get();
+    public FreeSlotTracker getFreeSlotTracker() {
+        return getFreeSlotTrackerSupplier.get();
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index a2c345e1852..07262a17580 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -54,11 +54,12 @@ public class TestingDeclarativeSlotPoolBuilder {
                     Collection<SlotOffer>>
             offerSlotsFunction =
                     (ignoredA, ignoredB, ignoredC, ignoredD) -> 
Collections.emptyList();
-    private Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier = 
Collections::emptyList;
+    private Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier 
=
+            Collections::emptyList;
     private Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier =
             Collections::emptyList;
-    private Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier =
-            () -> TestingFreeSlotInfoTracker.newBuilder().build();
+    private Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier =
+            () -> TestingFreeSlotTracker.newBuilder().build();
     private BiFunction<ResourceID, Exception, ResourceCounter> 
releaseSlotsFunction =
             (ignoredA, ignoredB) -> ResourceCounter.empty();
     private BiFunction<AllocationID, Exception, ResourceCounter> 
releaseSlotFunction =
@@ -129,14 +130,14 @@ public class TestingDeclarativeSlotPoolBuilder {
     }
 
     public TestingDeclarativeSlotPoolBuilder 
setGetFreeSlotsInformationSupplier(
-            Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier) {
+            Supplier<Collection<PhysicalSlot>> 
getFreeSlotsInformationSupplier) {
         this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier;
         return this;
     }
 
-    public TestingDeclarativeSlotPoolBuilder setGetFreeSlotInfoTrackerSupplier(
-            Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier) {
-        this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier;
+    public TestingDeclarativeSlotPoolBuilder setGetFreeSlotTrackerSupplier(
+            Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier) {
+        this.getFreeSlotTrackerSupplier = getFreeSlotTrackerSupplier;
         return this;
     }
 
@@ -197,7 +198,7 @@ public class TestingDeclarativeSlotPoolBuilder {
                 offerSlotsFunction,
                 registerSlotsFunction,
                 getFreeSlotsInformationSupplier,
-                getFreeSlotInfoTrackerSupplier,
+                getFreeSlotTrackerSupplier,
                 getAllSlotsInformationSupplier,
                 releaseSlotsFunction,
                 releaseSlotFunction,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java
similarity index 76%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java
index d59ab1608b2..ff9b8803bbb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java
@@ -28,28 +28,28 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-/** Testing implements of {@link FreeSlotInfoTracker}. */
-public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker {
+/** Testing implementation of {@link FreeSlotTracker}. */
+public class TestingFreeSlotTracker implements FreeSlotTracker {
     private final Supplier<Set<AllocationID>> getAvailableSlotsSupplier;
     private final Function<AllocationID, SlotInfo> getSlotInfoFunction;
     private final Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
             getFreeSlotsWithIdleSinceInformationSupplier;
-    private final Supplier<Collection<SlotInfo>> 
getFreeSlotsInformationSupplier;
+    private final Supplier<Collection<PhysicalSlot>> 
getFreeSlotsInformationSupplier;
     private final Function<SlotInfo, Double> 
getTaskExecutorUtilizationFunction;
     private final Consumer<AllocationID> reserveSlotConsumer;
-    private final Function<Set<AllocationID>, FreeSlotInfoTracker>
-            createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+    private final Function<Set<AllocationID>, FreeSlotTracker>
+            createNewFreeSlotTrackerWithoutBlockedSlotsFunction;
 
-    public TestingFreeSlotInfoTracker(
+    public TestingFreeSlotTracker(
             Supplier<Set<AllocationID>> getAvailableSlotsSupplier,
             Function<AllocationID, SlotInfo> getSlotInfoFunction,
             Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
                     getFreeSlotsWithIdleSinceInformationSupplier,
-            Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier,
+            Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier,
             Function<SlotInfo, Double> getTaskExecutorUtilizationFunction,
             Consumer<AllocationID> reserveSlotConsumer,
-            Function<Set<AllocationID>, FreeSlotInfoTracker>
-                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
+            Function<Set<AllocationID>, FreeSlotTracker>
+                    createNewFreeSlotTrackerWithoutBlockedSlotsFunction) {
         this.getAvailableSlotsSupplier = getAvailableSlotsSupplier;
         this.getSlotInfoFunction = getSlotInfoFunction;
         this.getFreeSlotsWithIdleSinceInformationSupplier =
@@ -57,8 +57,8 @@ public class TestingFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
         this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier;
         this.getTaskExecutorUtilizationFunction = 
getTaskExecutorUtilizationFunction;
         this.reserveSlotConsumer = reserveSlotConsumer;
-        this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
-                createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+        this.createNewFreeSlotTrackerWithoutBlockedSlotsFunction =
+                createNewFreeSlotTrackerWithoutBlockedSlotsFunction;
     }
 
     @Override
@@ -77,7 +77,7 @@ public class TestingFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
     }
 
     @Override
-    public Collection<SlotInfo> getFreeSlotsInformation() {
+    public Collection<PhysicalSlot> getFreeSlotsInformation() {
         return getFreeSlotsInformationSupplier.get();
     }
 
@@ -92,27 +92,27 @@ public class TestingFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
     }
 
     @Override
-    public FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+    public FreeSlotTracker createNewFreeSlotTrackerWithoutBlockedSlots(
             Set<AllocationID> blockedSlots) {
-        return 
createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction.apply(blockedSlots);
+        return 
createNewFreeSlotTrackerWithoutBlockedSlotsFunction.apply(blockedSlots);
     }
 
     public static Builder newBuilder() {
         return new Builder();
     }
 
-    /** Builder of {@link TestingFreeSlotInfoTracker}. * */
+    /** Builder of {@link TestingFreeSlotTracker}. */
     public static class Builder {
         private Supplier<Set<AllocationID>> getAvailableSlotsSupplier = 
Collections::emptySet;
         private Function<AllocationID, SlotInfo> getSlotInfoFunction = ignored 
-> null;
         private Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
                 getFreeSlotsWithIdleSinceInformationSupplier = 
Collections::emptyList;
-        private Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier 
=
+        private Supplier<Collection<PhysicalSlot>> 
getFreeSlotsInformationSupplier =
                 Collections::emptyList;
         private Function<SlotInfo, Double> getTaskExecutorUtilizationFunction 
= ignored -> 0d;
         private Consumer<AllocationID> reserveSlotConsumer = ignore -> {};
-        private Function<Set<AllocationID>, FreeSlotInfoTracker>
-                createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = 
ignored -> null;
+        private Function<Set<AllocationID>, FreeSlotTracker>
+                createNewFreeSlotTrackerWithoutBlockedSlotsFunction = ignored 
-> null;
 
         public Builder setGetAvailableSlotsSupplier(
                 Supplier<Set<AllocationID>> getAvailableSlotsSupplier) {
@@ -135,7 +135,7 @@ public class TestingFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
         }
 
         public Builder setGetFreeSlotsInformationSupplier(
-                Supplier<Collection<SlotInfo>> 
getFreeSlotsInformationSupplier) {
+                Supplier<Collection<PhysicalSlot>> 
getFreeSlotsInformationSupplier) {
             this.getFreeSlotsInformationSupplier = 
getFreeSlotsInformationSupplier;
             return this;
         }
@@ -151,23 +151,15 @@ public class TestingFreeSlotInfoTracker implements 
FreeSlotInfoTracker {
             return this;
         }
 
-        public Builder 
setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction(
-                Function<Set<AllocationID>, FreeSlotInfoTracker>
-                        
createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
-            this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
-                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
-            return this;
-        }
-
-        public TestingFreeSlotInfoTracker build() {
-            return new TestingFreeSlotInfoTracker(
+        public TestingFreeSlotTracker build() {
+            return new TestingFreeSlotTracker(
                     getAvailableSlotsSupplier,
                     getSlotInfoFunction,
                     getFreeSlotsWithIdleSinceInformationSupplier,
                     getFreeSlotsInformationSupplier,
                     getTaskExecutorUtilizationFunction,
                     reserveSlotConsumer,
-                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction);
+                    createNewFreeSlotTrackerWithoutBlockedSlotsFunction);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java
index 0c9128ec1e8..bce2f90436b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java
@@ -21,49 +21,79 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
-/**
- * {@link SimpleSlotContext} subclass implementing the {@link PhysicalSlot} 
interface for testing
- * purposes.
- */
-public class TestingPhysicalSlot extends SimpleSlotContext implements 
PhysicalSlot {
+/** An implementation of the {@link PhysicalSlot} interface for testing 
purposes. */
+public class TestingPhysicalSlot implements PhysicalSlot {
+
+    private final AllocationID allocationId;
+
+    private final TaskManagerLocation taskManagerLocation;
+
+    private final int physicalSlotNumber;
+
+    private final TaskManagerGateway taskManagerGateway;
+
+    private final ResourceProfile resourceProfile;
+
     @Nullable private Payload payload;
 
     TestingPhysicalSlot(ResourceProfile resourceProfile, AllocationID 
allocationId) {
         this(
                 allocationId,
                 new LocalTaskManagerLocation(),
+                0,
                 new SimpleAckingTaskManagerGateway(),
                 resourceProfile);
     }
 
-    TestingPhysicalSlot(
-            AllocationID allocationID,
-            TaskManagerLocation taskManagerLocation,
-            TaskManagerGateway taskManagerGateway,
-            ResourceProfile resourceProfile) {
-        this(allocationID, taskManagerLocation, 0, taskManagerGateway, 
resourceProfile);
-    }
-
     TestingPhysicalSlot(
             AllocationID allocationId,
             TaskManagerLocation taskManagerLocation,
             int physicalSlotNumber,
             TaskManagerGateway taskManagerGateway,
             ResourceProfile resourceProfile) {
-        super(
-                allocationId,
-                taskManagerLocation,
-                physicalSlotNumber,
-                taskManagerGateway,
-                resourceProfile);
+        this.allocationId = Preconditions.checkNotNull(allocationId);
+        this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
+        this.physicalSlotNumber = physicalSlotNumber;
+        this.taskManagerGateway = 
Preconditions.checkNotNull(taskManagerGateway);
+        this.resourceProfile = resourceProfile;
+    }
+
+    @Override
+    public AllocationID getAllocationId() {
+        return allocationId;
+    }
+
+    @Override
+    public TaskManagerLocation getTaskManagerLocation() {
+        return taskManagerLocation;
+    }
+
+    @Override
+    public int getPhysicalSlotNumber() {
+        return physicalSlotNumber;
+    }
+
+    @Override
+    public TaskManagerGateway getTaskManagerGateway() {
+        return taskManagerGateway;
+    }
+
+    @Override
+    public ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    @Override
+    public boolean willBeOccupiedIndefinitely() {
+        return true;
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 2d035775f63..70babe57449 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -78,7 +78,7 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
 import 
org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
-import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotInfoTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -97,7 +97,7 @@ import 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
-import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
@@ -302,19 +302,19 @@ public class AdaptiveSchedulerTest {
     void testHasEnoughResourcesReturnsTrueIfSatisfied() {
         final ResourceCounter resourceRequirement =
                 ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
-        final Collection<TestSlotInfo> freeSlots =
-                createSlotInfosForResourceRequirements(resourceRequirement);
+        final Collection<TestingSlot> freeSlots =
+                createSlotsForResourceRequirements(resourceRequirement);
         assertThat(AdaptiveScheduler.hasDesiredResources(resourceRequirement, 
freeSlots)).isTrue();
     }
 
-    private Collection<TestSlotInfo> createSlotInfosForResourceRequirements(
+    private Collection<TestingSlot> createSlotsForResourceRequirements(
             ResourceCounter resourceRequirements) {
-        final Collection<TestSlotInfo> slotInfos = new ArrayList<>();
+        final Collection<TestingSlot> slotInfos = new ArrayList<>();
 
         for (Map.Entry<ResourceProfile, Integer> resourceProfileCount :
                 resourceRequirements.getResourcesWithCount()) {
             for (int i = 0; i < resourceProfileCount.getValue(); i++) {
-                slotInfos.add(new TestSlotInfo(resourceProfileCount.getKey()));
+                slotInfos.add(new TestingSlot(resourceProfileCount.getKey()));
             }
         }
 
@@ -330,8 +330,8 @@ public class AdaptiveSchedulerTest {
                 ResourceCounter.withResource(
                         ResourceProfile.newBuilder().setCpuCores(1).build(), 
numRequiredSlots);
 
-        final Collection<TestSlotInfo> freeSlots =
-                createSlotInfosForResourceRequirements(providedResources);
+        final Collection<TestingSlot> freeSlots =
+                createSlotsForResourceRequirements(providedResources);
 
         assertThat(AdaptiveScheduler.hasDesiredResources(requiredResources, 
freeSlots)).isTrue();
     }
@@ -2243,15 +2243,15 @@ public class AdaptiveSchedulerTest {
                                         TestingPhysicalSlot.builder()
                                                 .withAllocationID(allocationId)
                                                 .build())
-                        .setGetFreeSlotInfoTrackerSupplier(
+                        .setGetFreeSlotTrackerSupplier(
                                 () ->
-                                        TestingFreeSlotInfoTracker.newBuilder()
+                                        TestingFreeSlotTracker.newBuilder()
                                                 
.setGetFreeSlotsInformationSupplier(
                                                         () ->
                                                                 
IntStream.range(0, parallelism)
                                                                         
.mapToObj(
                                                                                
 v ->
-                                                                               
         new TestSlotInfo())
+                                                                               
         new TestingSlot())
                                                                         
.collect(
                                                                                
 Collectors.toSet()))
                                                 .build())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index bdf3396c980..a37a93ca9cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -413,9 +413,9 @@ class SlotSharingSlotAllocatorTest {
                                 1)));
 
         List<SlotInfo> freeSlots = new ArrayList<>();
-        IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestSlotInfo(new 
AllocationID())));
-        freeSlots.add(new TestSlotInfo(allocation1));
-        freeSlots.add(new TestSlotInfo(allocation2));
+        IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestingSlot(new 
AllocationID())));
+        freeSlots.add(new TestingSlot(allocation1));
+        freeSlots.add(new TestingSlot(allocation2));
 
         JobSchedulingPlan schedulingPlan =
                 SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
@@ -448,7 +448,7 @@ class SlotSharingSlotAllocatorTest {
     private static Collection<SlotInfo> getSlots(int count) {
         final Collection<SlotInfo> slotInfo = new ArrayList<>();
         for (int i = 0; i < count; i++) {
-            slotInfo.add(new TestSlotInfo());
+            slotInfo.add(new TestingSlot());
         }
         return slotInfo;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
index ef0f7fc1f3d..445eedc82ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
@@ -175,7 +175,7 @@ class StateLocalitySlotAssignerTest {
         return new StateLocalitySlotAssigner()
                 .assignSlots(
                         new 
TestJobInformation(singletonList(vertexInformation)),
-                        
allocationIDs.stream().map(TestSlotInfo::new).collect(Collectors.toList()),
+                        
allocationIDs.stream().map(TestingSlot::new).collect(Collectors.toList()),
                         new VertexParallelism(
                                 singletonMap(
                                         vertexInformation.getJobVertexID(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
similarity index 73%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
index 9f3573a0705..85133a83678 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
@@ -19,29 +19,30 @@ package 
org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-/** Test {@link SlotInfo} implementation. */
-public class TestSlotInfo implements SlotInfo {
+/** An implementation of {@link PhysicalSlot} for testing. */
+public class TestingSlot implements PhysicalSlot {
 
     private final AllocationID allocationId;
     private final ResourceProfile resourceProfile;
 
-    public TestSlotInfo() {
+    public TestingSlot() {
         this(new AllocationID(), ResourceProfile.ANY);
     }
 
-    public TestSlotInfo(AllocationID allocationId) {
+    public TestingSlot(AllocationID allocationId) {
         this(allocationId, ResourceProfile.ANY);
     }
 
-    public TestSlotInfo(ResourceProfile resourceProfile) {
+    public TestingSlot(ResourceProfile resourceProfile) {
         this(new AllocationID(), resourceProfile);
     }
 
-    public TestSlotInfo(AllocationID allocationId, ResourceProfile 
resourceProfile) {
+    public TestingSlot(AllocationID allocationId, ResourceProfile 
resourceProfile) {
         this.allocationId = allocationId;
         this.resourceProfile = resourceProfile;
     }
@@ -70,4 +71,14 @@ public class TestSlotInfo implements SlotInfo {
     public boolean willBeOccupiedIndefinitely() {
         return false;
     }
+
+    @Override
+    public TaskManagerGateway getTaskManagerGateway() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean tryAssignPayload(Payload payload) {
+        throw new UnsupportedOperationException();
+    }
 }

Reply via email to