This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0aa4813b9fe3db37cabe38fa47f1d1096c879dc Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Thu Nov 3 16:10:49 2022 +0800 [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener. --- .../runtime/resourcemanager/ResourceManager.java | 12 ++-- .../slotmanager/DeclarativeSlotManager.java | 25 +++---- .../slotmanager/FineGrainedSlotManager.java | 32 +++++---- ...ResourceActions.java => ResourceAllocator.java} | 16 +---- ...urceActions.java => ResourceEventListener.java} | 30 ++------ .../resourcemanager/slotmanager/SlotManager.java | 10 +-- .../slotmanager/TaskExecutorManager.java | 14 ++-- .../AbstractFineGrainedSlotManagerITCase.java | 12 ++-- .../slotmanager/DeclarativeSlotManagerBuilder.java | 40 +++++++++-- .../slotmanager/DeclarativeSlotManagerTest.java | 83 ++++++++++++---------- ...gerDefaultResourceAllocationStrategyITCase.java | 2 +- .../slotmanager/FineGrainedSlotManagerTest.java | 12 ++-- .../FineGrainedSlotManagerTestBase.java | 10 ++- .../slotmanager/TaskExecutorManagerBuilder.java | 8 +-- .../slotmanager/TaskExecutorManagerTest.java | 38 +++++----- ...eActions.java => TestingResourceAllocator.java} | 25 ++----- ...r.java => TestingResourceAllocatorBuilder.java} | 28 ++------ .../slotmanager/TestingResourceEventListener.java | 48 +++++++++++++ .../TestingResourceEventListenerBuilder.java | 41 +++++++++++ .../slotmanager/TestingSlotManager.java | 3 +- 20 files changed, 284 insertions(+), 205 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 12684c25247..3abd6b6b68d 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -56,7 +56,8 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration; import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration; -import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions; +import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator; +import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rest.messages.LogInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; @@ -266,7 +267,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> slotManager.start( getFencingToken(), getMainThreadExecutor(), - new ResourceActionsImpl(), + new ResourceAllocatorImpl(), + new ResourceEventListenerImpl(), blocklistHandler::isBlockedTaskManager); delegationTokenManager.start(this); @@ -1334,7 +1336,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } } - private class ResourceActionsImpl implements ResourceActions { + private class ResourceAllocatorImpl implements ResourceAllocator { @Override public void releaseResource(InstanceID instanceId, Exception cause) { @@ -1348,9 +1350,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> validateRunsInMainThread(); return startNewWorker(workerResourceSpec); } + } + private class ResourceEventListenerImpl implements ResourceEventListener { @Override - public void notifyNotEnoughResourcesAvailable( + public void notEnoughResourceAvailable( JobID jobId, Collection<ResourceRequirement> acquiredResources) { validateRunsInMainThread(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java index 71898ae747d..f5e210de073 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java @@ -69,7 +69,7 @@ public class DeclarativeSlotManager implements SlotManager { private final SlotTracker slotTracker; private final ResourceTracker resourceTracker; - private final BiFunction<Executor, ResourceActions, TaskExecutorManager> + private final BiFunction<Executor, ResourceAllocator, TaskExecutorManager> taskExecutorManagerFactory; @Nullable private TaskExecutorManager taskExecutorManager; @@ -97,8 +97,8 @@ public class DeclarativeSlotManager implements SlotManager { /** Executor for future callbacks which have to be "synchronized". */ @Nullable private Executor mainThreadExecutor; - /** Callbacks for resource (de-)allocations. */ - @Nullable private ResourceActions resourceActions; + /** Callbacks for resource not enough. */ + @Nullable private ResourceEventListener resourceEventListener; /** The future of the requirements delay check. */ @Nullable private CompletableFuture<Void> requirementsCheckFuture; @@ -131,7 +131,7 @@ public class DeclarativeSlotManager implements SlotManager { slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy(); taskExecutorManagerFactory = - (executor, resourceActions) -> + (executor, resourceAllocator) -> new TaskExecutorManager( slotManagerConfiguration.getDefaultWorkerResourceSpec(), slotManagerConfiguration.getNumSlotsPerWorker(), @@ -141,10 +141,10 @@ public class DeclarativeSlotManager implements SlotManager { slotManagerConfiguration.getTaskManagerTimeout(), scheduledExecutor, executor, - resourceActions); + resourceAllocator); resourceManagerId = null; - resourceActions = null; + resourceEventListener = null; mainThreadExecutor = null; taskExecutorManager = null; blockedTaskManagerChecker = null; @@ -199,22 +199,23 @@ public class DeclarativeSlotManager implements SlotManager { * * @param newResourceManagerId to use for communication with the task managers * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread - * @param newResourceActions to use for resource (de-)allocations + * @param newResourceAllocator to use for resource (de-)allocations * @param newBlockedTaskManagerChecker to query whether a task manager is blocked */ @Override public void start( ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, - ResourceActions newResourceActions, + ResourceAllocator newResourceAllocator, + ResourceEventListener newResourceEventListener, BlockedTaskManagerChecker newBlockedTaskManagerChecker) { LOG.debug("Starting the slot manager."); this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId); mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); - resourceActions = Preconditions.checkNotNull(newResourceActions); + resourceEventListener = Preconditions.checkNotNull(newResourceEventListener); taskExecutorManager = - taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceActions); + taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceAllocator); blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker); started = true; @@ -253,7 +254,7 @@ public class DeclarativeSlotManager implements SlotManager { taskExecutorManager = null; resourceManagerId = null; - resourceActions = null; + resourceEventListener = null; blockedTaskManagerChecker = null; started = false; } @@ -723,7 +724,7 @@ public class DeclarativeSlotManager implements SlotManager { "Could not fulfill resource requirements of job {}. Free slots: {}", jobId, slotTracker.getFreeSlots().size()); - resourceActions.notifyNotEnoughResourcesAvailable( + resourceEventListener.notEnoughResourceAvailable( jobId, resourceTracker.getAcquiredResources(jobId)); return pendingSlots; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java index 4833c7ad1a6..c999d9883de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java @@ -108,7 +108,10 @@ public class FineGrainedSlotManager implements SlotManager { @Nullable private Executor mainThreadExecutor; /** Callbacks for resource (de-)allocations. */ - @Nullable private ResourceActions resourceActions; + @Nullable private ResourceAllocator resourceAllocator; + + /** Callbacks for resource not enough. */ + @Nullable private ResourceEventListener resourceEventListener; @Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck; @@ -149,7 +152,8 @@ public class FineGrainedSlotManager implements SlotManager { this.maxTotalMem = Preconditions.checkNotNull(slotManagerConfiguration.getMaxTotalMem()); resourceManagerId = null; - resourceActions = null; + resourceAllocator = null; + resourceEventListener = null; mainThreadExecutor = null; taskManagerTimeoutsCheck = null; requirementsCheckFuture = null; @@ -166,7 +170,7 @@ public class FineGrainedSlotManager implements SlotManager { if (failUnfulfillableRequest && !unfulfillableJobs.isEmpty()) { for (JobID jobId : unfulfillableJobs) { - resourceActions.notifyNotEnoughResourcesAvailable( + resourceEventListener.notEnoughResourceAvailable( jobId, resourceTracker.getAcquiredResources(jobId)); } } @@ -186,20 +190,22 @@ public class FineGrainedSlotManager implements SlotManager { * * @param newResourceManagerId to use for communication with the task managers * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread - * @param newResourceActions to use for resource (de-)allocations + * @param newResourceAllocator to use for resource (de-)allocations * @param newBlockedTaskManagerChecker to query whether a task manager is blocked */ @Override public void start( ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, - ResourceActions newResourceActions, + ResourceAllocator newResourceAllocator, + ResourceEventListener newResourceEventListener, BlockedTaskManagerChecker newBlockedTaskManagerChecker) { LOG.info("Starting the slot manager."); resourceManagerId = Preconditions.checkNotNull(newResourceManagerId); mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); - resourceActions = Preconditions.checkNotNull(newResourceActions); + resourceAllocator = Preconditions.checkNotNull(newResourceAllocator); + resourceEventListener = Preconditions.checkNotNull(newResourceEventListener); slotStatusSyncer.initialize( taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor); blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker); @@ -246,7 +252,8 @@ public class FineGrainedSlotManager implements SlotManager { unfulfillableJobs.clear(); resourceManagerId = null; - resourceActions = null; + resourceAllocator = null; + resourceEventListener = null; started = false; } @@ -346,7 +353,7 @@ public class FineGrainedSlotManager implements SlotManager { taskExecutorConnection.getResourceID(), maxTotalCpu, maxTotalMem.toHumanReadableString()); - resourceActions.releaseResource( + resourceAllocator.releaseResource( taskExecutorConnection.getInstanceID(), new FlinkExpectedException( "The max total resource limitation is reached.")); @@ -571,7 +578,7 @@ public class FineGrainedSlotManager implements SlotManager { if (sendNotEnoughResourceNotifications) { for (JobID jobId : unfulfillableJobs) { LOG.warn("Could not fulfill resource requirements of job {}.", jobId); - resourceActions.notifyNotEnoughResourcesAvailable( + resourceEventListener.notEnoughResourceAvailable( jobId, resourceTracker.getAcquiredResources(jobId)); } } @@ -730,7 +737,7 @@ public class FineGrainedSlotManager implements SlotManager { private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) { final FlinkExpectedException cause = new FlinkExpectedException("TaskManager exceeded the idle timeout."); - resourceActions.releaseResource(timedOutTaskManagerId, cause); + resourceAllocator.releaseResource(timedOutTaskManagerId, cause); } private boolean allocateResource(PendingTaskManager pendingTaskManager) { @@ -743,7 +750,7 @@ public class FineGrainedSlotManager implements SlotManager { return false; } - if (!resourceActions.allocateResource( + if (!resourceAllocator.allocateResource( WorkerResourceSpec.fromTotalResourceProfile( pendingTaskManager.getTotalResourceProfile(), pendingTaskManager.getNumSlots()))) { @@ -771,7 +778,8 @@ public class FineGrainedSlotManager implements SlotManager { Preconditions.checkState(started, "The slot manager has not been started."); Preconditions.checkNotNull(resourceManagerId); Preconditions.checkNotNull(mainThreadExecutor); - Preconditions.checkNotNull(resourceActions); + Preconditions.checkNotNull(resourceAllocator); + Preconditions.checkNotNull(resourceEventListener); } private boolean isMaxTotalResourceExceededAfterAdding(ResourceProfile newResource) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java similarity index 73% copy from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java index 639f8cb53fa..179a2ad5ae1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java @@ -18,15 +18,11 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; -import org.apache.flink.runtime.slots.ResourceRequirement; - -import java.util.Collection; /** Resource related actions which the {@link SlotManager} can perform. */ -public interface ResourceActions { +public interface ResourceAllocator { /** * Releases the resource with the given instance id. @@ -43,14 +39,4 @@ public interface ResourceActions { * @return whether the resource can be allocated */ boolean allocateResource(WorkerResourceSpec workerResourceSpec); - - /** - * Notifies that not enough resources are available to fulfill the resource requirements of a - * job. - * - * @param jobId job for which not enough resources are available - * @param acquiredResources the resources that have been acquired for the job - */ - void notifyNotEnoughResourcesAvailable( - JobID jobId, Collection<ResourceRequirement> acquiredResources); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java similarity index 53% rename from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java index 639f8cb53fa..1106b378a18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java @@ -19,38 +19,16 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; import org.apache.flink.runtime.slots.ResourceRequirement; import java.util.Collection; -/** Resource related actions which the {@link SlotManager} can perform. */ -public interface ResourceActions { - - /** - * Releases the resource with the given instance id. - * - * @param instanceId identifying which resource to release - * @param cause why the resource is released - */ - void releaseResource(InstanceID instanceId, Exception cause); - - /** - * Requests to allocate a resource with the given {@link WorkerResourceSpec}. - * - * @param workerResourceSpec for the to be allocated worker - * @return whether the resource can be allocated - */ - boolean allocateResource(WorkerResourceSpec workerResourceSpec); - +/** Listener for resource events of {@link SlotManager}. */ +@FunctionalInterface +public interface ResourceEventListener { /** - * Notifies that not enough resources are available to fulfill the resource requirements of a - * job. - * * @param jobId job for which not enough resources are available * @param acquiredResources the resources that have been acquired for the job */ - void notifyNotEnoughResourcesAvailable( - JobID jobId, Collection<ResourceRequirement> acquiredResources); + void notEnoughResourceAvailable(JobID jobId, Collection<ResourceRequirement> acquiredResources); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index cf706515b02..9ea06c1478b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -40,7 +40,7 @@ import java.util.concurrent.Executor; * their allocation and all pending slot requests. Whenever a new slot is registered or an allocated * slot is freed, then it tries to fulfill another pending slot request. Whenever there are not * enough slots available the slot manager will notify the resource manager about it via {@link - * ResourceActions#allocateResource(WorkerResourceSpec)}. + * ResourceAllocator#allocateResource(WorkerResourceSpec)}. * * <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose * slots are currently not used) and pending slot requests time out triggering their release and @@ -56,7 +56,7 @@ public interface SlotManager extends AutoCloseable { int getNumberFreeSlotsOf(InstanceID instanceId); /** - * Get number of workers SlotManager requested from {@link ResourceActions} that are not yet + * Get number of workers SlotManager requested from {@link ResourceAllocator} that are not yet * fulfilled. * * @return a map whose key set is all the unique resource specs of the pending workers, and the @@ -79,13 +79,15 @@ public interface SlotManager extends AutoCloseable { * * @param newResourceManagerId to use for communication with the task managers * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread - * @param newResourceActions to use for resource (de-)allocations + * @param newResourceAllocator to use for resource (de-)allocations + * @param resourceEventListener to use for notify resource not enough * @param newBlockedTaskManagerChecker to query whether a task manager is blocked */ void start( ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, - ResourceActions newResourceActions, + ResourceAllocator newResourceAllocator, + ResourceEventListener resourceEventListener, BlockedTaskManagerChecker newBlockedTaskManagerChecker); /** Suspends the component. This clears the internal state of the slot manager. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java index 1df5a872ee4..d1515a45eea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java @@ -90,7 +90,7 @@ class TaskExecutorManager implements AutoCloseable { private final Time taskManagerTimeout; /** Callbacks for resource (de-)allocations. */ - private final ResourceActions resourceActions; + private final ResourceAllocator resourceAllocator; /** All currently registered task managers. */ private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations = @@ -111,7 +111,7 @@ class TaskExecutorManager implements AutoCloseable { Time taskManagerTimeout, ScheduledExecutor scheduledExecutor, Executor mainThreadExecutor, - ResourceActions resourceActions) { + ResourceAllocator resourceAllocator) { this.defaultWorkerResourceSpec = defaultWorkerResourceSpec; this.numSlotsPerWorker = numSlotsPerWorker; @@ -123,7 +123,7 @@ class TaskExecutorManager implements AutoCloseable { SlotManagerUtils.generateDefaultSlotResourceProfile( defaultWorkerResourceSpec, numSlotsPerWorker); - this.resourceActions = Preconditions.checkNotNull(resourceActions); + this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator); this.mainThreadExecutor = mainThreadExecutor; taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay( @@ -157,7 +157,7 @@ class TaskExecutorManager implements AutoCloseable { LOG.info( "The total number of slots exceeds the max limitation {}, releasing the excess task executor.", maxSlotNum); - resourceActions.releaseResource( + resourceAllocator.releaseResource( taskExecutorConnection.getInstanceID(), new FlinkExpectedException( "The total number of slots exceeds the max limitation.")); @@ -270,7 +270,7 @@ class TaskExecutorManager implements AutoCloseable { return Optional.empty(); } - if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) { + if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) { // resource cannot be allocated return Optional.empty(); } @@ -321,7 +321,7 @@ class TaskExecutorManager implements AutoCloseable { >= taskManagerTimeout.toMilliseconds()) { // we collect the instance ids first in order to avoid concurrent modifications // by the - // ResourceActions.releaseResource call + // ResourceAllocator.releaseResource call timedOutTaskManagers.add(taskManagerRegistration); } } @@ -406,7 +406,7 @@ class TaskExecutorManager implements AutoCloseable { LOG.debug( "Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); - resourceActions.releaseResource(timedOutTaskManagerId, cause); + resourceAllocator.releaseResource(timedOutTaskManagerId, cause); } // --------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java index fd86e99efb1..e472b718539 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java @@ -67,7 +67,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag new CompletableFuture<>(); new Context() { { - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( allocateResourceFuture::complete); runTest( () -> { @@ -224,7 +224,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag new Context() { { setBlockedTaskManagerChecker(blockedTaskManager::equals); - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( allocateResourceFuture::complete); runTest( () -> { @@ -259,7 +259,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag new Context() { { - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> { if (allocateResourceFutures.get(0).isDone()) { allocateResourceFutures.get(1).complete(null); @@ -430,7 +430,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag final SlotReport slotReport = new SlotReport(); new Context() { { - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> allocateResourceFutures.complete(null)); runTest( () -> { @@ -481,7 +481,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag final SlotReport slotReport = new SlotReport(); new Context() { { - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> allocateResourceFutures.complete(null)); runTest( () -> { @@ -528,7 +528,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag new Context() { { - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> { if (allocateResourceFutures.get(0).isDone()) { allocateResourceFutures.get(1).complete(null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java index ee0b811350b..959f5250018 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java @@ -170,30 +170,56 @@ public class DeclarativeSlotManagerBuilder { public DeclarativeSlotManager buildAndStartWithDirectExec() { return buildAndStartWithDirectExec( - ResourceManagerId.generate(), new TestingResourceActionsBuilder().build()); + ResourceManagerId.generate(), + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build()); } public DeclarativeSlotManager buildAndStartWithDirectExec( - ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { - return buildAndStart(resourceManagerId, Executors.directExecutor(), resourceManagerActions); + ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) { + return buildAndStartWithDirectExec( + resourceManagerId, + resourceAllocator, + new TestingResourceEventListenerBuilder().build()); + } + + public DeclarativeSlotManager buildAndStartWithDirectExec( + ResourceManagerId resourceManagerId, + ResourceAllocator resourceAllocator, + ResourceEventListener resourceEventListener) { + return buildAndStart( + resourceManagerId, + Executors.directExecutor(), + resourceAllocator, + resourceEventListener); } public DeclarativeSlotManager buildAndStart( ResourceManagerId resourceManagerId, Executor executor, - ResourceActions resourceManagerActions) { + ResourceAllocator resourceAllocator, + ResourceEventListener resourceEventListener) { return buildAndStart( - resourceManagerId, executor, resourceManagerActions, resourceID -> false); + resourceManagerId, + executor, + resourceAllocator, + resourceEventListener, + resourceID -> false); } public DeclarativeSlotManager buildAndStart( ResourceManagerId resourceManagerId, Executor executor, - ResourceActions resourceManagerActions, + ResourceAllocator resourceAllocator, + ResourceEventListener resourceEventListener, BlockedTaskManagerChecker blockedTaskManagerChecker) { final DeclarativeSlotManager slotManager = build(); slotManager.start( - resourceManagerId, executor, resourceManagerActions, blockedTaskManagerChecker); + resourceManagerId, + executor, + resourceAllocator, + resourceEventListener, + blockedTaskManagerChecker); return slotManager; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java index 6ae3cdeffba..684c4145cea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java @@ -194,13 +194,12 @@ class DeclarativeSlotManagerTest { final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot(); CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>(); - ResourceActions resourceManagerActions = - new TestingResourceActionsBuilder() + ResourceAllocator resourceAllocator = + new TestingResourceAllocatorBuilder() .setAllocateResourceConsumer(allocateResourceFuture::complete) .build(); - try (SlotManager slotManager = - createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) { slotManager.processResourceRequirements(resourceRequirements); @@ -218,8 +217,8 @@ class DeclarativeSlotManagerTest { final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot(); CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>(); - ResourceActions resourceManagerActions = - new TestingResourceActionsBuilder() + ResourceAllocator resourceAllocator = + new TestingResourceAllocatorBuilder() .setAllocateResourceConsumer(allocateResourceFuture::complete) .build(); @@ -230,7 +229,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( resourceManagerId, Executors.directExecutor(), - resourceManagerActions, + resourceAllocator, + new TestingResourceEventListenerBuilder().build(), blockedTaskManager::equals)) { final TaskExecutorGateway taskExecutorGateway = @@ -258,8 +258,8 @@ class DeclarativeSlotManagerTest { void testRequirementDeclarationWithResourceAllocationFailure() throws Exception { final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot(); - ResourceActions resourceManagerActions = - new TestingResourceActionsBuilder() + ResourceAllocator resourceAllocator = + new TestingResourceAllocatorBuilder() .setAllocateResourceFunction(value -> false) .build(); @@ -269,7 +269,7 @@ class DeclarativeSlotManagerTest { createDeclarativeSlotManagerBuilder() .setResourceTracker(resourceTracker) .buildAndStartWithDirectExec( - ResourceManagerId.generate(), resourceManagerActions)) { + ResourceManagerId.generate(), resourceAllocator)) { slotManager.processResourceRequirements(resourceRequirements); @@ -347,7 +347,7 @@ class DeclarativeSlotManagerTest { createDeclarativeSlotManagerBuilder() .setSlotTracker(slotTracker) .buildAndStartWithDirectExec( - resourceManagerId, new TestingResourceActionsBuilder().build())) { + resourceManagerId, new TestingResourceAllocatorBuilder().build())) { if (scenario == RequirementDeclarationScenario @@ -432,8 +432,8 @@ class DeclarativeSlotManagerTest { void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final AtomicInteger allocateResourceCalls = new AtomicInteger(0); - final ResourceActions resourceManagerActions = - new TestingResourceActionsBuilder() + final ResourceAllocator resourceAllocator = + new TestingResourceAllocatorBuilder() .setAllocateResourceConsumer( ignored -> allocateResourceCalls.incrementAndGet()) .build(); @@ -454,7 +454,7 @@ class DeclarativeSlotManagerTest { try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder() .setSlotTracker(slotTracker) - .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) { + .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator)) { slotManager.registerTaskManager( taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); @@ -549,14 +549,13 @@ class DeclarativeSlotManagerTest { @Test void testReceivingUnknownSlotReport() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); + final ResourceAllocator resourceAllocator = new TestingResourceAllocatorBuilder().build(); final InstanceID unknownInstanceID = new InstanceID(); final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0); final SlotReport unknownSlotReport = new SlotReport(createFreeSlotStatus(unknownSlotId)); - try (SlotManager slotManager = - createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) { // check that we don't have any slots registered assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(0); @@ -656,7 +655,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), mainThreadExecutor, - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { CompletableFuture.runAsync( () -> @@ -790,7 +790,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), mainThreadExecutor, - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { slotManager.registerTaskManager( taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY); @@ -1073,8 +1074,8 @@ class DeclarativeSlotManagerTest { void testRequestNewResources() throws Exception { final int numberSlots = 2; final AtomicInteger resourceRequests = new AtomicInteger(0); - final TestingResourceActions testingResourceActions = - new TestingResourceActionsBuilder() + final TestingResourceAllocator testingResourceAllocator = + new TestingResourceAllocatorBuilder() .setAllocateResourceFunction( ignored -> { resourceRequests.incrementAndGet(); @@ -1084,7 +1085,7 @@ class DeclarativeSlotManagerTest { try (final DeclarativeSlotManager slotManager = createSlotManager( - ResourceManagerId.generate(), testingResourceActions, numberSlots)) { + ResourceManagerId.generate(), testingResourceAllocator, numberSlots)) { final JobID jobId = new JobID(); @@ -1190,10 +1191,14 @@ class DeclarativeSlotManagerTest { List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications = new ArrayList<>(); - ResourceActions resourceManagerActions = - new TestingResourceActionsBuilder() + ResourceAllocator resourceAllocator = + new TestingResourceAllocatorBuilder() .setAllocateResourceFunction(ignored -> false) - .setNotEnoughResourcesConsumer( + .build(); + + ResourceEventListener resourceEventListener = + new TestingResourceEventListenerBuilder() + .setNotEnoughResourceAvailableConsumer( (jobId1, acquiredResources) -> notEnoughResourceNotifications.add( Tuple2.of(jobId1, acquiredResources))) @@ -1204,7 +1209,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), new ManuallyTriggeredScheduledExecutor(), - resourceManagerActions)) { + resourceAllocator, + resourceEventListener)) { if (withNotificationGracePeriod) { // this should disable notifications @@ -1271,7 +1277,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), executor, - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { JobID jobId = new JobID(); slotManager.processResourceRequirements(createResourceRequirements(jobId, 1)); @@ -1318,7 +1325,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), executor, - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { JobID jobId = new JobID(); slotManager.processResourceRequirements(createResourceRequirements(jobId, 1)); @@ -1372,7 +1380,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), ComponentMainThreadExecutorServiceAdapter.forMainThread(), - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { final TaskExecutorConnection taskExecutionConnection = createTaskExecutorConnection(taskExecutorGateway); @@ -1422,7 +1431,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), ComponentMainThreadExecutorServiceAdapter.forMainThread(), - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { final JobID jobId = new JobID(); @@ -1461,7 +1471,7 @@ class DeclarativeSlotManagerTest { .setRequirementCheckDelay(delay) .buildAndStartWithDirectExec( ResourceManagerId.generate(), - new TestingResourceActionsBuilder() + new TestingResourceAllocatorBuilder() .setAllocateResourceConsumer( workerResourceSpec -> allocatedResourceCounter.getAndIncrement()) @@ -1502,7 +1512,8 @@ class DeclarativeSlotManagerTest { .buildAndStart( ResourceManagerId.generate(), ComponentMainThreadExecutorServiceAdapter.forMainThread(), - new TestingResourceActionsBuilder().build())) { + new TestingResourceAllocatorBuilder().build(), + new TestingResourceEventListenerBuilder().build())) { final JobID jobId = new JobID(); @@ -1585,19 +1596,19 @@ class DeclarativeSlotManagerTest { } private DeclarativeSlotManager createSlotManager( - ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { - return createSlotManager(resourceManagerId, resourceManagerActions, 1); + ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) { + return createSlotManager(resourceManagerId, resourceAllocator, 1); } private DeclarativeSlotManager createSlotManager( ResourceManagerId resourceManagerId, - ResourceActions resourceManagerActions, + ResourceAllocator resourceAllocator, int numSlotsPerWorker) { return createDeclarativeSlotManagerBuilder( new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor())) .setNumSlotsPerWorker(numSlotsPerWorker) .setRedundantTaskManagerNum(0) - .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions); + .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator); } private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java index d68320761df..73e0355bd31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java @@ -52,7 +52,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase new Context() { { - resourceActionsBuilder.setAllocateResourceFunction( + resourceAllocatorBuilder.setAllocateResourceFunction( ignored -> { resourceRequests.incrementAndGet(); return true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java index bfc438c4316..0c85473d13a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java @@ -378,7 +378,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase { allocateResourceFutures.add(new CompletableFuture<>()); new Context() { { - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> { if (allocateResourceFutures.get(0).isDone()) { allocateResourceFutures.get(1).complete(null); @@ -445,7 +445,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase { pendingTaskManager.getPendingTaskManagerId(), DEFAULT_SLOT_RESOURCE_PROFILE) .build())); - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> requestResourceFuture.complete(null)); runTest( () -> { @@ -496,7 +496,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase { final CompletableFuture<Void> notifyNotEnoughResourceFuture = new CompletableFuture<>(); new Context() { { - resourceActionsBuilder.setNotEnoughResourcesConsumer( + resourceEventListenerBuilder.setNotEnoughResourceAvailableConsumer( (jobId1, acquiredResources) -> { notEnoughResourceNotifications.add( Tuple2.of(jobId1, acquiredResources)); @@ -638,7 +638,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase { final InstanceID instanceId = taskExecutionConnection.getInstanceID(); new Context() { { - resourceActionsBuilder.setReleaseResourceConsumer( + resourceAllocatorBuilder.setReleaseResourceConsumer( (instanceID, e) -> releaseResourceFuture.complete(instanceID)); slotManagerConfigurationBuilder.setTaskManagerTimeout(taskManagerTimeout); runTest( @@ -827,7 +827,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase { { maxTotalResourceSetter.accept(slotManagerConfigurationBuilder); - resourceActionsBuilder.setAllocateResourceConsumer( + resourceAllocatorBuilder.setAllocateResourceConsumer( ignored -> { if (allocateResourceFutures.get(0).isDone()) { allocateResourceFutures.get(1).complete(null); @@ -876,7 +876,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase { { maxTotalResourceSetter.accept(slotManagerConfigurationBuilder); - resourceActionsBuilder.setReleaseResourceConsumer( + resourceAllocatorBuilder.setReleaseResourceConsumer( (instanceId, ignore) -> releaseResourceFuture.complete(instanceId)); runTest( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java index 049ef97bc0e..f2e8e4f9d76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java @@ -157,8 +157,11 @@ abstract class FineGrainedSlotManagerTestBase { final TestingResourceAllocationStrategy.Builder resourceAllocationStrategyBuilder = TestingResourceAllocationStrategy.newBuilder(); - final TestingResourceActionsBuilder resourceActionsBuilder = - new TestingResourceActionsBuilder(); + final TestingResourceAllocatorBuilder resourceAllocatorBuilder = + new TestingResourceAllocatorBuilder(); + + final TestingResourceEventListenerBuilder resourceEventListenerBuilder = + new TestingResourceEventListenerBuilder(); final SlotManagerConfigurationBuilder slotManagerConfigurationBuilder = SlotManagerConfigurationBuilder.newBuilder(); @@ -221,7 +224,8 @@ abstract class FineGrainedSlotManagerTestBase { slotManager.start( resourceManagerId, mainThreadExecutor, - resourceActionsBuilder.build(), + resourceAllocatorBuilder.build(), + resourceEventListenerBuilder.build(), blockedTaskManagerChecker)); testMethod.run(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java index 157e894aea1..88e37e1ba16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java @@ -35,7 +35,7 @@ public class TaskExecutorManagerBuilder { private Time taskManagerTimeout = Time.seconds(5); private final ScheduledExecutor scheduledExecutor; private Executor mainThreadExecutor = Executors.directExecutor(); - private ResourceActions newResourceActions = new TestingResourceActionsBuilder().build(); + private ResourceAllocator newResourceAllocator = new TestingResourceAllocatorBuilder().build(); public TaskExecutorManagerBuilder(ScheduledExecutor scheduledExecutor) { this.scheduledExecutor = scheduledExecutor; @@ -78,8 +78,8 @@ public class TaskExecutorManagerBuilder { return this; } - public TaskExecutorManagerBuilder setResourceActions(ResourceActions newResourceActions) { - this.newResourceActions = newResourceActions; + public TaskExecutorManagerBuilder setResourceAllocator(ResourceAllocator newResourceAllocator) { + this.newResourceAllocator = newResourceAllocator; return this; } @@ -93,6 +93,6 @@ public class TaskExecutorManagerBuilder { taskManagerTimeout, scheduledExecutor, mainThreadExecutor, - newResourceActions); + newResourceAllocator); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java index 28593e31f76..ed01b63b4c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java @@ -133,7 +133,7 @@ public class TaskExecutorManagerTest extends TestLogger { /** * Tests that a task manager timeout does not remove the slots from the SlotManager. A timeout - * should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)} + * should only trigger the {@link ResourceAllocator#releaseResource(InstanceID, Exception)} * callback. The receiver of the callback can then decide what to do with the TaskManager. * * <p>See FLINK-7793 @@ -143,8 +143,8 @@ public class TaskExecutorManagerTest extends TestLogger { final Time taskManagerTimeout = Time.milliseconds(10L); final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>(); - final ResourceActions resourceActions = - createResourceActionsBuilder() + final ResourceAllocator resourceAllocator = + createResourceAllocatorBuilder() .setReleaseResourceConsumer( (instanceId, ignored) -> releaseResourceFuture.complete(instanceId)) .build(); @@ -154,7 +154,7 @@ public class TaskExecutorManagerTest extends TestLogger { try (final TaskExecutorManager taskExecutorManager = createTaskExecutorManagerBuilder() .setTaskManagerTimeout(taskManagerTimeout) - .setResourceActions(resourceActions) + .setResourceAllocator(resourceAllocator) .setMainThreadExecutor(mainThreadExecutor) .createTaskExecutorManager()) { @@ -195,8 +195,8 @@ public class TaskExecutorManagerTest extends TestLogger { final Time taskManagerTimeout = Time.milliseconds(50L); final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>(); - final ResourceActions resourceManagerActions = - new TestingResourceActionsBuilder() + final ResourceAllocator resourceAllocator = + new TestingResourceAllocatorBuilder() .setReleaseResourceConsumer( (instanceID, e) -> releaseResourceFuture.complete(instanceID)) .build(); @@ -207,7 +207,7 @@ public class TaskExecutorManagerTest extends TestLogger { createTaskExecutorManagerBuilder() .setTaskManagerTimeout(taskManagerTimeout) .setDefaultWorkerResourceSpec(workerResourceSpec) - .setResourceActions(resourceManagerActions) + .setResourceAllocator(resourceAllocator) .setMainThreadExecutor(mainThreadExecutor) .createTaskExecutorManager()) { @@ -248,8 +248,8 @@ public class TaskExecutorManagerTest extends TestLogger { ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker + 1).build(); final AtomicInteger resourceRequests = new AtomicInteger(0); - ResourceActions resourceActions = - createResourceActionsBuilder() + ResourceAllocator resourceAllocator = + createResourceAllocatorBuilder() .setAllocateResourceFunction( ignored -> { resourceRequests.incrementAndGet(); @@ -262,7 +262,7 @@ public class TaskExecutorManagerTest extends TestLogger { .setDefaultWorkerResourceSpec(workerResourceSpec) .setNumSlotsPerWorker(1) .setMaxNumSlots(1) - .setResourceActions(resourceActions) + .setResourceAllocator(resourceAllocator) .createTaskExecutorManager()) { assertThat( @@ -281,8 +281,8 @@ public class TaskExecutorManagerTest extends TestLogger { final int maxSlotNum = 1; final AtomicInteger resourceRequests = new AtomicInteger(0); - ResourceActions resourceActions = - createResourceActionsBuilder() + ResourceAllocator resourceAllocator = + createResourceAllocatorBuilder() .setAllocateResourceFunction( ignored -> { resourceRequests.incrementAndGet(); @@ -294,7 +294,7 @@ public class TaskExecutorManagerTest extends TestLogger { createTaskExecutorManagerBuilder() .setNumSlotsPerWorker(numberSlots) .setMaxNumSlots(maxSlotNum) - .setResourceActions(resourceActions) + .setResourceAllocator(resourceAllocator) .createTaskExecutorManager()) { assertThat(resourceRequests.get(), is(0)); @@ -317,8 +317,8 @@ public class TaskExecutorManagerTest extends TestLogger { final int maxSlotNum = 1; final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>(); - ResourceActions resourceActions = - createResourceActionsBuilder() + ResourceAllocator resourceAllocator = + createResourceAllocatorBuilder() .setReleaseResourceConsumer( (instanceID, e) -> releasedResourceFuture.complete(instanceID)) .build(); @@ -327,7 +327,7 @@ public class TaskExecutorManagerTest extends TestLogger { createTaskExecutorManagerBuilder() .setNumSlotsPerWorker(numberSlots) .setMaxNumSlots(maxSlotNum) - .setResourceActions(resourceActions) + .setResourceAllocator(resourceAllocator) .createTaskExecutorManager()) { createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY); @@ -376,11 +376,11 @@ public class TaskExecutorManagerTest extends TestLogger { private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() { return new TaskExecutorManagerBuilder( new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor())) - .setResourceActions(createResourceActionsBuilder().build()); + .setResourceAllocator(createResourceAllocatorBuilder().build()); } - private static TestingResourceActionsBuilder createResourceActionsBuilder() { - return new TestingResourceActionsBuilder() + private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() { + return new TestingResourceAllocatorBuilder() // ensures we do something when excess resource are requested .setAllocateResourceFunction(ignored -> true); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java similarity index 67% rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java index a302b2001e7..83207ce41cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java @@ -18,37 +18,26 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; -import org.apache.flink.runtime.slots.ResourceRequirement; import javax.annotation.Nonnull; -import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Function; -/** Testing implementation of the {@link ResourceActions}. */ -public class TestingResourceActions implements ResourceActions { +/** Testing implementation of the {@link ResourceAllocator}. */ +public class TestingResourceAllocator implements ResourceAllocator { @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer; @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction; - @Nonnull - private final BiConsumer<JobID, Collection<ResourceRequirement>> - notifyNotEnoughResourcesConsumer; - - public TestingResourceActions( + public TestingResourceAllocator( @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer, - @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction, - @Nonnull - BiConsumer<JobID, Collection<ResourceRequirement>> - notifyNotEnoughResourcesConsumer) { + @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) { this.releaseResourceConsumer = releaseResourceConsumer; this.allocateResourceFunction = allocateResourceFunction; - this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer; } @Override @@ -60,10 +49,4 @@ public class TestingResourceActions implements ResourceActions { public boolean allocateResource(WorkerResourceSpec workerResourceSpec) { return allocateResourceFunction.apply(workerResourceSpec); } - - @Override - public void notifyNotEnoughResourcesAvailable( - JobID jobId, Collection<ResourceRequirement> acquiredResources) { - notifyNotEnoughResourcesConsumer.accept(jobId, acquiredResources); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java similarity index 64% rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java index a2c1a07c30a..8dc5abd2a81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java @@ -18,36 +18,31 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; -import org.apache.flink.runtime.slots.ResourceRequirement; -import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; -/** Builder for the {@link TestingResourceActions}. */ -public class TestingResourceActionsBuilder { +/** Builder for the {@link TestingResourceAllocator}. */ +public class TestingResourceAllocatorBuilder { private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {}; private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true; - private BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer = - (ignoredA, ignoredB) -> {}; - public TestingResourceActionsBuilder setReleaseResourceConsumer( + public TestingResourceAllocatorBuilder setReleaseResourceConsumer( BiConsumer<InstanceID, Exception> releaseResourceConsumer) { this.releaseResourceConsumer = releaseResourceConsumer; return this; } - public TestingResourceActionsBuilder setAllocateResourceFunction( + public TestingResourceAllocatorBuilder setAllocateResourceFunction( Function<WorkerResourceSpec, Boolean> allocateResourceFunction) { this.allocateResourceFunction = allocateResourceFunction; return this; } - public TestingResourceActionsBuilder setAllocateResourceConsumer( + public TestingResourceAllocatorBuilder setAllocateResourceConsumer( Consumer<WorkerResourceSpec> allocateResourceConsumer) { this.allocateResourceFunction = workerRequest -> { @@ -57,16 +52,7 @@ public class TestingResourceActionsBuilder { return this; } - public TestingResourceActionsBuilder setNotEnoughResourcesConsumer( - BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer) { - this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer; - return this; - } - - public TestingResourceActions build() { - return new TestingResourceActions( - releaseResourceConsumer, - allocateResourceFunction, - notifyNotEnoughResourcesConsumer); + public TestingResourceAllocator build() { + return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java new file mode 100644 index 00000000000..bff5f30f04a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.slots.ResourceRequirement; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.function.BiConsumer; + +/** Testing implementation of the {@link ResourceEventListener}. */ +public class TestingResourceEventListener implements ResourceEventListener { + + @Nonnull + private final BiConsumer<JobID, Collection<ResourceRequirement>> + notEnoughResourceAvailableConsumer; + + public TestingResourceEventListener( + @Nonnull + BiConsumer<JobID, Collection<ResourceRequirement>> + notEnoughResourceAvailableConsumer) { + this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer; + } + + @Override + public void notEnoughResourceAvailable( + JobID jobId, Collection<ResourceRequirement> acquiredResources) { + notEnoughResourceAvailableConsumer.accept(jobId, acquiredResources); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java new file mode 100644 index 00000000000..5e2e8d884ec --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.slots.ResourceRequirement; + +import java.util.Collection; +import java.util.function.BiConsumer; + +/** Builder for the {@link TestingResourceEventListener}. */ +public class TestingResourceEventListenerBuilder { + private BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer = + (ignoredA, ignoredB) -> {}; + + public TestingResourceEventListenerBuilder setNotEnoughResourceAvailableConsumer( + BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer) { + this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer; + return this; + } + + public TestingResourceEventListener build() { + return new TestingResourceEventListener(notEnoughResourceAvailableConsumer); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java index 4494320d488..bb740f8fb59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java @@ -114,7 +114,8 @@ public class TestingSlotManager implements SlotManager { public void start( ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, - ResourceActions newResourceActions, + ResourceAllocator newResourceAllocator, + ResourceEventListener resourceEventListener, BlockedTaskManagerChecker newBlockedTaskManagerChecker) {} @Override