[FLINK-5810] [flip-6] Use single timeout task for SlotManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d75ec5b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d75ec5b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d75ec5b3 Branch: refs/heads/master Commit: d75ec5b3551573d4eb1886c8e75dfdf6dc328da1 Parents: d16a5a2 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Apr 27 17:46:29 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Apr 28 15:28:26 2017 +0200 ---------------------------------------------------------------------- .../slotmanager/PendingSlotRequest.java | 35 +-- .../slotmanager/SlotManager.java | 219 ++++++++------ .../slotmanager/TaskManagerRegistration.java | 38 +-- .../clusterframework/ResourceManagerTest.java | 8 +- .../slotmanager/SlotManagerTest.java | 291 +++++++++---------- .../slotmanager/SlotProtocolTest.java | 2 - .../src/test/resources/log4j-test.properties | 2 +- .../runtime/testingUtils/TestingUtils.scala | 4 +- 8 files changed, 295 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java index 1195791..ffe1bfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java @@ -27,8 +27,6 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.util.Preconditions; import 
javax.annotation.Nullable; -import java.util.UUID; -import java.util.concurrent.ScheduledFuture; public class PendingSlotRequest { @@ -37,14 +35,12 @@ public class PendingSlotRequest { @Nullable private CompletableFuture<Acknowledge> requestFuture; - @Nullable - private UUID timeoutIdentifier; - - @Nullable - private ScheduledFuture<?> timeoutFuture; + /** Timestamp when this pending slot request has been created. */ + private final long creationTimestamp; public PendingSlotRequest(SlotRequest slotRequest) { this.slotRequest = Preconditions.checkNotNull(slotRequest); + creationTimestamp = System.currentTimeMillis(); } // ------------------------------------------------------------------------ @@ -57,11 +53,6 @@ public class PendingSlotRequest { return slotRequest.getResourceProfile(); } - @Nullable - public UUID getTimeoutIdentifier() { - return timeoutIdentifier; - } - public JobID getJobId() { return slotRequest.getJobId(); } @@ -70,6 +61,10 @@ public class PendingSlotRequest { return slotRequest.getTargetAddress(); } + public long getCreationTimestamp() { + return creationTimestamp; + } + public boolean isAssigned() { return null != requestFuture; } @@ -82,20 +77,4 @@ public class PendingSlotRequest { public CompletableFuture<Acknowledge> getRequestFuture() { return requestFuture; } - - public void cancelTimeout() { - if (timeoutFuture != null) { - timeoutFuture.cancel(true); - - timeoutIdentifier = null; - timeoutFuture = null; - } - } - - public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) { - cancelTimeout(); - - timeoutFuture = newTimeoutFuture; - timeoutIdentifier = newTimeoutIdentifier; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index f09b73a..829a06d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -64,11 +64,10 @@ import java.util.concurrent.TimeoutException; * {@link ResourceManagerActions#allocateResource(ResourceProfile)}. * * In order to free resources and avoid resource leaks, idling task managers (task managers whose - * slots are currently not used) and not fulfilled pending slot requests time out triggering their - * release and failure, respectively. + * slots are currently not used) and pending slot requests time out triggering their release and + * failure, respectively. */ public class SlotManager implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); /** Scheduled executor for timeouts */ @@ -107,6 +106,10 @@ public class SlotManager implements AutoCloseable { /** Callbacks for resource (de-)allocations */ private ResourceManagerActions resourceManagerActions; + private ScheduledFuture<?> taskManagerTimeoutCheck; + + private ScheduledFuture<?> slotRequestTimeoutCheck; + /** True iff the component has been started */ private boolean started; @@ -128,6 +131,10 @@ public class SlotManager implements AutoCloseable { leaderId = null; resourceManagerActions = null; + mainThreadExecutor = null; + taskManagerTimeoutCheck = null; + slotRequestTimeoutCheck = null; + started = false; } @@ -142,17 +149,52 @@ public class SlotManager implements AutoCloseable { * @param newResourceManagerActions to use for resource (de-)allocations */ public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { + LOG.info("Starting 
the SlotManager."); + leaderId = Preconditions.checkNotNull(newLeaderId); mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions); started = true; + + taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + checkTaskManagerTimeouts(); + } + }); + } + }, 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + checkSlotRequestTimeouts(); + } + }); + } + }, 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); } /** * Suspends the component. This clears the internal state of the slot manager. */ public void suspend() { + LOG.info("Suspending the SlotManager."); + + // stop the timeout checks for the TaskManagers and the SlotRequests + taskManagerTimeoutCheck.cancel(false); + slotRequestTimeoutCheck.cancel(false); + + taskManagerTimeoutCheck = null; + slotRequestTimeoutCheck = null; + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { cancelPendingSlotRequest(pendingSlotRequest); } @@ -177,6 +219,8 @@ public class SlotManager implements AutoCloseable { */ @Override public void close() throws Exception { + LOG.info("Closing the SlotManager."); + suspend(); } @@ -249,6 +293,8 @@ public class SlotManager implements AutoCloseable { public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { checkInit(); + LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID()); + // we identify task managers by their instance id if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { 
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); @@ -272,8 +318,13 @@ public class SlotManager implements AutoCloseable { taskExecutorConnection); } - if (!anySlotUsed(taskManagerRegistration.getSlots())) { - registerTaskManagerTimeout(taskManagerRegistration); + // determine if the task manager is idle or not + boolean idle = !anySlotUsed(taskManagerRegistration.getSlots()); + + if (idle) { + taskManagerRegistration.markIdle(); + } else { + taskManagerRegistration.markUsed(); } } @@ -292,9 +343,7 @@ public class SlotManager implements AutoCloseable { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); if (null != taskManagerRegistration) { - removeSlots(taskManagerRegistration.getSlots()); - - taskManagerRegistration.cancelTimeout(); + internalUnregisterTaskManager(taskManagerRegistration); return true; } else { @@ -334,8 +383,11 @@ public class SlotManager implements AutoCloseable { } if (idle) { - // no slot of this task manager is being used --> register timer to free this resource - registerTaskManagerTimeout(taskManagerRegistration); + // no slot of this task manager is being used --> mark this task manager to be idle which allows it to + // time out + taskManagerRegistration.markIdle(); + } else { + taskManagerRegistration.markUsed(); } return true; @@ -371,9 +423,14 @@ public class SlotManager implements AutoCloseable { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); - if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) { - registerTaskManagerTimeout(taskManagerRegistration); + if (null != taskManagerRegistration) { + if (anySlotUsed(taskManagerRegistration.getSlots())) { + taskManagerRegistration.markUsed(); + } else { + taskManagerRegistration.markIdle(); + } } + } else { LOG.debug("Received request to free slot {} with expected allocation id {}, " + "but actual allocation id {} differs. 
Ignoring the request.", slotId, allocationId, slot.getAllocationId()); @@ -524,8 +581,8 @@ public class SlotManager implements AutoCloseable { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); if (null != taskManagerRegistration) { - // disable any registered time out for the task manager - taskManagerRegistration.cancelTimeout(); + // mark this TaskManager to be used to exempt it from timing out + taskManagerRegistration.markUsed(); } } @@ -551,24 +608,6 @@ public class SlotManager implements AutoCloseable { if (taskManagerSlot != null) { allocateSlot(taskManagerSlot, pendingSlotRequest); } else { - final UUID timeoutIdentifier = UUID.randomUUID(); - final AllocationID allocationId = pendingSlotRequest.getAllocationId(); - - // register timeout for slot request - ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() { - @Override - public void run() { - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - timeoutSlotRequest(allocationId, timeoutIdentifier); - } - }); - } - }, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - - pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier); - resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile()); } } @@ -591,6 +630,16 @@ public class SlotManager implements AutoCloseable { taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest); pendingSlotRequest.setRequestFuture(completableFuture); + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId()); + + if (taskManagerRegistration != null) { + // mark the task manager to be used since we have a pending slot request assigned to one of its slots + taskManagerRegistration.markUsed(); + } else { + throw new IllegalStateException("Could not find a registered task manager for instance id " + + taskManagerSlot.getInstanceId() + '.'); + } + // RPC call to the task manager 
Future<Acknowledge> requestFuture = gateway.requestSlot( slotId, @@ -717,7 +766,7 @@ public class SlotManager implements AutoCloseable { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId()); if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) { - registerTaskManagerTimeout(taskManagerRegistration); + taskManagerRegistration.markIdle(); } } else { LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId); @@ -778,8 +827,6 @@ public class SlotManager implements AutoCloseable { * @param pendingSlotRequest to cancel */ private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { - pendingSlotRequest.cancelTimeout(); - CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture(); if (null != request) { @@ -791,54 +838,50 @@ public class SlotManager implements AutoCloseable { // Internal timeout methods // --------------------------------------------------------------------------------------------- - private void timeoutTaskManager(InstanceID instanceId, UUID timeoutIdentifier) { - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); + private void checkTaskManagerTimeouts() { + if (!taskManagerRegistrations.isEmpty()) { + long currentTime = System.currentTimeMillis(); + + Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator = taskManagerRegistrations.entrySet().iterator(); + + while (taskManagerRegistrationIterator.hasNext()) { + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrationIterator.next().getValue(); - if (null != taskManagerRegistration) { - if (Objects.equals(timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier())) { if (anySlotUsed(taskManagerRegistration.getSlots())) { - LOG.debug("Cannot release the task manager with instance id {}, because some " + - "of its slots are still 
being used.", instanceId); - } else { - unregisterTaskManager(instanceId); + taskManagerRegistration.markUsed(); + } else if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) { + taskManagerRegistrationIterator.remove(); - resourceManagerActions.releaseResource(instanceId); - } - } else { - taskManagerRegistrations.put(instanceId, taskManagerRegistration); + internalUnregisterTaskManager(taskManagerRegistration); - LOG.debug("Expected timeout identifier {} differs from the task manager's " + - "timeout identifier {}. Ignoring the task manager timeout call.", - timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier()); + resourceManagerActions.releaseResource(taskManagerRegistration.getInstanceId()); + } } - } else { - LOG.debug("Could not find a registered task manager with instance id {}. Ignoring the task manager timeout call.", instanceId); } } - private void timeoutSlotRequest(AllocationID allocationId, UUID timeoutIdentifier) { - PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); + private void checkSlotRequestTimeouts() { + if (!pendingSlotRequests.isEmpty()) { + long currentTime = System.currentTimeMillis(); - if (null != pendingSlotRequest) { - if (Objects.equals(timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier())) { - if (!pendingSlotRequest.isAssigned()) { + Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator(); + + while (slotRequestIterator.hasNext()) { + PendingSlotRequest slotRequest = slotRequestIterator.next().getValue(); + + if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) { + slotRequestIterator.remove(); + + if (slotRequest.isAssigned()) { + cancelPendingSlotRequest(slotRequest); + } resourceManagerActions.notifyAllocationFailure( - pendingSlotRequest.getJobId(), - allocationId, + slotRequest.getJobId(), + slotRequest.getAllocationId(), new 
TimeoutException("The allocation could not be fulfilled in time.")); - } else { - LOG.debug("Cannot fail pending slot request {} because it has been assigned.", allocationId); } - } else { - pendingSlotRequests.put(allocationId, pendingSlotRequest); - - LOG.debug("Expected timeout identifier {} differs from the pending slot request's " + - "timeout identifier {}. Ignoring the slot request timeout call.", - timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier()); } - } else { - LOG.debug("Could not find pending slot request with allocation id {}. Ignoring the slot request timeout call.", allocationId); } } @@ -846,6 +889,12 @@ public class SlotManager implements AutoCloseable { // Internal utility methods // --------------------------------------------------------------------------------------------- + private void internalUnregisterTaskManager(TaskManagerRegistration taskManagerRegistration) { + Preconditions.checkNotNull(taskManagerRegistration); + + removeSlots(taskManagerRegistration.getSlots()); + } + private boolean checkDuplicateRequest(AllocationID allocationId) { return pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId); } @@ -853,38 +902,18 @@ public class SlotManager implements AutoCloseable { private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) { if (null != slotsToCheck) { - boolean idle = true; - for (SlotID slotId : slotsToCheck) { TaskManagerSlot taskManagerSlot = slots.get(slotId); if (null != taskManagerSlot) { - idle &= taskManagerSlot.isFree(); + if (taskManagerSlot.isAllocated()) { + return true; + } } } - - return !idle; - } else { - return false; } - } - - private void registerTaskManagerTimeout(final TaskManagerRegistration taskManagerRegistration) { - final UUID timeoutIdentifier = UUID.randomUUID(); - - ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() { - @Override - public void run() { - mainThreadExecutor.execute(new Runnable() { - @Override - public 
void run() { - timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier); - } - }); - } - }, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - taskManagerRegistration.registerTimeout(timeoutFuture, timeoutIdentifier); + return false; } private void checkInit() { @@ -911,11 +940,11 @@ public class SlotManager implements AutoCloseable { } @VisibleForTesting - boolean hasTimeoutRegistered(InstanceID instanceId) { + boolean isTaskManagerIdle(InstanceID instanceId) { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); if (null != taskManagerRegistration) { - return taskManagerRegistration.getTimeoutIdentifier() != null; + return taskManagerRegistration.isIdle(); } else { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java index 3a15cb3..7d3764c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java @@ -25,8 +25,6 @@ import org.apache.flink.util.Preconditions; import java.util.Collection; import java.util.HashSet; -import java.util.UUID; -import java.util.concurrent.ScheduledFuture; public class TaskManagerRegistration { @@ -34,9 +32,8 @@ public class TaskManagerRegistration { private final HashSet<SlotID> slots; - private UUID timeoutIdentifier; - - private ScheduledFuture<?> timeoutFuture; + /** Timestamp when the last time becoming idle. Otherwise Long.MAX_VALUE. 
*/ + private long idleSince; public TaskManagerRegistration( TaskExecutorConnection taskManagerConnection, @@ -47,8 +44,7 @@ public class TaskManagerRegistration { this.slots = new HashSet<>(slots); - timeoutIdentifier = null; - timeoutFuture = null; + idleSince = Long.MAX_VALUE; } public TaskExecutorConnection getTaskManagerConnection() { @@ -59,31 +55,27 @@ public class TaskManagerRegistration { return taskManagerConnection.getInstanceID(); } - public UUID getTimeoutIdentifier() { - return timeoutIdentifier; - } - public Iterable<SlotID> getSlots() { return slots; } - public boolean containsSlot(SlotID slotId) { - return slots.contains(slotId); + public long getIdleSince() { + return idleSince; } - public void cancelTimeout() { - if (null != timeoutFuture) { - timeoutFuture.cancel(false); + public boolean isIdle() { + return idleSince != Long.MAX_VALUE; + } - timeoutFuture = null; - timeoutIdentifier = null; - } + public void markIdle() { + idleSince = System.currentTimeMillis(); } - public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) { - cancelTimeout(); + public void markUsed() { + idleSince = Long.MAX_VALUE; + } - timeoutFuture = newTimeoutFuture; - timeoutIdentifier = newTimeoutIdentifier; + public boolean containsSlot(SlotID slotId) { + return slots.contains(slotId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 41c2e16..c740518 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -488,13 +488,17 @@ public class ResourceManagerTest extends TestLogger { final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); - final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); final MetricRegistry metricRegistry = mock(MetricRegistry.class); final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), Time.minutes(5L)); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final SlotManager slotManager = new SlotManager( + TestingUtils.defaultScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime()); try { final StandaloneResourceManager resourceManager = new StandaloneResourceManager( @@ -504,7 +508,7 @@ public class ResourceManagerTest extends TestLogger { resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, - slotManagerFactory, + slotManager, metricRegistry, jobLeaderIdService, testingFatalErrorHandler); http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index fff2829..39c5f25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ 
-25,10 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.*; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; @@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; -import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -46,9 +44,11 @@ import org.mockito.ArgumentCaptor; import java.util.Arrays; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -648,7 +648,7 @@ public class SlotManagerTest extends TestLogger { */ @Test public void testTaskManagerTimeout() throws Exception { - final long tmTimeout = 50L; + final long tmTimeout = 500L; final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); final 
UUID leaderId = UUID.randomUUID(); @@ -661,7 +661,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); final SlotReport slotReport = new SlotReport(slotStatus); - final Executor mainThreadExecutor = mock(Executor.class); + final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); try (SlotManager slotManager = new SlotManager( TestingUtils.defaultScheduledExecutor(), @@ -671,24 +671,21 @@ public class SlotManagerTest extends TestLogger { slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions); - slotManager.registerTaskManager(taskManagerConnection, slotReport); - - ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - - verify(mainThreadExecutor, timeout(tmTimeout * 10L)).execute(runnableArgumentCaptor.capture()); + mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + slotManager.registerTaskManager(taskManagerConnection, slotReport); + } + }); - // the only runnable being executed by the main thread executor should be the timeout runnable - Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); - - timeoutRunnable.run(); - - verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID())); + verify(resourceManagerActions, timeout(100L * tmTimeout).times(1)) + .releaseResource(eq(taskManagerConnection.getInstanceID())); } } /** * Tests that slot requests time out after the specified request timeout. If a slot request - * times out, then the request is cancelled, removed from the slot manager and the resourc + * times out, then the request is cancelled, removed from the slot manager and the resource * manager is notified about the failed allocation. 
*/ @Test @@ -703,7 +700,7 @@ public class SlotManagerTest extends TestLogger { final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - final Executor mainThreadExecutor = mock(Executor.class); + final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); try (SlotManager slotManager = new SlotManager( TestingUtils.defaultScheduledExecutor(), @@ -713,21 +710,27 @@ public class SlotManagerTest extends TestLogger { slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions); - assertTrue(slotManager.registerSlotRequest(slotRequest)); - - ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - - verify(mainThreadExecutor, timeout(allocationTimeout * 10L)).execute(runnableArgumentCaptor.capture()); - - // the only runnable being executed by the main thread executor should be the timeout runnable - Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + final AtomicReference<Exception> atomicException = new AtomicReference<>(null); - timeoutRunnable.run(); + mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + try { + assertTrue(slotManager.registerSlotRequest(slotRequest)); + } catch (Exception e) { + atomicException.compareAndSet(null, e); + } + } + }); - verify(resourceManagerActions, times(1)).notifyAllocationFailure( + verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure( eq(jobId), eq(allocationId), any(TimeoutException.class)); + + if (atomicException.get() != null) { + throw atomicException.get(); + } } } @@ -815,6 +818,7 @@ public class SlotManagerTest extends TestLogger { @Test @SuppressWarnings("unchecked") public void testSlotReportWhileActiveSlotRequest() throws Exception { + final long verifyTimeout = 1000L; final UUID leaderId = UUID.randomUUID(); final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class); @@ -842,20 +846,37 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); - // we have to manually trigger the future call backs to simulate the main thread executor behaviour - final Executor mainThreadExecutorMock = mock(Executor.class); + final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); - try (SlotManager slotManager = new SlotManager( + try (final SlotManager slotManager = new SlotManager( TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - slotManager.start(leaderId, mainThreadExecutorMock, resourceManagerActions); - - slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions); - slotManager.registerSlotRequest(slotRequest); + Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + slotManager.registerTaskManager(taskManagerConnection, slotReport); + + return null; + } + }, mainThreadExecutor) + .thenAccept(new AcceptFunction<Void>() { + @Override + public void accept(Void value) { + try { + slotManager.registerSlotRequest(slotRequest); + } catch (SlotManagerException e) { + throw new RuntimeException("Could not register slots.", e); + } + } + }); + + // check that no exception has been thrown + registrationFuture.get(); ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class); @@ -867,26 +888,33 @@ public class SlotManagerTest extends TestLogger { eq(leaderId), any(Time.class)); - final SlotID requestedSlotdId = slotIdCaptor.getValue(); - final SlotID freeSlotId = requestedSlotdId.equals(slotId1) ? 
slotId2 : slotId1; + final SlotID requestedSlotId = slotIdCaptor.getValue(); + final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1; + + Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return slotManager.getSlot(freeSlotId).isFree(); + } + }, mainThreadExecutor); - assertTrue(slotManager.getSlot(freeSlotId).isFree()); + assertTrue(freeSlotFuture.get()); final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID()); final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile); final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2)); - // this should update the slot with the pending slot request triggering the reassignment of it - slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport); + FlinkFuture.supplyAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + // this should update the slot with the pending slot request triggering the reassignment of it + slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport); - ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture()); + return null; + } + }, mainThreadExecutor); - Runnable requestFailureRunnable = runnableArgumentCaptor.getValue(); - - requestFailureRunnable.run(); - - verify(taskExecutorGateway, times(2)).requestSlot( + verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot( slotIdCaptor.capture(), eq(jobId), eq(allocationId), @@ -894,16 +922,18 @@ public class SlotManagerTest extends TestLogger { eq(leaderId), any(Time.class)); - verify(mainThreadExecutorMock, times(2)).execute(runnableArgumentCaptor.capture()); - Runnable requestSuccessRunnable = 
runnableArgumentCaptor.getValue(); + final SlotID requestedSlotId2 = slotIdCaptor.getValue(); - requestSuccessRunnable.run(); - - final SlotID requestedSlotId = slotIdCaptor.getValue(); + assertEquals(slotId2, requestedSlotId2); - assertEquals(slotId2, requestedSlotId); + Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() { + @Override + public TaskManagerSlot call() throws Exception { + return slotManager.getSlot(requestedSlotId2); + } + }, mainThreadExecutor); - TaskManagerSlot slot = slotManager.getSlot(requestedSlotId); + TaskManagerSlot slot = requestedSlotFuture.get(); assertTrue(slot.isAllocated()); assertEquals(allocationId, slot.getAllocationId()); @@ -916,11 +946,12 @@ public class SlotManagerTest extends TestLogger { */ @Test public void testTimeoutForUnusedTaskManager() throws Exception { - final long taskManagerTimeout = 123456L; + final long taskManagerTimeout = 50L; + final long verifyTimeout = taskManagerTimeout * 10L; final UUID leaderId = UUID.randomUUID(); final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor(); final ResourceID resourceId = ResourceID.generate(); @@ -946,21 +977,34 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); - try (SlotManager slotManager = new SlotManager( + final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); + + try (final SlotManager slotManager = new SlotManager( scheduledExecutor, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) { - slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions); - - 
slotManager.registerSlotRequest(slotRequest); + slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions); - slotManager.registerTaskManager(taskManagerConnection, initialSlotReport); + FlinkFuture.supplyAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + slotManager.registerSlotRequest(slotRequest); + + return null; + } + }, mainThreadExecutor) + .thenAccept(new AcceptFunction<Void>() { + @Override + public void accept(Void value) { + slotManager.registerTaskManager(taskManagerConnection, initialSlotReport); + } + }); ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class); - verify(taskExecutorGateway).requestSlot( + verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot( slotIdArgumentCaptor.capture(), eq(jobId), eq(allocationId), @@ -968,103 +1012,48 @@ public class SlotManagerTest extends TestLogger { eq(leaderId), any(Time.class)); - assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID())); - - SlotID slotId = slotIdArgumentCaptor.getValue(); - TaskManagerSlot slot = slotManager.getSlot(slotId); - - assertTrue(slot.isAllocated()); - assertEquals(allocationId, slot.getAllocationId()); - - slotManager.freeSlot(slotId, allocationId); + Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()); + } + }, mainThreadExecutor); - assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID())); + // check that the TaskManager is not idle + assertFalse(idleFuture.get()); - ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + final SlotID slotId = slotIdArgumentCaptor.getValue(); - // filter out the schedule call for the task manager which will be registered using the - // taskManagerTimeout value - 
verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS)); + Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() { + @Override + public TaskManagerSlot call() throws Exception { + return slotManager.getSlot(slotId); + } + }, mainThreadExecutor); - Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + TaskManagerSlot slot = slotFuture.get(); - timeoutRunnable.run(); - - verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID())); - } - } - - /** - * Tests that the slot manager re-registers a timeout for a rejected slot request. - */ - @Test - public void testTimeoutForRejectedSlotRequest() throws Exception { - - final long slotRequestTimeout = 1337L; - final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); - - final ResourceID resourceId = ResourceID.generate(); - final SlotID slotId = new SlotID(resourceId, 0); - final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); - final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); - final SlotReport slotReport = new SlotReport(slotStatus); - - final UUID leaderId = UUID.randomUUID(); - final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - - final JobID jobId = new JobID(); - final AllocationID allocationId = new AllocationID(); - final AllocationID allocationId2 = new AllocationID(); - final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - - CompletableFuture<Acknowledge> requestFuture = new FlinkCompletableFuture<>(); - - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(leaderId), - any(Time.class))).thenReturn(requestFuture); - - final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway); - - try (SlotManager slotManager = new SlotManager( - scheduledExecutor, - TestingUtils.infiniteTime(), - Time.milliseconds(slotRequestTimeout), - TestingUtils.infiniteTime())) { - - slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions); - - slotManager.registerTaskManager(taskManagerConnection, slotReport); - - slotManager.registerSlotRequest(slotRequest); - - verify(taskExecutorGateway).requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(leaderId), - any(Time.class)); - - requestFuture.completeExceptionally(new SlotOccupiedException("Slot is already occupied", allocationId2)); - - ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS)); + assertTrue(slot.isAllocated()); + assertEquals(allocationId, slot.getAllocationId()); - Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + slotManager.freeSlot(slotId, allocationId); - timeoutRunnable.run(); + return null; + } + }, mainThreadExecutor) + .thenApply(new ApplyFunction<Void, Boolean>() { + @Override + public Boolean apply(Void value) { + return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()); + } + }); - verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), eq(allocationId), any(Exception.class)); + assertTrue(idleFuture2.get()); - TaskManagerSlot slot = slotManager.getSlot(slotId); - - assertTrue(slot.isAllocated()); - assertEquals(allocationId2, slot.getAllocationId()); + verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID())); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index c09316c..a1ab1ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -76,7 +76,6 @@ public class SlotProtocolTest extends TestLogger { @Test public void testSlotsUnavailableRequest() throws Exception { final JobID jobID = new JobID(); - final ResourceID jmResourceId = new ResourceID(jmAddress); final UUID rmLeaderID = UUID.randomUUID(); @@ -133,7 +132,6 @@ public class SlotProtocolTest extends TestLogger { @Test public void testSlotAvailableRequest() throws Exception { final JobID jobID = new JobID(); - final ResourceID jmResourceId = new ResourceID(jmAddress); final UUID rmLeaderID = UUID.randomUUID(); http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 7ba1633..812a256 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. 
################################################################################ -log4j.rootLogger=OFF, console +log4j.rootLogger=DEBUG, console # ----------------------------------------------------------------------------- # Console (use 'console') http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 03b5172..876e26b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -82,7 +82,7 @@ object TestingUtils { def getDefaultTestingActorSystemConfig = testConfig def infiniteTime: Time = { - Time.milliseconds(Long.MaxValue); + Time.milliseconds(Integer.MAX_VALUE); } @@ -113,7 +113,7 @@ object TestingUtils { def defaultExecutor: ScheduledExecutorService = { synchronized { if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) { - sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor() + sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor(); } sharedExecutorInstance