This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72bff2a2d0072602e4e625476bf5480dc50dc76c
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Tue Jul 18 14:47:28 2023 +0800

    [FLINK-31843][runtime] remove redundant SlotPool#getFreeSlotsInformation
---
 .../jobmaster/slotpool/AllocatedSlotPool.java      |  7 ----
 .../jobmaster/slotpool/DeclarativeSlotPool.java    |  8 ----
 .../slotpool/DeclarativeSlotPoolBridge.java        | 14 +------
 .../slotpool/DeclarativeSlotPoolService.java       |  4 +-
 .../slotpool/DefaultAllocatedSlotPool.java         | 38 ++++++-------------
 .../slotpool/DefaultDeclarativeSlotPool.java       | 10 +----
 .../PhysicalSlotRequestBulkCheckerImpl.java        |  2 +-
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  9 -----
 .../scheduler/adaptive/AdaptiveScheduler.java      |  4 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 11 ------
 .../slotpool/DeclarativeSlotPoolServiceTest.java   |  2 +-
 .../slotpool/DefaultAllocatedSlotPoolTest.java     | 44 +++++++++++-----------
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   |  7 ++--
 .../slotpool/TestingDeclarativeSlotPool.java       |  5 ---
 14 files changed, 48 insertions(+), 117 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
index 4339e0b8f49..9e2c29d68da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
@@ -110,13 +110,6 @@ public interface AllocatedSlotPool {
      */
     Optional<SlotInfo> getSlotInformation(AllocationID allocationID);
 
-    /**
-     * Returns information about all currently free slots.
-     *
-     * @return collection of free slot information
-     */
-    Collection<FreeSlotInfo> getFreeSlotsInformation();
-
     /**
      * Returns information about all currently free slots.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index 981d6682777..5b8921aa12f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -107,14 +107,6 @@ public interface DeclarativeSlotPool {
             TaskManagerGateway taskManagerGateway,
             long currentTime);
 
-    /**
-     * Returns the slot information for all free slots (slots which can be allocated from the slot
-     * pool).
-     *
-     * @return collection of free slot information
-     */
-    Collection<SlotInfo> getFreeSlotsInformation();
-
     /**
      * Returns the free slot tracker.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 1d4af3db7ac..1a77528fa2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -422,23 +422,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
         final Collection<? extends SlotInfo> allSlotsInformation =
                 getDeclarativeSlotPool().getAllSlotsInformation();
         final Set<AllocationID> freeSlots =
-                getDeclarativeSlotPool().getFreeSlotsInformation().stream()
-                        .map(SlotInfo::getAllocationId)
-                        .collect(Collectors.toSet());
+                getDeclarativeSlotPool().getFreeSlotInfoTracker().getAvailableSlots();
 
         return allSlotsInformation.stream()
                 .filter(slotInfo -> !freeSlots.contains(slotInfo.getAllocationId()))
                 .collect(Collectors.toList());
     }
 
-    @Override
-    @Nonnull
-    public Collection<SlotInfo> getAvailableSlotsInformation() {
-        assertRunningInMainThread();
-
-        return getDeclarativeSlotPool().getFreeSlotsInformation();
-    }
-
     @Override
     public FreeSlotInfoTracker getFreeSlotInfoTracker() {
         assertRunningInMainThread();
@@ -519,7 +509,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
 
     private Set<ResourceProfile> getResourceProfilesFromAllSlots() {
         return Stream.concat(
-                        getAvailableSlotsInformation().stream(),
+                        getFreeSlotInfoTracker().getFreeSlotsInformation().stream(),
                         getAllocatedSlotsInformation().stream())
                 .map(SlotInfo::getResourceProfile)
                 .collect(Collectors.toSet());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index ee5fadbc788..6088d8b45f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -241,7 +241,7 @@ public class DeclarativeSlotPoolService implements SlotPoolService {
         if (isTaskManagerRegistered(taskManagerId)) {
 
             Collection<AllocationID> freeSlots =
-                    declarativeSlotPool.getFreeSlotsInformation().stream()
+                    declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
                             .filter(
                                     slotInfo ->
                                             slotInfo.getTaskManagerLocation()
@@ -338,6 +338,6 @@ public class DeclarativeSlotPoolService implements SlotPoolService {
                 "Registered TMs: %d, registered slots: %d free slots: %d",
                 registeredTaskManagers.size(),
                 declarativeSlotPool.getAllSlotsInformation().size(),
-                declarativeSlotPool.getFreeSlotsInformation().size());
+                declarativeSlotPool.getFreeSlotInfoTracker().getAvailableSlots().size());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
index a8020152b66..d3a7b21c6ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
@@ -187,28 +187,6 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool {
         return Optional.ofNullable(registeredSlots.get(allocationID));
     }
 
-    @Override
-    public Collection<FreeSlotInfo> getFreeSlotsInformation() {
-        final Collection<FreeSlotInfo> freeSlotInfos = new ArrayList<>();
-
-        for (Map.Entry<AllocationID, Long> freeSlot : freeSlots.getFreeSlotsSince().entrySet()) {
-            final AllocatedSlot allocatedSlot =
-                    Preconditions.checkNotNull(registeredSlots.get(freeSlot.getKey()));
-
-            freeSlotInfos.add(DefaultFreeSlotInfo.create(allocatedSlot, freeSlot.getValue()));
-        }
-
-        return freeSlotInfos;
-    }
-
-    private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) {
-        final AllocatedSlot allocatedSlot =
-                Preconditions.checkNotNull(registeredSlots.get(allocationId));
-        final Long idleSince =
-                Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId));
-        return DefaultFreeSlotInfo.create(allocatedSlot, idleSince);
-    }
-
     @Override
     public FreeSlotInfoTracker getFreeSlotInfoTracker() {
         return new DefaultFreeSlotInfoTracker(
@@ -218,7 +196,12 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool {
                 this::getTaskExecutorUtilization);
     }
 
-    public double getTaskExecutorUtilization(ResourceID resourceId) {
+    @Override
+    public Collection<? extends SlotInfo> getAllSlotsInformation() {
+        return registeredSlots.values();
+    }
+
+    private double getTaskExecutorUtilization(ResourceID resourceId) {
         Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
         Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId);
 
@@ -226,9 +209,12 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool {
                 / slots.size();
     }
 
-    @Override
-    public Collection<? extends SlotInfo> getAllSlotsInformation() {
-        return registeredSlots.values();
+    private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) {
+        final AllocatedSlot allocatedSlot =
+                Preconditions.checkNotNull(registeredSlots.get(allocationId));
+        final Long idleSince =
+                Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId));
+        return DefaultFreeSlotInfo.create(allocatedSlot, idleSince);
     }
 
     private static final class FreeSlots {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index 9b2e188c074..ebec6f34264 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -52,7 +52,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * Default {@link DeclarativeSlotPool} implementation.
@@ -487,7 +486,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
     @Override
     public void releaseIdleSlots(long currentTimeMillis) {
         final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
-                slotPool.getFreeSlotsInformation();
+                slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation();
 
         ResourceCounter excessResources =
                 fulfilledResourceRequirements.subtract(totalResourceRequirements);
@@ -563,13 +562,6 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
         }
     }
 
-    @Override
-    public Collection<SlotInfo> getFreeSlotsInformation() {
-        return slotPool.getFreeSlotsInformation().stream()
-                .map(AllocatedSlotPool.FreeSlotInfo::asSlotInfo)
-                .collect(Collectors.toList());
-    }
-
     @Override
     public FreeSlotInfoTracker getFreeSlotInfoTracker() {
         return slotPool.getFreeSlotInfoTracker();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
index 8be7cff3e3c..28a2a906767 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
@@ -200,7 +200,7 @@ public class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBu
 
     private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
         return Stream.concat(
-                        slotPool.getAvailableSlotsInformation().stream(),
+                        slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream(),
                         slotPool.getAllocatedSlotsInformation().stream())
                 .collect(Collectors.toSet());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 8bd2383891d..edff664625c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -115,15 +115,6 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
     //  allocating and disposing slots
     // ------------------------------------------------------------------------
 
-    /**
-     * Returns a list of {@link SlotInfo} objects about all slots that are currently available in
-     * the slot pool.
-     *
-     * @return a list of {@link SlotInfo} objects about all slots that are currently available in
-     *     the slot pool.
-     */
-    Collection<SlotInfo> getAvailableSlotsInformation();
-
     /**
      * Returns all free slot tracker.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index d4b9d65d403..4ee22c95848 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -835,7 +835,7 @@ public class AdaptiveScheduler
     @Override
     public boolean hasDesiredResources() {
         final Collection<? extends SlotInfo> freeSlots =
-                declarativeSlotPool.getFreeSlotsInformation();
+                declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation();
         return hasDesiredResources(desiredResources, freeSlots);
     }
 
@@ -873,7 +873,7 @@ public class AdaptiveScheduler
         return slotAllocator
                 .determineParallelismAndCalculateAssignment(
                         jobInformation,
-                        declarativeSlotPool.getFreeSlotsInformation(),
+                        declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(),
                         JobAllocationsInformation.fromGraph(previousExecutionGraph))
                 .orElseThrow(
                         () ->
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index da7154c2ba4..e9889970a6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -572,17 +572,6 @@ class JobMasterTest {
                     "TestingSlotPool does not support this operation.");
         }
 
-        @Nonnull
-        @Override
-        public Collection<SlotInfo> getAvailableSlotsInformation() {
-            final Collection<SlotInfo> allSlotInfos =
-                    registeredSlots.values().stream()
-                            .flatMap(Collection::stream)
-                            .collect(Collectors.toList());
-
-            return Collections.unmodifiableCollection(allSlotInfos);
-        }
-
         @Override
         public FreeSlotInfoTracker getFreeSlotInfoTracker() {
             Map<AllocationID, SlotInfo> freeSlots =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 7d15fd1e320..811406695f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -338,7 +338,7 @@ class DeclarativeSlotPoolServiceTest {
             slotPoolService.releaseFreeSlotsOnTaskManager(
                     taskManagerLocation.getResourceID(), new FlinkException("Test cause"));
 
-            assertThat(slotPool.getFreeSlotsInformation()).isEmpty();
+            assertThat(slotPool.getFreeSlotInfoTracker().getAvailableSlots()).isEmpty();
             assertThat(
                             Iterables.getOnlyElement(slotPool.getAllSlotsInformation())
                                     .getAllocationId())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
index 9a539176ea6..2fe2b9e645e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
@@ -186,7 +186,8 @@ class DefaultAllocatedSlotPoolTest {
         assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), releaseTime)).isPresent();
         assertSlotPoolContainsFreeSlots(slotPool, slots);
 
-        for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) {
+        for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo :
+                slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()) {
             final long time;
             if (freeSlotInfo.getAllocationId().equals(slot.getAllocationId())) {
                 time = releaseTime;
@@ -208,7 +209,8 @@ class DefaultAllocatedSlotPoolTest {
         assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), 1)).isNotPresent();
 
         final AllocatedSlotPool.FreeSlotInfo freeSlotInfo =
-                Iterables.getOnlyElement(slotPool.getFreeSlotsInformation());
+                Iterables.getOnlyElement(
+                        slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation());
 
         assertThat(freeSlotInfo.getFreeSince()).isEqualTo(0L);
     }
@@ -221,32 +223,32 @@ class DefaultAllocatedSlotPoolTest {
 
         slotPool.addSlots(slots, 0);
 
-        assertThat(slotPool.getFreeSlotsInformation())
+        FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker();
+
+        assertThat(freeSlotInfoTracker.getAvailableSlots())
                 .allSatisfy(
-                        freeSlotInfo ->
+                        allocationId ->
                                 assertThat(
-                                                slotPool.getTaskExecutorUtilization(
-                                                        freeSlotInfo
-                                                                .asSlotInfo()
-                                                                .getTaskManagerLocation()
-                                                                .getResourceID()))
+                                                freeSlotInfoTracker.getTaskExecutorUtilization(
+                                                        freeSlotInfoTracker.getSlotInfo(
+                                                                allocationId)))
                                         .isCloseTo(0, offset(0.1)));
 
         int numAllocatedSlots = 0;
         for (AllocatedSlot slot : slots) {
             assertThat(slotPool.reserveFreeSlot(slot.getAllocationId())).isEqualTo(slot);
+            freeSlotInfoTracker.reserveSlot(slot.getAllocationId());
             numAllocatedSlots++;
-
-            for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) {
-                final double utilization = (double) numAllocatedSlots / slots.size();
-                assertThat(
-                                slotPool.getTaskExecutorUtilization(
-                                        freeSlotInfo
-                                                .asSlotInfo()
-                                                .getTaskManagerLocation()
-                                                .getResourceID()))
-                        .isCloseTo(utilization, offset(0.1));
-            }
+            final double utilization = (double) numAllocatedSlots / slots.size();
+
+            assertThat(freeSlotInfoTracker.getAvailableSlots())
+                    .allSatisfy(
+                            allocationId ->
+                                    assertThat(
+                                                    freeSlotInfoTracker.getTaskExecutorUtilization(
+                                                            freeSlotInfoTracker.getSlotInfo(
+                                                                    allocationId)))
+                                            .isCloseTo(utilization, offset(0.1)));
         }
     }
 
@@ -305,7 +307,7 @@ class DefaultAllocatedSlotPoolTest {
     private void assertSlotPoolContainsFreeSlots(
             DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> allocatedSlots) {
         final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
-                slotPool.getFreeSlotsInformation();
+                slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation();
 
         assertThat(freeSlotsInformation).hasSize(allocatedSlots.size());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index 5f0a9cb6df4..bf57aab2b66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -569,7 +569,8 @@ class DefaultDeclarativeSlotPoolTest {
                 createSlotOffersForResourceRequirements(
                         ResourceCounter.withResource(ResourceProfile.ANY, 1)));
 
-        final SlotInfo slot = slotPool.getFreeSlotsInformation().iterator().next();
+        final SlotInfo slot =
+                slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next();
 
         slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile);
         assertThat(
@@ -615,7 +616,7 @@ class DefaultDeclarativeSlotPoolTest {
                 ResourceCounter.withResource(smallResourceProfile, 1));
 
         final SlotInfo largeSlot =
-                slotPool.getFreeSlotsInformation().stream()
+                slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
                         .filter(slot -> slot.getResourceProfile().equals(largeResourceProfile))
                         .findFirst()
                         .get();
@@ -677,7 +678,7 @@ class DefaultDeclarativeSlotPoolTest {
                 0L);
 
         final AllocationID allocationId =
-                slotPool.getFreeSlotsInformation().iterator().next().getAllocationId();
+                slotPool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next();
 
         assertThat(slotPool.getResourceRequirements()).isEmpty();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 6fc7ef32e3e..a713a4b2b04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -174,11 +174,6 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
                 slots, taskManagerLocation, taskManagerGateway, currentTime);
     }
 
-    @Override
-    public Collection<SlotInfo> getFreeSlotsInformation() {
-        return getFreeSlotsInformationSupplier.get();
-    }
-
     @Override
     public FreeSlotInfoTracker getFreeSlotInfoTracker() {
         return getFreeSlotInfoTrackerSupplier.get();

Reply via email to