This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d033f64b41f1d11edd22b2d70699dac7822abfe5 Author: Roc Marshal <[email protected]> AuthorDate: Fri Aug 2 19:15:07 2024 +0800 [FLINK-33875][refactor] Refactor FreeSlotInfoTracker to FreeSlotTracker for the better semantic --- .../jobmaster/slotpool/AllocatedSlotPool.java | 2 +- .../jobmaster/slotpool/DeclarativeSlotPool.java | 2 +- .../slotpool/DeclarativeSlotPoolBridge.java | 8 +- .../slotpool/DeclarativeSlotPoolService.java | 4 +- .../slotpool/DefaultAllocatedSlotPool.java | 4 +- .../slotpool/DefaultDeclarativeSlotPool.java | 6 +- ...nfoTracker.java => DefaultFreeSlotTracker.java} | 28 +++---- ...ultLocationPreferenceSlotSelectionStrategy.java | 7 +- ...OutLocationPreferenceSlotSelectionStrategy.java | 12 +-- ...eeSlotInfoTracker.java => FreeSlotTracker.java} | 17 ++-- .../LocationPreferenceSlotSelectionStrategy.java | 19 ++--- .../slotpool/PhysicalSlotProviderImpl.java | 6 +- .../PhysicalSlotRequestBulkCheckerImpl.java | 2 +- .../PreviousAllocationSlotSelectionStrategy.java | 9 +- .../flink/runtime/jobmaster/slotpool/SlotPool.java | 2 +- .../jobmaster/slotpool/SlotSelectionStrategy.java | 6 +- .../scheduler/adaptive/AdaptiveScheduler.java | 4 +- .../flink/runtime/instance/SimpleSlotContext.java | 96 ---------------------- .../flink/runtime/jobmaster/JobMasterTest.java | 29 +++---- .../slotpool/DeclarativeSlotPoolServiceTest.java | 34 ++++---- .../slotpool/DefaultAllocatedSlotPoolTest.java | 23 +++--- .../slotpool/DefaultDeclarativeSlotPoolTest.java | 10 +-- ...erTest.java => DefaultFreeSlotTrackerTest.java} | 55 ++++++------- ...estUtils.java => FreeSlotTrackerTestUtils.java} | 17 ++-- ...ocationPreferenceSlotSelectionStrategyTest.java | 10 +-- ...reviousAllocationSlotSelectionStrategyTest.java | 8 +- .../jobmaster/slotpool/SingleLogicalSlotTest.java | 15 ++-- .../slotpool/SlotSelectionStrategyTestBase.java | 59 +++++++++---- .../slotpool/TestingDeclarativeSlotPool.java | 14 ++-- .../TestingDeclarativeSlotPoolBuilder.java | 17 ++-- ...nfoTracker.java => TestingFreeSlotTracker.java} | 52 +++++------- .../runtime/scheduler/TestingPhysicalSlot.java | 70 +++++++++++----- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 24 +++--- .../allocator/SlotSharingSlotAllocatorTest.java | 8 +- .../allocator/StateLocalitySlotAssignerTest.java | 2 +- .../{TestSlotInfo.java => TestingSlot.java} | 25 ++++-- 36 files changed, 332 insertions(+), 374 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java index 9e2c29d68da..e0a9ef365d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java @@ -115,7 +115,7 @@ public interface AllocatedSlotPool { * * @return free slot information */ - FreeSlotInfoTracker getFreeSlotInfoTracker(); + FreeSlotTracker getFreeSlotTracker(); /** * Returns information about all slots in this pool. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 5b8921aa12f..28a6cd9dea2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -112,7 +112,7 @@ public interface DeclarativeSlotPool { * * @return free slot tracker */ - FreeSlotInfoTracker getFreeSlotInfoTracker(); + FreeSlotTracker getFreeSlotTracker(); /** * Returns the slot information for all slots (free and allocated slots). diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 38a9e1312f5..d7ea9a0cab6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -424,7 +424,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem final Collection<? extends SlotInfo> allSlotsInformation = getDeclarativeSlotPool().getAllSlotsInformation(); final Set<AllocationID> freeSlots = - getDeclarativeSlotPool().getFreeSlotInfoTracker().getAvailableSlots(); + getDeclarativeSlotPool().getFreeSlotTracker().getAvailableSlots(); return allSlotsInformation.stream() .filter(slotInfo -> !freeSlots.contains(slotInfo.getAllocationId())) @@ -432,10 +432,10 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem } @Override - public FreeSlotInfoTracker getFreeSlotInfoTracker() { + public FreeSlotTracker getFreeSlotTracker() { assertRunningInMainThread(); - return getDeclarativeSlotPool().getFreeSlotInfoTracker(); + return getDeclarativeSlotPool().getFreeSlotTracker(); } @Override @@ -509,7 +509,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem private Set<ResourceProfile> getResourceProfilesFromAllSlots() { return Stream.concat( - getFreeSlotInfoTracker().getFreeSlotsInformation().stream(), + getFreeSlotTracker().getFreeSlotsInformation().stream(), getAllocatedSlotsInformation().stream()) .map(SlotInfo::getResourceProfile) .collect(Collectors.toSet()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index 2345a553eb7..7ced0b70189 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -247,7 +247,7 @@ public class DeclarativeSlotPoolService implements SlotPoolService { if (isTaskManagerRegistered(taskManagerId)) { Collection<AllocationID> freeSlots = - declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() + declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation().stream() .filter( slotInfo -> slotInfo.getTaskManagerLocation() @@ -344,6 +344,6 @@ public class DeclarativeSlotPoolService implements SlotPoolService { "Registered TMs: %d, registered slots: %d free slots: %d", registeredTaskManagers.size(), declarativeSlotPool.getAllSlotsInformation().size(), - declarativeSlotPool.getFreeSlotInfoTracker().getAvailableSlots().size()); + declarativeSlotPool.getFreeSlotTracker().getAvailableSlots().size()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java index d3a7b21c6ee..edcae550a5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java @@ -188,8 +188,8 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool { } @Override - public FreeSlotInfoTracker getFreeSlotInfoTracker() { - return new DefaultFreeSlotInfoTracker( + public FreeSlotTracker getFreeSlotTracker() { + return new DefaultFreeSlotTracker( freeSlots.getFreeSlotsSince().keySet(), registeredSlots::get, this::getFreeSlotInfo, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index 2689aaa606f..f4db5974276 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -519,7 +519,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { @Override public void releaseIdleSlots(long currentTimeMillis) { final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation = - slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation(); + slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation(); ResourceCounter excessResources = fulfilledResourceRequirements.subtract(totalResourceRequirements); @@ -599,8 +599,8 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { } @Override - public FreeSlotInfoTracker getFreeSlotInfoTracker() { - return slotPool.getFreeSlotInfoTracker(); + public FreeSlotTracker getFreeSlotTracker() { + return slotPool.getFreeSlotTracker(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTracker.java similarity index 78% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTracker.java index 0a394254992..8f19866a0ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTracker.java @@ -30,20 +30,20 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -/** Default implements of {@link FreeSlotInfoTracker}. */ -public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker { +/** Default implements of {@link FreeSlotTracker}. */ +public class DefaultFreeSlotTracker implements FreeSlotTracker { private final Set<AllocationID> freeSlots; - private final Function<AllocationID, SlotInfo> slotInfoLookup; + private final Function<AllocationID, PhysicalSlot> physicalSlotLookup; private final Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> freeSlotInfoLookup; private final Function<ResourceID, Double> taskExecutorUtilizationLookup; - public DefaultFreeSlotInfoTracker( + public DefaultFreeSlotTracker( Set<AllocationID> freeSlots, - Function<AllocationID, SlotInfo> slotInfoLookup, + Function<AllocationID, PhysicalSlot> physicalSlotLookup, Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> freeSlotInfoLookup, Function<ResourceID, Double> taskExecutorUtilizationLookup) { this.freeSlots = new HashSet<>(freeSlots); - this.slotInfoLookup = slotInfoLookup; + this.physicalSlotLookup = physicalSlotLookup; this.freeSlotInfoLookup = freeSlotInfoLookup; this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup; } @@ -55,7 +55,7 @@ public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker { @Override public SlotInfo getSlotInfo(AllocationID allocationId) { - return Preconditions.checkNotNull(slotInfoLookup.apply(allocationId)); + return Preconditions.checkNotNull(physicalSlotLookup.apply(allocationId)); } @Override @@ -64,8 +64,8 @@ public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker { } @Override - public Collection<SlotInfo> getFreeSlotsInformation() { - return freeSlots.stream().map(slotInfoLookup).collect(Collectors.toList()); + public Collection<PhysicalSlot> getFreeSlotsInformation() { + return freeSlots.stream().map(physicalSlotLookup).collect(Collectors.toList()); } @Override @@ -83,17 +83,17 @@ public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker { } @Override - public DefaultFreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots( + public DefaultFreeSlotTracker createNewFreeSlotTrackerWithoutBlockedSlots( Set<AllocationID> blockedSlots) { - Set<AllocationID> freeSlotInfoTrackerWithoutBlockedSlots = + Set<AllocationID> freeSlotTrackerWithoutBlockedSlots = freeSlots.stream() .filter(slot -> !blockedSlots.contains(slot)) .collect(Collectors.toSet()); - return new DefaultFreeSlotInfoTracker( - freeSlotInfoTrackerWithoutBlockedSlots, - slotInfoLookup, + return new DefaultFreeSlotTracker( + freeSlotTrackerWithoutBlockedSlots, + physicalSlotLookup, freeSlotInfoLookup, taskExecutorUtilizationLookup); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java index 62bb31518e9..511cdbbcf44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java @@ -34,10 +34,9 @@ class DefaultLocationPreferenceSlotSelectionStrategy @Nonnull @Override protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, - @Nonnull ResourceProfile resourceProfile) { - for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) { - SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId); + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull ResourceProfile resourceProfile) { + for (AllocationID allocationId : freeSlotTracker.getAvailableSlots()) { + SlotInfo candidate = freeSlotTracker.getSlotInfo(allocationId); if (candidate.getResourceProfile().isMatching(resourceProfile)) { return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java index be8bd1cc1c7..df834bc8953 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java @@ -33,16 +33,12 @@ class EvenlySpreadOutLocationPreferenceSlotSelectionStrategy @Nonnull @Override protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, - @Nonnull ResourceProfile resourceProfile) { - return freeSlotInfoTracker.getAvailableSlots().stream() - .map(freeSlotInfoTracker::getSlotInfo) + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull ResourceProfile resourceProfile) { + return freeSlotTracker.getAvailableSlots().stream() + .map(freeSlotTracker::getSlotInfo) .filter(slotInfo -> slotInfo.getResourceProfile().isMatching(resourceProfile)) // calculate utilization first to avoid duplicated calculation in min() - .map( - slot -> - new Tuple2<>( - slot, freeSlotInfoTracker.getTaskExecutorUtilization(slot))) + .map(slot -> new Tuple2<>(slot, freeSlotTracker.getTaskExecutorUtilization(slot))) .min(Comparator.comparingDouble(tuple -> tuple.f1)) .map( slotInfoWithTaskExecutorUtilization -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTracker.java similarity index 82% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTracker.java index b685311fbd9..5800227116a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTracker.java @@ -24,8 +24,8 @@ import org.apache.flink.runtime.jobmaster.SlotInfo; import java.util.Collection; import java.util.Set; -/** Track all free slots, support bookkeeping slot for {@link SlotSelectionStrategy}. */ -public interface FreeSlotInfoTracker { +/** Track all free slots. */ +public interface FreeSlotTracker { /** * Get allocation id of all available slots. @@ -52,13 +52,13 @@ public interface FreeSlotInfoTracker { Collection<AllocatedSlotPool.FreeSlotInfo> getFreeSlotsWithIdleSinceInformation(); /** - * Returns a list of {@link SlotInfo} objects about all slots that are currently available in - * the slot pool. + * Returns a list of {@link PhysicalSlot} objects about all slots that are currently available + * in the slot pool. * - * @return a list of {@link SlotInfo} objects about all slots that are currently available in - * the slot pool. + * @return a list of {@link PhysicalSlot} objects about all slots that are currently available + * in the slot pool. */ - Collection<SlotInfo> getFreeSlotsInformation(); + Collection<PhysicalSlot> getFreeSlotsInformation(); /** * Get task executor utilization of this slot. @@ -81,6 +81,5 @@ public interface FreeSlotInfoTracker { * @param blockedSlots slots that should not be used * @return the new free slot tracker */ - FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots( - Set<AllocationID> blockedSlots); + FreeSlotTracker createNewFreeSlotTrackerWithoutBlockedSlots(Set<AllocationID> blockedSlots); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java index f811223d270..6cc51590755 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java @@ -43,11 +43,11 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel @Override public Optional<SlotInfoAndLocality> selectBestSlotForProfile( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull SlotProfile slotProfile) { + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull SlotProfile slotProfile) { Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations(); - if (freeSlotInfoTracker.getAvailableSlots().isEmpty()) { + if (freeSlotTracker.getAvailableSlots().isEmpty()) { return Optional.empty(); } @@ -55,14 +55,14 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel // if we have no location preferences, we can only filter by the additional requirements. return locationPreferences.isEmpty() - ? selectWithoutLocationPreference(freeSlotInfoTracker, resourceProfile) + ? selectWithoutLocationPreference(freeSlotTracker, resourceProfile) : selectWithLocationPreference( - freeSlotInfoTracker, locationPreferences, resourceProfile); + freeSlotTracker, locationPreferences, resourceProfile); } @Nonnull private Optional<SlotInfoAndLocality> selectWithLocationPreference( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull Collection<TaskManagerLocation> locationPreferences, @Nonnull ResourceProfile resourceProfile) { @@ -82,8 +82,8 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel Locality bestCandidateLocality = Locality.UNKNOWN; double bestCandidateScore = Double.NEGATIVE_INFINITY; - for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) { - SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId); + for (AllocationID allocationId : freeSlotTracker.getAvailableSlots()) { + SlotInfo candidate = freeSlotTracker.getSlotInfo(allocationId); if (candidate.getResourceProfile().isMatching(resourceProfile)) { @@ -101,7 +101,7 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel calculateCandidateScore( localWeigh, hostLocalWeigh, - () -> freeSlotInfoTracker.getTaskExecutorUtilization(candidate)); + () -> freeSlotTracker.getTaskExecutorUtilization(candidate)); if (candidateScore > bestCandidateScore) { bestCandidateScore = candidateScore; bestCandidate = candidate; @@ -121,8 +121,7 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel @Nonnull protected abstract Optional<SlotInfoAndLocality> selectWithoutLocationPreference( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, - @Nonnull ResourceProfile resourceProfile); + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull ResourceProfile resourceProfile); protected abstract double calculateCandidateScore( int localWeigh, int hostLocalWeigh, Supplier<Double> taskExecutorUtilizationSupplier); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java index b6c80e2138c..c2d9ce19db7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java @@ -110,18 +110,18 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider { private Map<SlotRequestId, Optional<PhysicalSlot>> tryAllocateFromAvailable( Collection<PhysicalSlotRequest> slotRequests) { - FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker(); + FreeSlotTracker freeSlotTracker = slotPool.getFreeSlotTracker(); Map<SlotRequestId, Optional<PhysicalSlot>> allocateResult = new HashMap<>(); for (PhysicalSlotRequest request : slotRequests) { Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot = slotSelectionStrategy.selectBestSlotForProfile( - freeSlotInfoTracker, request.getSlotProfile()); + freeSlotTracker, request.getSlotProfile()); allocateResult.put( request.getSlotRequestId(), slot.flatMap( slotInfoAndLocality -> { - freeSlotInfoTracker.reserveSlot( + freeSlotTracker.reserveSlot( slotInfoAndLocality.getSlotInfo().getAllocationId()); return slotPool.allocateAvailableSlot( request.getSlotRequestId(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java index 28a2a906767..5cac029fad6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java @@ -200,7 +200,7 @@ public class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBu private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) { return Stream.concat( - slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream(), + slotPool.getFreeSlotTracker().getFreeSlotsInformation().stream(), slotPool.getAllocatedSlotsInformation().stream()) .collect(Collectors.toSet()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java index 3d8f48e6eb8..41c606dac17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java @@ -48,7 +48,7 @@ public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStr @Override public Optional<SlotInfoAndLocality> selectBestSlotForProfile( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull SlotProfile slotProfile) { + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull SlotProfile slotProfile) { LOG.debug("Select best slot for profile {}.", slotProfile); @@ -56,19 +56,18 @@ public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStr // First, if there was a prior allocation try to schedule to the same/old slot if (!priorAllocations.isEmpty()) { - for (AllocationID availableSlot : freeSlotInfoTracker.getAvailableSlots()) { + for (AllocationID availableSlot : freeSlotTracker.getAvailableSlots()) { if (priorAllocations.contains(availableSlot)) { return Optional.of( SlotInfoAndLocality.of( - freeSlotInfoTracker.getSlotInfo(availableSlot), - Locality.LOCAL)); + freeSlotTracker.getSlotInfo(availableSlot), Locality.LOCAL)); } } } // Second, select based on location preference, excluding blacklisted allocations return fallbackSlotSelectionStrategy.selectBestSlotForProfile( - freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots( + freeSlotTracker.createNewFreeSlotTrackerWithoutBlockedSlots( slotProfile.getReservedAllocations()), slotProfile); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 1b5cba7fcf8..3073ab0c0e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -115,7 +115,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable { * * @return all free slot tracker */ - FreeSlotInfoTracker getFreeSlotInfoTracker(); + FreeSlotTracker getFreeSlotTracker(); /** * Returns a list of {@link SlotInfo} objects about all slots that are currently allocated in diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java index 968ec50791e..136e5f2a931 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java @@ -34,13 +34,13 @@ public interface SlotSelectionStrategy { * of available slots and considering the given {@link SlotProfile} that describes the * requirements. * - * @param freeSlotInfoTracker a list of the available slots together with their remaining - * resources to select from. + * @param freeSlotTracker a list of the available slots together with their remaining resources + * to select from. * @param slotProfile a slot profile, describing requirements for the slot selection. * @return the selected slot info with the corresponding locality hint. */ Optional<SlotInfoAndLocality> selectBestSlotForProfile( - @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull SlotProfile slotProfile); + @Nonnull FreeSlotTracker freeSlotTracker, @Nonnull SlotProfile slotProfile); /** This class is a value type that combines a {@link SlotInfo} with a {@link Locality} hint. */ final class SlotInfoAndLocality { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index fb6344b31df..2fd7b694b73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1073,7 +1073,7 @@ public class AdaptiveScheduler @Override public boolean hasDesiredResources() { final Collection<? extends SlotInfo> freeSlots = - declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(); + declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation(); return hasDesiredResources(desiredResources, freeSlots); } @@ -1111,7 +1111,7 @@ public class AdaptiveScheduler return slotAllocator .determineParallelismAndCalculateAssignment( jobInformation, - declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(), + declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation(), JobAllocationsInformation.fromGraph(previousExecutionGraph)) .orElseThrow( () -> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java deleted file mode 100644 index 9c477028a4e..00000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.SlotContext; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.Preconditions; - -/** Simple implementation of the {@link SlotContext} interface for the legacy code. */ -public class SimpleSlotContext implements SlotContext { - - private final AllocationID allocationId; - - private final TaskManagerLocation taskManagerLocation; - - private final int physicalSlotNumber; - - private final TaskManagerGateway taskManagerGateway; - - private final ResourceProfile resourceProfile; - - public SimpleSlotContext( - AllocationID allocationId, - TaskManagerLocation taskManagerLocation, - int physicalSlotNumber, - TaskManagerGateway taskManagerGateway) { - this( - allocationId, - taskManagerLocation, - physicalSlotNumber, - taskManagerGateway, - ResourceProfile.ANY); - } - - public SimpleSlotContext( - AllocationID allocationId, - TaskManagerLocation taskManagerLocation, - int physicalSlotNumber, - TaskManagerGateway taskManagerGateway, - ResourceProfile resourceProfile) { - this.allocationId = Preconditions.checkNotNull(allocationId); - this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); - this.physicalSlotNumber = physicalSlotNumber; - this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); - this.resourceProfile = resourceProfile; - } - - @Override - public AllocationID getAllocationId() { - return allocationId; - } - - @Override - public TaskManagerLocation getTaskManagerLocation() { - return taskManagerLocation; - } - - @Override - public int getPhysicalSlotNumber() { - return physicalSlotNumber; - } - - @Override - public TaskManagerGateway getTaskManagerGateway() { - return taskManagerGateway; - } - - @Override - public ResourceProfile getResourceProfile() { - return resourceProfile; - } - - @Override - public boolean willBeOccupiedIndefinitely() { - return true; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 4e724fb9cb0..7bfc1cbf54a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -66,7 +66,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker; @@ -83,8 +82,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory; -import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker; -import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTrackerTestUtils; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTracker; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTrackerTestUtils; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; @@ -103,6 +102,7 @@ import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.TestingSchedulerNG; import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory; import org.apache.flink.runtime.shuffle.DefaultPartitionWithMetrics; @@ -498,7 +498,7 @@ class JobMasterTest { private final OneShotLatch hasReceivedSlotOffers; - private final Map<ResourceID, Collection<SlotInfo>> registeredSlots; + private final Map<ResourceID, Collection<PhysicalSlot>> registeredSlots; private TestingSlotPool(JobID jobId, OneShotLatch hasReceivedSlotOffers) { this.jobId = jobId; @@ -554,7 +554,7 @@ class JobMasterTest { TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) { hasReceivedSlotOffers.trigger(); - final Collection<SlotInfo> slotInfos = + final Collection<PhysicalSlot> slotInfos = Optional.ofNullable(registeredSlots.get(taskManagerLocation.getResourceID())) .orElseThrow( () -> new FlinkRuntimeException("TaskManager not registered.")); @@ -563,11 +563,12 @@ class JobMasterTest { for (SlotOffer offer : offers) { slotInfos.add( - new SimpleSlotContext( - offer.getAllocationId(), - taskManagerLocation, - slotIndex, - taskManagerGateway)); + TestingPhysicalSlot.builder() + .withAllocationID(offer.getAllocationId()) + .withTaskManagerLocation(taskManagerLocation) + .withTaskManagerGateway(taskManagerGateway) + .withPhysicalSlotNumber(slotIndex) + .build()); slotIndex++; } @@ -582,12 +583,12 @@ class JobMasterTest { } @Override - public FreeSlotInfoTracker getFreeSlotInfoTracker() { - Map<AllocationID, SlotInfo> freeSlots = + public FreeSlotTracker getFreeSlotTracker() { + Map<AllocationID, PhysicalSlot> freeSlots = registeredSlots.values().stream() .flatMap(Collection::stream) .collect(Collectors.toMap(SlotInfo::getAllocationId, s -> s)); - return FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(freeSlots); + return FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(freeSlots); } @Override @@ -630,7 +631,7 @@ class JobMasterTest { @Override public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) { - final Collection<SlotInfo> slotInfos = + final Collection<PhysicalSlot> slotInfos = registeredSlots.getOrDefault(taskManagerId, Collections.emptyList()); final List<AllocatedSlotInfo> allocatedSlotInfos = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 7542f045f4d..bea131c6c10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -23,13 +23,13 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.slots.ResourceRequirements; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; @@ -209,9 +209,11 @@ class DeclarativeSlotPoolServiceTest { void testCreateAllocatedSlotReport() throws Exception { final LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation(); final LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation(); - final SimpleSlotContext simpleSlotContext2 = createSimpleSlotContext(taskManagerLocation2); + final TestingPhysicalSlot testingPhysicalSlot2 = + createTestingPhysicalSlot(taskManagerLocation2); final Collection<SlotInfo> slotInfos = - Arrays.asList(createSimpleSlotContext(taskManagerLocation1), simpleSlotContext2); + Arrays.asList( + createTestingPhysicalSlot(taskManagerLocation1), testingPhysicalSlot2); try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService( new TestingDeclarativeSlotPoolFactory( @@ -226,9 +228,10 @@ class DeclarativeSlotPoolServiceTest { .allMatch( context -> context.getAllocationId() - .equals(simpleSlotContext2.getAllocationId()) + .equals(testingPhysicalSlot2.getAllocationId()) && context.getSlotIndex() - == simpleSlotContext2.getPhysicalSlotNumber()); + == testingPhysicalSlot2 + .getPhysicalSlotNumber()); } } @@ -337,7 +340,7 @@ class DeclarativeSlotPoolServiceTest { slotPoolService.releaseFreeSlotsOnTaskManager( taskManagerLocation.getResourceID(), new FlinkException("Test cause")); - assertThat(slotPool.getFreeSlotInfoTracker().getAvailableSlots()).isEmpty(); + assertThat(slotPool.getFreeSlotTracker().getAvailableSlots()).isEmpty(); assertThat( Iterables.getOnlyElement(slotPool.getAllSlotsInformation()) .getAllocationId()) @@ -369,14 +372,17 @@ class DeclarativeSlotPoolServiceTest { } @Nonnull - private SimpleSlotContext createSimpleSlotContext( + private TestingPhysicalSlot createTestingPhysicalSlot( LocalTaskManagerLocation taskManagerLocation1) { - return new SimpleSlotContext( - new AllocationID(), - taskManagerLocation1, - 0, - new RpcTaskManagerGateway( - new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), - jobMasterId)); + return TestingPhysicalSlot.builder() + .withAllocationID(new AllocationID()) + .withTaskManagerLocation(taskManagerLocation1) + .withPhysicalSlotNumber(0) + .withTaskManagerGateway( + new RpcTaskManagerGateway( + new TestingTaskExecutorGatewayBuilder() + .createTestingTaskExecutorGateway(), + jobMasterId)) + .build(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java index 0832aa983b0..49ae727f65d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java @@ -187,7 +187,7 @@ class DefaultAllocatedSlotPoolTest { assertSlotPoolContainsFreeSlots(slotPool, slots); for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : - slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()) { + slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation()) { final long time; if (freeSlotInfo.getAllocationId().equals(slot.getAllocationId())) { time = releaseTime; @@ -210,7 +210,7 @@ class DefaultAllocatedSlotPoolTest { final AllocatedSlotPool.FreeSlotInfo freeSlotInfo = Iterables.getOnlyElement( - slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()); + slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation()); assertThat(freeSlotInfo.getFreeSince()).isEqualTo(0L); } @@ -223,30 +223,29 @@ class DefaultAllocatedSlotPoolTest { slotPool.addSlots(slots, 0); - FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker(); + FreeSlotTracker freeSlotTracker = slotPool.getFreeSlotTracker(); - assertThat(freeSlotInfoTracker.getAvailableSlots()) + assertThat(freeSlotTracker.getAvailableSlots()) .allSatisfy( allocationId -> assertThat( - freeSlotInfoTracker.getTaskExecutorUtilization( - freeSlotInfoTracker.getSlotInfo( - allocationId))) + freeSlotTracker.getTaskExecutorUtilization( + freeSlotTracker.getSlotInfo(allocationId))) .isCloseTo(0, offset(0.1))); int numAllocatedSlots = 0; for (AllocatedSlot slot : slots) { assertThat(slotPool.reserveFreeSlot(slot.getAllocationId())).isEqualTo(slot); - freeSlotInfoTracker.reserveSlot(slot.getAllocationId()); + freeSlotTracker.reserveSlot(slot.getAllocationId()); numAllocatedSlots++; final double utilization = (double) numAllocatedSlots / slots.size(); - assertThat(freeSlotInfoTracker.getAvailableSlots()) + assertThat(freeSlotTracker.getAvailableSlots()) .allSatisfy( allocationId -> assertThat( - freeSlotInfoTracker.getTaskExecutorUtilization( - freeSlotInfoTracker.getSlotInfo( + freeSlotTracker.getTaskExecutorUtilization( + freeSlotTracker.getSlotInfo( allocationId))) .isCloseTo(utilization, offset(0.1))); } @@ -307,7 +306,7 @@ class DefaultAllocatedSlotPoolTest { private void assertSlotPoolContainsFreeSlots( DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> allocatedSlots) { final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation = - slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation(); + slotPool.getFreeSlotTracker().getFreeSlotsWithIdleSinceInformation(); assertThat(freeSlotsInformation).hasSize(allocatedSlots.size()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index af3e9b66695..7ab4a5d5948 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -605,8 +605,8 @@ class DefaultDeclarativeSlotPoolTest extends DefaultDeclarativeSlotPoolTestBase createSlotOffersForResourceRequirements( ResourceCounter.withResource(ResourceProfile.ANY, 1))); - final SlotInfo slot = - slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next(); + final PhysicalSlot slot = + slotPool.getFreeSlotTracker().getFreeSlotsInformation().iterator().next(); slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile); assertThat( @@ -659,8 +659,8 @@ class DefaultDeclarativeSlotPoolTest extends DefaultDeclarativeSlotPoolTestBase slotPool.tryWaitSlotRequestIsDone(); - final SlotInfo largeSlot = - slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() + final PhysicalSlot largeSlot = + slotPool.getFreeSlotTracker().getFreeSlotsInformation().stream() .filter(slot -> slot.getResourceProfile().equals(largeResourceProfile)) .findFirst() .get(); @@ -722,7 +722,7 @@ class DefaultDeclarativeSlotPoolTest extends DefaultDeclarativeSlotPoolTestBase 0L); final AllocationID allocationId = - slotPool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next(); + slotPool.getFreeSlotTracker().getAvailableSlots().iterator().next(); assertThat(slotPool.getResourceRequirements()).isEmpty(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTrackerTest.java similarity index 56% rename from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTrackerTest.java index 58218e318f6..a30a8dc87be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotTrackerTest.java @@ -37,53 +37,52 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link FreeSlotInfoTracker}. */ -class DefaultFreeSlotInfoTrackerTest { +/** Tests for {@link DefaultFreeSlotTracker}. */ +class DefaultFreeSlotTrackerTest { @Test void testReserveSlot() { final ResourceID resourceId = ResourceID.generate(); - final SlotInfo slotInfo1 = createAllocatedSlot(resourceId); - final SlotInfo slotInfo2 = createAllocatedSlot(resourceId); - final Map<AllocationID, SlotInfo> slots = new HashMap<>(); + final PhysicalSlot slot1 = createAllocatedSlot(resourceId); + final PhysicalSlot slot2 = createAllocatedSlot(resourceId); + final Map<AllocationID, PhysicalSlot> slots = new HashMap<>(); - slots.put(slotInfo1.getAllocationId(), slotInfo1); - slots.put(slotInfo2.getAllocationId(), slotInfo2); + slots.put(slot1.getAllocationId(), slot1); + slots.put(slot2.getAllocationId(), slot2); - final FreeSlotInfoTracker freeSlotInfoTracker = - FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots); - for (AllocationID candidate : freeSlotInfoTracker.getAvailableSlots()) { - SlotInfo selectSlot = freeSlotInfoTracker.getSlotInfo(candidate); + final FreeSlotTracker freeSlotTracker = + FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(slots); + for (AllocationID candidate : freeSlotTracker.getAvailableSlots()) { + SlotInfo selectSlot = freeSlotTracker.getSlotInfo(candidate); assertThat(slots.get(selectSlot.getAllocationId())).isEqualTo(selectSlot); - freeSlotInfoTracker.reserveSlot(selectSlot.getAllocationId()); + freeSlotTracker.reserveSlot(selectSlot.getAllocationId()); break; } - assertThat(freeSlotInfoTracker.getAvailableSlots()) + assertThat(freeSlotTracker.getAvailableSlots()) .hasSize(1) - .containsAnyOf(slotInfo1.getAllocationId(), slotInfo2.getAllocationId()); + .containsAnyOf(slot1.getAllocationId(), slot2.getAllocationId()); } @Test - void testCreatedFreeSlotInfoTrackerWithoutBlockedSlots() { + void testCreatedFreeSlotTrackerWithoutBlockedSlots() { final ResourceID resourceId = ResourceID.generate(); - final SlotInfo slotInfo1 = createAllocatedSlot(resourceId); - final SlotInfo slotInfo2 = createAllocatedSlot(resourceId); - final Map<AllocationID, SlotInfo> slots = new HashMap<>(); + final PhysicalSlot slot1 = createAllocatedSlot(resourceId); + final PhysicalSlot slot2 = createAllocatedSlot(resourceId); + final Map<AllocationID, PhysicalSlot> slots = new HashMap<>(); - slots.put(slotInfo1.getAllocationId(), slotInfo1); - slots.put(slotInfo2.getAllocationId(), slotInfo2); + slots.put(slot1.getAllocationId(), slot1); + slots.put(slot2.getAllocationId(), slot2); - final FreeSlotInfoTracker freeSlotInfoTracker = - FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots); - assertThat(freeSlotInfoTracker.getAvailableSlots()).hasSize(2); + final FreeSlotTracker freeSlotTracker = + FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(slots); + assertThat(freeSlotTracker.getAvailableSlots()).hasSize(2); - final FreeSlotInfoTracker freeSlotInfoTrackerWithoutBlockedSlots = - freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots( + final FreeSlotTracker freeSlotTrackerWithoutBlockedSlots = + freeSlotTracker.createNewFreeSlotTrackerWithoutBlockedSlots( new HashSet<>( - Arrays.asList( - slotInfo1.getAllocationId(), slotInfo2.getAllocationId()))); - assertThat(freeSlotInfoTrackerWithoutBlockedSlots.getAvailableSlots()).isEmpty(); + Arrays.asList(slot1.getAllocationId(), slot2.getAllocationId()))); + assertThat(freeSlotTrackerWithoutBlockedSlots.getAvailableSlots()).isEmpty(); } private AllocatedSlot createAllocatedSlot(ResourceID owner) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTrackerTestUtils.java similarity index 67% rename from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTrackerTestUtils.java index e13835ba687..d2f9ab6b23f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotTrackerTestUtils.java @@ -19,24 +19,23 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.jobmaster.SlotInfo; import java.util.Map; -/** Utils to create testing {@link FreeSlotInfoTracker}. */ -public class FreeSlotInfoTrackerTestUtils { +/** Utils to create testing {@link FreeSlotTracker}. */ +public class FreeSlotTrackerTestUtils { /** - * Create default free slot info tracker for provided slots. + * Create default free slot tracker for provided slots. * * @param freeSlots slots to track - * @return default free slot info tracker + * @return default free slot tracker */ - public static DefaultFreeSlotInfoTracker createDefaultFreeSlotInfoTracker( - Map<AllocationID, SlotInfo> freeSlots) { - return new DefaultFreeSlotInfoTracker( + public static DefaultFreeSlotTracker createDefaultFreeSlotTracker( + Map<AllocationID, PhysicalSlot> freeSlots) { + return new DefaultFreeSlotTracker( freeSlots.keySet(), freeSlots::get, - id -> new TestingFreeSlotInfoTracker.TestingFreeSlotInfo(freeSlots.get(id)), + id -> new TestingFreeSlotTracker.TestingFreeSlotInfo(freeSlots.get(id)), ignored -> 0d); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java index cd79adec7a9..cab14c55f36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategyTest.java @@ -130,21 +130,21 @@ class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyT biggerResourceProfile, Collections.singletonList(tml2)); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); - assertMatchingSlotEqualsToSlotInfo(match, slotInfo2); + assertMatchingSlotEqualsToSlotInfo(match, slot2); slotProfile = SlotProfileTestingUtils.preferredLocality( resourceProfile, Arrays.asList(tmlX, tml4)); match = runMatching(slotProfile); - assertMatchingSlotEqualsToSlotInfo(match, slotInfo4); + assertMatchingSlotEqualsToSlotInfo(match, slot4); slotProfile = SlotProfileTestingUtils.preferredLocality( resourceProfile, Arrays.asList(tml3, tml1, tml3, tmlX)); match = runMatching(slotProfile); - assertMatchingSlotEqualsToSlotInfo(match, slotInfo3); + assertMatchingSlotEqualsToSlotInfo(match, slot3); } @Test @@ -164,7 +164,7 @@ class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyT Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); // available previous allocation should override blacklisting - assertMatchingSlotEqualsToSlotInfo(match, slotInfo3); + assertMatchingSlotEqualsToSlotInfo(match, slot3); } protected static void assertMatchingSlotEqualsToSlotInfo( @@ -178,7 +178,7 @@ class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyT protected static void assertMatchingSlotLocalityAndInCandidates( Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot, Locality locality, - FreeSlotInfoTracker candidates) { + FreeSlotTracker candidates) { assertThat(matchingSlot) .hasValueSatisfying( slotInfoAndLocality -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java index 034ce7abdd9..7a6ff8c008c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategyTest.java @@ -52,7 +52,7 @@ class PreviousAllocationSlotSelectionStrategyTest Collections.singleton(aid3), Collections.emptySet()); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); - assertMatchingSlotEqualsToSlotInfo(match, slotInfo3); + assertMatchingSlotEqualsToSlotInfo(match, slot3); slotProfile = SlotProfile.priorAllocation( @@ -62,7 +62,7 @@ class PreviousAllocationSlotSelectionStrategyTest new HashSet<>(Arrays.asList(aidX, aid2)), Collections.emptySet()); match = runMatching(slotProfile); - assertMatchingSlotEqualsToSlotInfo(match, slotInfo2); + assertMatchingSlotEqualsToSlotInfo(match, slot2); } @Test @@ -76,7 +76,7 @@ class PreviousAllocationSlotSelectionStrategyTest Collections.singleton(aidX), Collections.emptySet()); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); - assertMatchingSlotEqualsToSlotInfo(match, slotInfo4); + assertMatchingSlotEqualsToSlotInfo(match, slot4); } @Test @@ -115,6 +115,6 @@ class PreviousAllocationSlotSelectionStrategyTest Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); // we expect that the candidate that is not blacklisted is returned - assertMatchingSlotEqualsToSlotInfo(match, slotInfo1); + assertMatchingSlotEqualsToSlotInfo(match, slot1); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java index cdced79e907..6122fb5d224 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java @@ -21,13 +21,13 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -73,12 +73,13 @@ class SingleLogicalSlotTest { } private static SlotContext createSlotContext() { - return new SimpleSlotContext( - new AllocationID(), - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway(), - ResourceProfile.ANY); + return TestingPhysicalSlot.builder() + .withAllocationID(new AllocationID()) + .withTaskManagerLocation(new LocalTaskManagerLocation()) + .withPhysicalSlotNumber(0) + .withTaskManagerGateway(new SimpleAckingTaskManagerGateway()) + .withResourceProfile(ResourceProfile.ANY) + .build(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java index 99a0253bb89..3bfbf18ca95 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategyTestBase.java @@ -23,9 +23,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.net.InetAddress; @@ -58,27 +57,51 @@ abstract class SlotSelectionStrategyTestBase { protected final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - protected final SlotInfo slotInfo1 = - new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, resourceProfile); - protected final SlotInfo slotInfo2 = - new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, biggerResourceProfile); - protected final SlotInfo slotInfo3 = - new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile); - protected final SlotInfo slotInfo4 = - new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile); + protected final PhysicalSlot slot1 = + TestingPhysicalSlot.builder() + .withAllocationID(aid1) + .withTaskManagerLocation(tml1) + .withPhysicalSlotNumber(1) + .withTaskManagerGateway(taskManagerGateway) + .withResourceProfile(resourceProfile) + .build(); + protected final PhysicalSlot slot2 = + TestingPhysicalSlot.builder() + .withAllocationID(aid2) + .withTaskManagerLocation(tml2) + .withPhysicalSlotNumber(2) + .withTaskManagerGateway(taskManagerGateway) + .withResourceProfile(biggerResourceProfile) + .build(); + protected final PhysicalSlot slot3 = + TestingPhysicalSlot.builder() + .withAllocationID(aid3) + .withTaskManagerLocation(tml3) + .withPhysicalSlotNumber(3) + .withTaskManagerGateway(taskManagerGateway) + .withResourceProfile(resourceProfile) + .build(); + protected final PhysicalSlot slot4 = + TestingPhysicalSlot.builder() + .withAllocationID(aid4) + .withTaskManagerLocation(tml4) + .withPhysicalSlotNumber(4) + .withTaskManagerGateway(taskManagerGateway) + .withResourceProfile(resourceProfile) + .build(); - protected final FreeSlotInfoTracker candidates = createCandidates(); + protected final FreeSlotTracker candidates = createCandidates(); protected SlotSelectionStrategy selectionStrategy; - private FreeSlotInfoTracker createCandidates() { - Map<AllocationID, SlotInfo> candidates = new HashMap<>(4); + private FreeSlotTracker createCandidates() { + Map<AllocationID, PhysicalSlot> candidates = new HashMap<>(4); - candidates.put(slotInfo1.getAllocationId(), slotInfo1); - candidates.put(slotInfo2.getAllocationId(), slotInfo2); - candidates.put(slotInfo3.getAllocationId(), slotInfo3); - candidates.put(slotInfo4.getAllocationId(), slotInfo4); - return FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(candidates); + candidates.put(slot1.getAllocationId(), slot1); + candidates.put(slot2.getAllocationId(), slot2); + candidates.put(slot3.getAllocationId(), slot3); + candidates.put(slot4.getAllocationId(), slot4); + return FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker(candidates); } protected Optional<SlotSelectionStrategy.SlotInfoAndLocality> runMatching( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index a713a4b2b04..32a7f518f3c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -64,9 +64,9 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { Collection<SlotOffer>> registerSlotsFunction; - private final Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier; + private final Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier; - private final Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier; + private final Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier; private final Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier; @@ -105,8 +105,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { Long, Collection<SlotOffer>> registerSlotsFunction, - Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier, - Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier, + Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier, + Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier, Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier, BiFunction<ResourceID, Exception, ResourceCounter> releaseSlotsFunction, BiFunction<AllocationID, Exception, ResourceCounter> releaseSlotFunction, @@ -122,7 +122,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { this.offerSlotsFunction = offerSlotsFunction; this.registerSlotsFunction = registerSlotsFunction; this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; - this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier; + this.getFreeSlotTrackerSupplier = getFreeSlotTrackerSupplier; this.getAllSlotsInformationSupplier = getAllSlotsInformationSupplier; this.releaseSlotsFunction = releaseSlotsFunction; this.releaseSlotFunction = releaseSlotFunction; @@ -175,8 +175,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { } @Override - public FreeSlotInfoTracker getFreeSlotInfoTracker() { - return getFreeSlotInfoTrackerSupplier.get(); + public FreeSlotTracker getFreeSlotTracker() { + return getFreeSlotTrackerSupplier.get(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index a2c345e1852..07262a17580 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -54,11 +54,12 @@ public class TestingDeclarativeSlotPoolBuilder { Collection<SlotOffer>> offerSlotsFunction = (ignoredA, ignoredB, ignoredC, ignoredD) -> Collections.emptyList(); - private Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier = Collections::emptyList; + private Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier = + Collections::emptyList; private Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier = Collections::emptyList; - private Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier = - () -> TestingFreeSlotInfoTracker.newBuilder().build(); + private Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier = + () -> TestingFreeSlotTracker.newBuilder().build(); private BiFunction<ResourceID, Exception, ResourceCounter> releaseSlotsFunction = (ignoredA, ignoredB) -> ResourceCounter.empty(); private BiFunction<AllocationID, Exception, ResourceCounter> releaseSlotFunction = @@ -129,14 +130,14 @@ public class TestingDeclarativeSlotPoolBuilder { } public TestingDeclarativeSlotPoolBuilder setGetFreeSlotsInformationSupplier( - Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier) { + Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier) { this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; return this; } - public TestingDeclarativeSlotPoolBuilder setGetFreeSlotInfoTrackerSupplier( - Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier) { - this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier; + public TestingDeclarativeSlotPoolBuilder setGetFreeSlotTrackerSupplier( + Supplier<FreeSlotTracker> getFreeSlotTrackerSupplier) { + this.getFreeSlotTrackerSupplier = getFreeSlotTrackerSupplier; return this; } @@ -197,7 +198,7 @@ public class TestingDeclarativeSlotPoolBuilder { offerSlotsFunction, registerSlotsFunction, getFreeSlotsInformationSupplier, - getFreeSlotInfoTrackerSupplier, + getFreeSlotTrackerSupplier, getAllSlotsInformationSupplier, releaseSlotsFunction, releaseSlotFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java similarity index 76% rename from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java index d59ab1608b2..ff9b8803bbb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java @@ -28,28 +28,28 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -/** Testing implements of {@link FreeSlotInfoTracker}. */ -public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { +/** Testing implements of {@link FreeSlotTracker}. */ +public class TestingFreeSlotTracker implements FreeSlotTracker { private final Supplier<Set<AllocationID>> getAvailableSlotsSupplier; private final Function<AllocationID, SlotInfo> getSlotInfoFunction; private final Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> getFreeSlotsWithIdleSinceInformationSupplier; - private final Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier; + private final Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier; private final Function<SlotInfo, Double> getTaskExecutorUtilizationFunction; private final Consumer<AllocationID> reserveSlotConsumer; - private final Function<Set<AllocationID>, FreeSlotInfoTracker> - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction; + private final Function<Set<AllocationID>, FreeSlotTracker> + createNewFreeSlotTrackerWithoutBlockedSlotsFunction; - public TestingFreeSlotInfoTracker( + public TestingFreeSlotTracker( Supplier<Set<AllocationID>> getAvailableSlotsSupplier, Function<AllocationID, SlotInfo> getSlotInfoFunction, Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> getFreeSlotsWithIdleSinceInformationSupplier, - Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier, + Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier, Function<SlotInfo, Double> getTaskExecutorUtilizationFunction, Consumer<AllocationID> reserveSlotConsumer, - Function<Set<AllocationID>, FreeSlotInfoTracker> - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) { + Function<Set<AllocationID>, FreeSlotTracker> + createNewFreeSlotTrackerWithoutBlockedSlotsFunction) { this.getAvailableSlotsSupplier = getAvailableSlotsSupplier; this.getSlotInfoFunction = getSlotInfoFunction; this.getFreeSlotsWithIdleSinceInformationSupplier = @@ -57,8 +57,8 @@ public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; this.getTaskExecutorUtilizationFunction = getTaskExecutorUtilizationFunction; this.reserveSlotConsumer = reserveSlotConsumer; - this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction; + this.createNewFreeSlotTrackerWithoutBlockedSlotsFunction = + createNewFreeSlotTrackerWithoutBlockedSlotsFunction; } @Override @@ -77,7 +77,7 @@ public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { } @Override - public Collection<SlotInfo> getFreeSlotsInformation() { + public Collection<PhysicalSlot> getFreeSlotsInformation() { return getFreeSlotsInformationSupplier.get(); } @@ -92,27 +92,27 @@ public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { } @Override - public FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots( + public FreeSlotTracker createNewFreeSlotTrackerWithoutBlockedSlots( Set<AllocationID> blockedSlots) { - return createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction.apply(blockedSlots); + return createNewFreeSlotTrackerWithoutBlockedSlotsFunction.apply(blockedSlots); } public static Builder newBuilder() { return new Builder(); } - /** Builder of {@link TestingFreeSlotInfoTracker}. * */ + /** Builder of {@link TestingFreeSlotTracker}. * */ public static class Builder { private Supplier<Set<AllocationID>> getAvailableSlotsSupplier = Collections::emptySet; private Function<AllocationID, SlotInfo> getSlotInfoFunction = ignored -> null; private Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> getFreeSlotsWithIdleSinceInformationSupplier = Collections::emptyList; - private Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier = + private Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier = Collections::emptyList; private Function<SlotInfo, Double> getTaskExecutorUtilizationFunction = ignored -> 0d; private Consumer<AllocationID> reserveSlotConsumer = ignore -> {}; - private Function<Set<AllocationID>, FreeSlotInfoTracker> - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = ignored -> null; + private Function<Set<AllocationID>, FreeSlotTracker> + createNewFreeSlotTrackerWithoutBlockedSlotsFunction = ignored -> null; public Builder setGetAvailableSlotsSupplier( Supplier<Set<AllocationID>> getAvailableSlotsSupplier) { @@ -135,7 +135,7 @@ public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { } public Builder setGetFreeSlotsInformationSupplier( - Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier) { + Supplier<Collection<PhysicalSlot>> getFreeSlotsInformationSupplier) { this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; return this; } @@ -151,23 +151,15 @@ public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { return this; } - public Builder setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction( - Function<Set<AllocationID>, FreeSlotInfoTracker> - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) { - this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction; - return this; - } - - public TestingFreeSlotInfoTracker build() { - return new TestingFreeSlotInfoTracker( + public TestingFreeSlotTracker build() { + return new TestingFreeSlotTracker( getAvailableSlotsSupplier, getSlotInfoFunction, getFreeSlotsWithIdleSinceInformationSupplier, getFreeSlotsInformationSupplier, getTaskExecutorUtilizationFunction, reserveSlotConsumer, - createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction); + createNewFreeSlotTrackerWithoutBlockedSlotsFunction); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java index 0c9128ec1e8..bce2f90436b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlot.java @@ -21,49 +21,79 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; -/** - * {@link SimpleSlotContext} subclass implementing the {@link PhysicalSlot} interface for testing - * purposes. - */ -public class TestingPhysicalSlot extends SimpleSlotContext implements PhysicalSlot { +/** An implementing for the {@link PhysicalSlot} interface for testing purposes. */ +public class TestingPhysicalSlot implements PhysicalSlot { + + private final AllocationID allocationId; + + private final TaskManagerLocation taskManagerLocation; + + private final int physicalSlotNumber; + + private final TaskManagerGateway taskManagerGateway; + + private final ResourceProfile resourceProfile; + @Nullable private Payload payload; TestingPhysicalSlot(ResourceProfile resourceProfile, AllocationID allocationId) { this( allocationId, new LocalTaskManagerLocation(), + 0, new SimpleAckingTaskManagerGateway(), resourceProfile); } - TestingPhysicalSlot( - AllocationID allocationID, - TaskManagerLocation taskManagerLocation, - TaskManagerGateway taskManagerGateway, - ResourceProfile resourceProfile) { - this(allocationID, taskManagerLocation, 0, taskManagerGateway, resourceProfile); - } - TestingPhysicalSlot( AllocationID allocationId, TaskManagerLocation taskManagerLocation, int physicalSlotNumber, TaskManagerGateway taskManagerGateway, ResourceProfile resourceProfile) { - super( - allocationId, - taskManagerLocation, - physicalSlotNumber, - taskManagerGateway, - resourceProfile); + this.allocationId = Preconditions.checkNotNull(allocationId); + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); + this.physicalSlotNumber = physicalSlotNumber; + this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); + this.resourceProfile = resourceProfile; + } + + @Override + public AllocationID getAllocationId() { + return allocationId; + } + + @Override + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + @Override + public int getPhysicalSlotNumber() { + return physicalSlotNumber; + } + + @Override + public TaskManagerGateway getTaskManagerGateway() { + return taskManagerGateway; + } + + @Override + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + @Override + public boolean willBeOccupiedIndefinitely() { + return true; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 2d035775f63..70babe57449 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -78,7 +78,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder; -import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotInfoTracker; +import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.metrics.MetricNames; @@ -97,7 +97,7 @@ import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; -import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; @@ -302,19 +302,19 @@ public class AdaptiveSchedulerTest { void testHasEnoughResourcesReturnsTrueIfSatisfied() { final ResourceCounter resourceRequirement = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1); - final Collection<TestSlotInfo> freeSlots = - createSlotInfosForResourceRequirements(resourceRequirement); + final Collection<TestingSlot> freeSlots = + createSlotsForResourceRequirements(resourceRequirement); assertThat(AdaptiveScheduler.hasDesiredResources(resourceRequirement, freeSlots)).isTrue(); } - private Collection<TestSlotInfo> createSlotInfosForResourceRequirements( + private Collection<TestingSlot> createSlotsForResourceRequirements( ResourceCounter resourceRequirements) { - final Collection<TestSlotInfo> slotInfos = new ArrayList<>(); + final Collection<TestingSlot> slotInfos = new ArrayList<>(); for (Map.Entry<ResourceProfile, Integer> resourceProfileCount : resourceRequirements.getResourcesWithCount()) { for (int i = 0; i < resourceProfileCount.getValue(); i++) { - slotInfos.add(new TestSlotInfo(resourceProfileCount.getKey())); + slotInfos.add(new TestingSlot(resourceProfileCount.getKey())); } } @@ -330,8 +330,8 @@ public class AdaptiveSchedulerTest { ResourceCounter.withResource( ResourceProfile.newBuilder().setCpuCores(1).build(), numRequiredSlots); - final Collection<TestSlotInfo> freeSlots = - createSlotInfosForResourceRequirements(providedResources); + final Collection<TestingSlot> freeSlots = + createSlotsForResourceRequirements(providedResources); assertThat(AdaptiveScheduler.hasDesiredResources(requiredResources, freeSlots)).isTrue(); } @@ -2243,15 +2243,15 @@ public class AdaptiveSchedulerTest { TestingPhysicalSlot.builder() .withAllocationID(allocationId) .build()) - .setGetFreeSlotInfoTrackerSupplier( + .setGetFreeSlotTrackerSupplier( () -> - TestingFreeSlotInfoTracker.newBuilder() + TestingFreeSlotTracker.newBuilder() .setGetFreeSlotsInformationSupplier( () -> IntStream.range(0, parallelism) .mapToObj( v -> - new TestSlotInfo()) + new TestingSlot()) .collect( Collectors.toSet())) .build()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index bdf3396c980..a37a93ca9cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -413,9 +413,9 @@ class SlotSharingSlotAllocatorTest { 1))); List<SlotInfo> freeSlots = new ArrayList<>(); - IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestSlotInfo(new AllocationID()))); - freeSlots.add(new TestSlotInfo(allocation1)); - freeSlots.add(new TestSlotInfo(allocation2)); + IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestingSlot(new AllocationID()))); + freeSlots.add(new TestingSlot(allocation1)); + freeSlots.add(new TestingSlot(allocation2)); JobSchedulingPlan schedulingPlan = SlotSharingSlotAllocator.createSlotSharingSlotAllocator( @@ -448,7 +448,7 @@ class SlotSharingSlotAllocatorTest { private static Collection<SlotInfo> getSlots(int count) { final Collection<SlotInfo> slotInfo = new ArrayList<>(); for (int i = 0; i < count; i++) { - slotInfo.add(new TestSlotInfo()); + slotInfo.add(new TestingSlot()); } return slotInfo; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java index ef0f7fc1f3d..445eedc82ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java @@ -175,7 +175,7 @@ class StateLocalitySlotAssignerTest { return new StateLocalitySlotAssigner() .assignSlots( new TestJobInformation(singletonList(vertexInformation)), - allocationIDs.stream().map(TestSlotInfo::new).collect(Collectors.toList()), + allocationIDs.stream().map(TestingSlot::new).collect(Collectors.toList()), new VertexParallelism( singletonMap( vertexInformation.getJobVertexID(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java similarity index 73% rename from flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java index 9f3573a0705..85133a83678 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java @@ -19,29 +19,30 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -/** Test {@link SlotInfo} implementation. */ -public class TestSlotInfo implements SlotInfo { +/** An implementation of {@link PhysicalSlot}for testing. */ +public class TestingSlot implements PhysicalSlot { private final AllocationID allocationId; private final ResourceProfile resourceProfile; - public TestSlotInfo() { + public TestingSlot() { this(new AllocationID(), ResourceProfile.ANY); } - public TestSlotInfo(AllocationID allocationId) { + public TestingSlot(AllocationID allocationId) { this(allocationId, ResourceProfile.ANY); } - public TestSlotInfo(ResourceProfile resourceProfile) { + public TestingSlot(ResourceProfile resourceProfile) { this(new AllocationID(), resourceProfile); } - public TestSlotInfo(AllocationID allocationId, ResourceProfile resourceProfile) { + public TestingSlot(AllocationID allocationId, ResourceProfile resourceProfile) { this.allocationId = allocationId; this.resourceProfile = resourceProfile; } @@ -70,4 +71,14 @@ public class TestSlotInfo implements SlotInfo { public boolean willBeOccupiedIndefinitely() { return false; } + + @Override + public TaskManagerGateway getTaskManagerGateway() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryAssignPayload(Payload payload) { + throw new UnsupportedOperationException(); + } }
