This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72bff2a2d0072602e4e625476bf5480dc50dc76c Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Tue Jul 18 14:47:28 2023 +0800 [FLINK-31843][runtime] remove redundant SlotPool#getFreeSlotsInformation --- .../jobmaster/slotpool/AllocatedSlotPool.java | 7 ---- .../jobmaster/slotpool/DeclarativeSlotPool.java | 8 ---- .../slotpool/DeclarativeSlotPoolBridge.java | 14 +------ .../slotpool/DeclarativeSlotPoolService.java | 4 +- .../slotpool/DefaultAllocatedSlotPool.java | 38 ++++++------------- .../slotpool/DefaultDeclarativeSlotPool.java | 10 +---- .../PhysicalSlotRequestBulkCheckerImpl.java | 2 +- .../flink/runtime/jobmaster/slotpool/SlotPool.java | 9 ----- .../scheduler/adaptive/AdaptiveScheduler.java | 4 +- .../flink/runtime/jobmaster/JobMasterTest.java | 11 ------ .../slotpool/DeclarativeSlotPoolServiceTest.java | 2 +- .../slotpool/DefaultAllocatedSlotPoolTest.java | 44 +++++++++++----------- .../slotpool/DefaultDeclarativeSlotPoolTest.java | 7 ++-- .../slotpool/TestingDeclarativeSlotPool.java | 5 --- 14 files changed, 48 insertions(+), 117 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java index 4339e0b8f49..9e2c29d68da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java @@ -110,13 +110,6 @@ public interface AllocatedSlotPool { */ Optional<SlotInfo> getSlotInformation(AllocationID allocationID); - /** - * Returns information about all currently free slots. - * - * @return collection of free slot information - */ - Collection<FreeSlotInfo> getFreeSlotsInformation(); - /** * Returns information about all currently free slots. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 981d6682777..5b8921aa12f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -107,14 +107,6 @@ public interface DeclarativeSlotPool { TaskManagerGateway taskManagerGateway, long currentTime); - /** - * Returns the slot information for all free slots (slots which can be allocated from the slot - * pool). - * - * @return collection of free slot information - */ - Collection<SlotInfo> getFreeSlotsInformation(); - /** * Returns the free slot tracker. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 1d4af3db7ac..1a77528fa2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -422,23 +422,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem final Collection<? extends SlotInfo> allSlotsInformation = getDeclarativeSlotPool().getAllSlotsInformation(); final Set<AllocationID> freeSlots = - getDeclarativeSlotPool().getFreeSlotsInformation().stream() - .map(SlotInfo::getAllocationId) - .collect(Collectors.toSet()); + getDeclarativeSlotPool().getFreeSlotInfoTracker().getAvailableSlots(); return allSlotsInformation.stream() .filter(slotInfo -> !freeSlots.contains(slotInfo.getAllocationId())) .collect(Collectors.toList()); } - @Override - @Nonnull - public Collection<SlotInfo> getAvailableSlotsInformation() { - assertRunningInMainThread(); - - return getDeclarativeSlotPool().getFreeSlotsInformation(); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { assertRunningInMainThread(); @@ -519,7 +509,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem private Set<ResourceProfile> getResourceProfilesFromAllSlots() { return Stream.concat( - getAvailableSlotsInformation().stream(), + getFreeSlotInfoTracker().getFreeSlotsInformation().stream(), getAllocatedSlotsInformation().stream()) .map(SlotInfo::getResourceProfile) .collect(Collectors.toSet()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index ee5fadbc788..6088d8b45f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -241,7 +241,7 @@ public class DeclarativeSlotPoolService implements SlotPoolService { if (isTaskManagerRegistered(taskManagerId)) { Collection<AllocationID> freeSlots = - declarativeSlotPool.getFreeSlotsInformation().stream() + declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() .filter( slotInfo -> slotInfo.getTaskManagerLocation() @@ -338,6 +338,6 @@ public class DeclarativeSlotPoolService implements SlotPoolService { "Registered TMs: %d, registered slots: %d free slots: %d", registeredTaskManagers.size(), declarativeSlotPool.getAllSlotsInformation().size(), - declarativeSlotPool.getFreeSlotsInformation().size()); + declarativeSlotPool.getFreeSlotInfoTracker().getAvailableSlots().size()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java index a8020152b66..d3a7b21c6ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java @@ -187,28 +187,6 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool { return Optional.ofNullable(registeredSlots.get(allocationID)); } - @Override - public Collection<FreeSlotInfo> getFreeSlotsInformation() { - final Collection<FreeSlotInfo> freeSlotInfos = new ArrayList<>(); - - for (Map.Entry<AllocationID, Long> freeSlot : freeSlots.getFreeSlotsSince().entrySet()) { - final AllocatedSlot allocatedSlot = - Preconditions.checkNotNull(registeredSlots.get(freeSlot.getKey())); - - freeSlotInfos.add(DefaultFreeSlotInfo.create(allocatedSlot, freeSlot.getValue())); - } - - return freeSlotInfos; - } - - private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) { - final AllocatedSlot allocatedSlot = - Preconditions.checkNotNull(registeredSlots.get(allocationId)); - final Long idleSince = - Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId)); - return DefaultFreeSlotInfo.create(allocatedSlot, idleSince); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { return new DefaultFreeSlotInfoTracker( @@ -218,7 +196,12 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool { this::getTaskExecutorUtilization); } - public double getTaskExecutorUtilization(ResourceID resourceId) { + @Override + public Collection<? extends SlotInfo> getAllSlotsInformation() { + return registeredSlots.values(); + } + + private double getTaskExecutorUtilization(ResourceID resourceId) { Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId); Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId); @@ -226,9 +209,12 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool { / slots.size(); } - @Override - public Collection<? extends SlotInfo> getAllSlotsInformation() { - return registeredSlots.values(); + private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) { + final AllocatedSlot allocatedSlot = + Preconditions.checkNotNull(registeredSlots.get(allocationId)); + final Long idleSince = + Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId)); + return DefaultFreeSlotInfo.create(allocatedSlot, idleSince); } private static final class FreeSlots { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index 9b2e188c074..ebec6f34264 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -52,7 +52,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; /** * Default {@link DeclarativeSlotPool} implementation. @@ -487,7 +486,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { @Override public void releaseIdleSlots(long currentTimeMillis) { final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation = - slotPool.getFreeSlotsInformation(); + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation(); ResourceCounter excessResources = fulfilledResourceRequirements.subtract(totalResourceRequirements); @@ -563,13 +562,6 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { } } - @Override - public Collection<SlotInfo> getFreeSlotsInformation() { - return slotPool.getFreeSlotsInformation().stream() - .map(AllocatedSlotPool.FreeSlotInfo::asSlotInfo) - .collect(Collectors.toList()); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { return slotPool.getFreeSlotInfoTracker(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java index 8be7cff3e3c..28a2a906767 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java @@ -200,7 +200,7 @@ public class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBu private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) { return Stream.concat( - slotPool.getAvailableSlotsInformation().stream(), + slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream(), slotPool.getAllocatedSlotsInformation().stream()) .collect(Collectors.toSet()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 8bd2383891d..edff664625c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -115,15 +115,6 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable { // allocating and disposing slots // ------------------------------------------------------------------------ - /** - * Returns a list of {@link SlotInfo} objects about all slots that are currently available in - * the slot pool. - * - * @return a list of {@link SlotInfo} objects about all slots that are currently available in - * the slot pool. - */ - Collection<SlotInfo> getAvailableSlotsInformation(); - /** * Returns all free slot tracker. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index d4b9d65d403..4ee22c95848 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -835,7 +835,7 @@ public class AdaptiveScheduler @Override public boolean hasDesiredResources() { final Collection<? extends SlotInfo> freeSlots = - declarativeSlotPool.getFreeSlotsInformation(); + declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(); return hasDesiredResources(desiredResources, freeSlots); } @@ -873,7 +873,7 @@ public class AdaptiveScheduler return slotAllocator .determineParallelismAndCalculateAssignment( jobInformation, - declarativeSlotPool.getFreeSlotsInformation(), + declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(), JobAllocationsInformation.fromGraph(previousExecutionGraph)) .orElseThrow( () -> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index da7154c2ba4..e9889970a6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -572,17 +572,6 @@ class JobMasterTest { "TestingSlotPool does not support this operation."); } - @Nonnull - @Override - public Collection<SlotInfo> getAvailableSlotsInformation() { - final Collection<SlotInfo> allSlotInfos = - registeredSlots.values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); - - return Collections.unmodifiableCollection(allSlotInfos); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { Map<AllocationID, SlotInfo> freeSlots = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 7d15fd1e320..811406695f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -338,7 +338,7 @@ class DeclarativeSlotPoolServiceTest { slotPoolService.releaseFreeSlotsOnTaskManager( taskManagerLocation.getResourceID(), new FlinkException("Test cause")); - assertThat(slotPool.getFreeSlotsInformation()).isEmpty(); + assertThat(slotPool.getFreeSlotInfoTracker().getAvailableSlots()).isEmpty(); assertThat( Iterables.getOnlyElement(slotPool.getAllSlotsInformation()) .getAllocationId()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java index 9a539176ea6..2fe2b9e645e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java @@ -186,7 +186,8 @@ class DefaultAllocatedSlotPoolTest { assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), releaseTime)).isPresent(); assertSlotPoolContainsFreeSlots(slotPool, slots); - for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) { + for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()) { final long time; if (freeSlotInfo.getAllocationId().equals(slot.getAllocationId())) { time = releaseTime; @@ -208,7 +209,8 @@ class DefaultAllocatedSlotPoolTest { assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), 1)).isNotPresent(); final AllocatedSlotPool.FreeSlotInfo freeSlotInfo = - Iterables.getOnlyElement(slotPool.getFreeSlotsInformation()); + Iterables.getOnlyElement( + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()); assertThat(freeSlotInfo.getFreeSince()).isEqualTo(0L); } @@ -221,32 +223,32 @@ class DefaultAllocatedSlotPoolTest { slotPool.addSlots(slots, 0); - assertThat(slotPool.getFreeSlotsInformation()) + FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker(); + + assertThat(freeSlotInfoTracker.getAvailableSlots()) .allSatisfy( - freeSlotInfo -> + allocationId -> assertThat( - slotPool.getTaskExecutorUtilization( - freeSlotInfo - .asSlotInfo() - .getTaskManagerLocation() - .getResourceID())) + freeSlotInfoTracker.getTaskExecutorUtilization( + freeSlotInfoTracker.getSlotInfo( + allocationId))) .isCloseTo(0, offset(0.1))); int numAllocatedSlots = 0; for (AllocatedSlot slot : slots) { assertThat(slotPool.reserveFreeSlot(slot.getAllocationId())).isEqualTo(slot); + freeSlotInfoTracker.reserveSlot(slot.getAllocationId()); numAllocatedSlots++; - - for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) { - final double utilization = (double) numAllocatedSlots / slots.size(); - assertThat( - slotPool.getTaskExecutorUtilization( - freeSlotInfo - .asSlotInfo() - .getTaskManagerLocation() - .getResourceID())) - .isCloseTo(utilization, offset(0.1)); - } + final double utilization = (double) numAllocatedSlots / slots.size(); + + assertThat(freeSlotInfoTracker.getAvailableSlots()) + .allSatisfy( + allocationId -> + assertThat( + freeSlotInfoTracker.getTaskExecutorUtilization( + freeSlotInfoTracker.getSlotInfo( + allocationId))) + .isCloseTo(utilization, offset(0.1))); } } @@ -305,7 +307,7 @@ class DefaultAllocatedSlotPoolTest { private void assertSlotPoolContainsFreeSlots( DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> allocatedSlots) { final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation = - slotPool.getFreeSlotsInformation(); + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation(); assertThat(freeSlotsInformation).hasSize(allocatedSlots.size()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 5f0a9cb6df4..bf57aab2b66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -569,7 +569,8 @@ class DefaultDeclarativeSlotPoolTest { createSlotOffersForResourceRequirements( ResourceCounter.withResource(ResourceProfile.ANY, 1))); - final SlotInfo slot = slotPool.getFreeSlotsInformation().iterator().next(); + final SlotInfo slot = + slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next(); slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile); assertThat( @@ -615,7 +616,7 @@ class DefaultDeclarativeSlotPoolTest { ResourceCounter.withResource(smallResourceProfile, 1)); final SlotInfo largeSlot = - slotPool.getFreeSlotsInformation().stream() + slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() .filter(slot -> slot.getResourceProfile().equals(largeResourceProfile)) .findFirst() .get(); @@ -677,7 +678,7 @@ class DefaultDeclarativeSlotPoolTest { 0L); final AllocationID allocationId = - slotPool.getFreeSlotsInformation().iterator().next().getAllocationId(); + slotPool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next(); assertThat(slotPool.getResourceRequirements()).isEmpty(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 6fc7ef32e3e..a713a4b2b04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -174,11 +174,6 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { slots, taskManagerLocation, taskManagerGateway, currentTime); } - @Override - public Collection<SlotInfo> getFreeSlotsInformation() { - return getFreeSlotsInformationSupplier.get(); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { return getFreeSlotInfoTrackerSupplier.get();