[FLINK-5810] [flip-6] Use single timeout task for SlotManager

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d75ec5b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d75ec5b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d75ec5b3

Branch: refs/heads/table-retraction
Commit: d75ec5b3551573d4eb1886c8e75dfdf6dc328da1
Parents: d16a5a2
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Apr 27 17:46:29 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 15:28:26 2017 +0200

----------------------------------------------------------------------
 .../slotmanager/PendingSlotRequest.java         |  35 +--
 .../slotmanager/SlotManager.java                | 219 ++++++++------
 .../slotmanager/TaskManagerRegistration.java    |  38 +--
 .../clusterframework/ResourceManagerTest.java   |   8 +-
 .../slotmanager/SlotManagerTest.java            | 291 +++++++++----------
 .../slotmanager/SlotProtocolTest.java           |   2 -
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../runtime/testingUtils/TestingUtils.scala     |   4 +-
 8 files changed, 295 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 1195791..ffe1bfc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -27,8 +27,6 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
 
 public class PendingSlotRequest {
 
@@ -37,14 +35,12 @@ public class PendingSlotRequest {
        @Nullable
        private CompletableFuture<Acknowledge> requestFuture;
 
-       @Nullable
-       private UUID timeoutIdentifier;
-
-       @Nullable
-       private ScheduledFuture<?> timeoutFuture;
+       /** Timestamp when this pending slot request has been created. */
+       private final long creationTimestamp;
 
        public PendingSlotRequest(SlotRequest slotRequest) {
                this.slotRequest = Preconditions.checkNotNull(slotRequest);
+               creationTimestamp = System.currentTimeMillis();
        }
 
        // 
------------------------------------------------------------------------
@@ -57,11 +53,6 @@ public class PendingSlotRequest {
                return slotRequest.getResourceProfile();
        }
 
-       @Nullable
-       public UUID getTimeoutIdentifier() {
-               return timeoutIdentifier;
-       }
-
        public JobID getJobId() {
                return slotRequest.getJobId();
        }
@@ -70,6 +61,10 @@ public class PendingSlotRequest {
                return slotRequest.getTargetAddress();
        }
 
+       public long getCreationTimestamp() {
+               return creationTimestamp;
+       }
+
        public boolean isAssigned() {
                return null != requestFuture;
        }
@@ -82,20 +77,4 @@ public class PendingSlotRequest {
        public CompletableFuture<Acknowledge> getRequestFuture() {
                return requestFuture;
        }
-
-       public void cancelTimeout() {
-               if (timeoutFuture != null) {
-                       timeoutFuture.cancel(true);
-
-                       timeoutIdentifier = null;
-                       timeoutFuture = null;
-               }
-       }
-
-       public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID 
newTimeoutIdentifier) {
-               cancelTimeout();
-
-               timeoutFuture = newTimeoutFuture;
-               timeoutIdentifier = newTimeoutIdentifier;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f09b73a..829a06d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -64,11 +64,10 @@ import java.util.concurrent.TimeoutException;
  * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
  *
  * In order to free resources and avoid resource leaks, idling task managers 
(task managers whose
- * slots are currently not used) and not fulfilled pending slot requests time 
out triggering their
- * release and failure, respectively.
+ * slots are currently not used) and pending slot requests time out triggering 
their release and
+ * failure, respectively.
  */
 public class SlotManager implements AutoCloseable {
-
        private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
 
        /** Scheduled executor for timeouts */
@@ -107,6 +106,10 @@ public class SlotManager implements AutoCloseable {
        /** Callbacks for resource (de-)allocations */
        private ResourceManagerActions resourceManagerActions;
 
+       private ScheduledFuture<?> taskManagerTimeoutCheck;
+
+       private ScheduledFuture<?> slotRequestTimeoutCheck;
+
        /** True iff the component has been started */
        private boolean started;
 
@@ -128,6 +131,10 @@ public class SlotManager implements AutoCloseable {
 
                leaderId = null;
                resourceManagerActions = null;
+               mainThreadExecutor = null;
+               taskManagerTimeoutCheck = null;
+               slotRequestTimeoutCheck = null;
+
                started = false;
        }
 
@@ -142,17 +149,52 @@ public class SlotManager implements AutoCloseable {
         * @param newResourceManagerActions to use for resource (de-)allocations
         */
        public void start(UUID newLeaderId, Executor newMainThreadExecutor, 
ResourceManagerActions newResourceManagerActions) {
+               LOG.info("Starting the SlotManager.");
+
                leaderId = Preconditions.checkNotNull(newLeaderId);
                mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
                resourceManagerActions = 
Preconditions.checkNotNull(newResourceManagerActions);
 
                started = true;
+
+               taskManagerTimeoutCheck = 
scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                               mainThreadExecutor.execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               checkTaskManagerTimeouts();
+                                       }
+                               });
+                       }
+               }, 0L, taskManagerTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+               slotRequestTimeoutCheck = 
scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                               mainThreadExecutor.execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               checkSlotRequestTimeouts();
+                                       }
+                               });
+                       }
+               }, 0L, slotRequestTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
        }
 
        /**
         * Suspends the component. This clears the internal state of the slot 
manager.
         */
        public void suspend() {
+               LOG.info("Suspending the SlotManager.");
+
+               // stop the timeout checks for the TaskManagers and the 
SlotRequests
+               taskManagerTimeoutCheck.cancel(false);
+               slotRequestTimeoutCheck.cancel(false);
+
+               taskManagerTimeoutCheck = null;
+               slotRequestTimeoutCheck = null;
+
                for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
                        cancelPendingSlotRequest(pendingSlotRequest);
                }
@@ -177,6 +219,8 @@ public class SlotManager implements AutoCloseable {
         */
        @Override
        public void close() throws Exception {
+               LOG.info("Closing the SlotManager.");
+
                suspend();
        }
 
@@ -249,6 +293,8 @@ public class SlotManager implements AutoCloseable {
        public void registerTaskManager(final TaskExecutorConnection 
taskExecutorConnection, SlotReport initialSlotReport) {
                checkInit();
 
+               LOG.info("Register TaskManager {} at the SlotManager.", 
taskExecutorConnection.getInstanceID());
+
                // we identify task managers by their instance id
                if 
(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
                        
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
@@ -272,8 +318,13 @@ public class SlotManager implements AutoCloseable {
                                        taskExecutorConnection);
                        }
 
-                       if (!anySlotUsed(taskManagerRegistration.getSlots())) {
-                               
registerTaskManagerTimeout(taskManagerRegistration);
+                       // determine if the task manager is idle or not
+                       boolean idle = 
!anySlotUsed(taskManagerRegistration.getSlots());
+
+                       if (idle) {
+                               taskManagerRegistration.markIdle();
+                       } else {
+                               taskManagerRegistration.markUsed();
                        }
                }
 
@@ -292,9 +343,7 @@ public class SlotManager implements AutoCloseable {
                TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
 
                if (null != taskManagerRegistration) {
-                       removeSlots(taskManagerRegistration.getSlots());
-
-                       taskManagerRegistration.cancelTimeout();
+                       internalUnregisterTaskManager(taskManagerRegistration);
 
                        return true;
                } else {
@@ -334,8 +383,11 @@ public class SlotManager implements AutoCloseable {
                        }
 
                        if (idle) {
-                               // no slot of this task manager is being used 
--> register timer to free this resource
-                               
registerTaskManagerTimeout(taskManagerRegistration);
+                               // no slot of this task manager is being used 
--> mark this task manager to be idle which allows it to
+                               // time out
+                               taskManagerRegistration.markIdle();
+                       } else {
+                               taskManagerRegistration.markUsed();
                        }
 
                        return true;
@@ -371,9 +423,14 @@ public class SlotManager implements AutoCloseable {
 
                                        TaskManagerRegistration 
taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
-                                       if (null != taskManagerRegistration && 
!anySlotUsed(taskManagerRegistration.getSlots())) {
-                                               
registerTaskManagerTimeout(taskManagerRegistration);
+                                       if (null != taskManagerRegistration) {
+                                               if 
(anySlotUsed(taskManagerRegistration.getSlots())) {
+                                                       
taskManagerRegistration.markUsed();
+                                               } else {
+                                                       
taskManagerRegistration.markIdle();
+                                               }
                                        }
+
                                } else {
                                        LOG.debug("Received request to free 
slot {} with expected allocation id {}, " +
                                                "but actual allocation id {} 
differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -524,8 +581,8 @@ public class SlotManager implements AutoCloseable {
                                TaskManagerRegistration taskManagerRegistration 
= taskManagerRegistrations.get(slot.getInstanceId());
 
                                if (null != taskManagerRegistration) {
-                                       // disable any registered time out for 
the task manager
-                                       taskManagerRegistration.cancelTimeout();
+                                       // mark this TaskManager to be used to 
exempt it from timing out
+                                       taskManagerRegistration.markUsed();
                                }
                        }
 
@@ -551,24 +608,6 @@ public class SlotManager implements AutoCloseable {
                if (taskManagerSlot != null) {
                        allocateSlot(taskManagerSlot, pendingSlotRequest);
                } else {
-                       final UUID timeoutIdentifier = UUID.randomUUID();
-                       final AllocationID allocationId = 
pendingSlotRequest.getAllocationId();
-
-                       // register timeout for slot request
-                       ScheduledFuture<?> timeoutFuture = 
scheduledExecutor.schedule(new Runnable() {
-                               @Override
-                               public void run() {
-                                       mainThreadExecutor.execute(new 
Runnable() {
-                                               @Override
-                                               public void run() {
-                                                       
timeoutSlotRequest(allocationId, timeoutIdentifier);
-                                               }
-                                       });
-                               }
-                       }, slotRequestTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-
-                       pendingSlotRequest.registerTimeout(timeoutFuture, 
timeoutIdentifier);
-
                        
resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
                }
        }
@@ -591,6 +630,16 @@ public class SlotManager implements AutoCloseable {
                taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
                pendingSlotRequest.setRequestFuture(completableFuture);
 
+               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+
+               if (taskManagerRegistration != null) {
+                       // mark the task manager to be used since we have a 
pending slot request assigned ot one of its slots
+                       taskManagerRegistration.markUsed();
+               } else {
+                       throw new IllegalStateException("Could not find a 
registered task manager for instance id " +
+                               taskManagerSlot.getInstanceId() + '.');
+               }
+
                // RPC call to the task manager
                Future<Acknowledge> requestFuture = gateway.requestSlot(
                        slotId,
@@ -717,7 +766,7 @@ public class SlotManager implements AutoCloseable {
                        TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
 
                        if (null != taskManagerRegistration && 
!anySlotUsed(taskManagerRegistration.getSlots())) {
-                               
registerTaskManagerTimeout(taskManagerRegistration);
+                               taskManagerRegistration.markIdle();
                        }
                } else {
                        LOG.debug("There was no slot with {} registered. 
Probably this slot has been already freed.", slotId);
@@ -778,8 +827,6 @@ public class SlotManager implements AutoCloseable {
         * @param pendingSlotRequest to cancel
         */
        private void cancelPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
-               pendingSlotRequest.cancelTimeout();
-
                CompletableFuture<Acknowledge> request = 
pendingSlotRequest.getRequestFuture();
 
                if (null != request) {
@@ -791,54 +838,50 @@ public class SlotManager implements AutoCloseable {
        // Internal timeout methods
        // 
---------------------------------------------------------------------------------------------
 
-       private void timeoutTaskManager(InstanceID instanceId, UUID 
timeoutIdentifier) {
-               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
+       private void checkTaskManagerTimeouts() {
+               if (!taskManagerRegistrations.isEmpty()) {
+                       long currentTime = System.currentTimeMillis();
+
+                       Iterator<Map.Entry<InstanceID, 
TaskManagerRegistration>> taskManagerRegistrationIterator = 
taskManagerRegistrations.entrySet().iterator();
+
+                       while (taskManagerRegistrationIterator.hasNext()) {
+                               TaskManagerRegistration taskManagerRegistration 
= taskManagerRegistrationIterator.next().getValue();
 
-               if (null != taskManagerRegistration) {
-                       if (Objects.equals(timeoutIdentifier, 
taskManagerRegistration.getTimeoutIdentifier())) {
                                if 
(anySlotUsed(taskManagerRegistration.getSlots())) {
-                                       LOG.debug("Cannot release the task 
manager with instance id {}, because some " +
-                                               "of its slots are still being 
used.", instanceId);
-                               } else {
-                                       unregisterTaskManager(instanceId);
+                                       taskManagerRegistration.markUsed();
+                               } else if (currentTime - 
taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
+                                       
taskManagerRegistrationIterator.remove();
 
-                                       
resourceManagerActions.releaseResource(instanceId);
-                               }
-                       } else {
-                               taskManagerRegistrations.put(instanceId, 
taskManagerRegistration);
+                                       
internalUnregisterTaskManager(taskManagerRegistration);
 
-                               LOG.debug("Expected timeout identifier {} 
differs from the task manager's " +
-                                       "timeout identifier {}. Ignoring the 
task manager timeout call.",
-                                       timeoutIdentifier, 
taskManagerRegistration.getTimeoutIdentifier());
+                                       
resourceManagerActions.releaseResource(taskManagerRegistration.getInstanceId());
+                               }
                        }
-               } else {
-                       LOG.debug("Could not find a registered task manager 
with instance id {}. Ignoring the task manager timeout call.", instanceId);
                }
        }
 
-       private void timeoutSlotRequest(AllocationID allocationId, UUID 
timeoutIdentifier) {
-               PendingSlotRequest pendingSlotRequest = 
pendingSlotRequests.remove(allocationId);
+       private void checkSlotRequestTimeouts() {
+               if (!pendingSlotRequests.isEmpty()) {
+                       long currentTime = System.currentTimeMillis();
 
-               if (null != pendingSlotRequest) {
-                       if (Objects.equals(timeoutIdentifier, 
pendingSlotRequest.getTimeoutIdentifier())) {
-                               if (!pendingSlotRequest.isAssigned()) {
+                       Iterator<Map.Entry<AllocationID, PendingSlotRequest>> 
slotRequestIterator = pendingSlotRequests.entrySet().iterator();
+
+                       while (slotRequestIterator.hasNext()) {
+                               PendingSlotRequest slotRequest = 
slotRequestIterator.next().getValue();
+
+                               if (currentTime - 
slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
+                                       slotRequestIterator.remove();
+
+                                       if (slotRequest.isAssigned()) {
+                                               
cancelPendingSlotRequest(slotRequest);
+                                       }
 
                                        
resourceManagerActions.notifyAllocationFailure(
-                                               pendingSlotRequest.getJobId(),
-                                               allocationId,
+                                               slotRequest.getJobId(),
+                                               slotRequest.getAllocationId(),
                                                new TimeoutException("The 
allocation could not be fulfilled in time."));
-                               } else {
-                                       LOG.debug("Cannot fail pending slot 
request {} because it has been assigned.", allocationId);
                                }
-                       } else {
-                               pendingSlotRequests.put(allocationId, 
pendingSlotRequest);
-
-                               LOG.debug("Expected timeout identifier {} 
differs from the pending slot request's " +
-                                       "timeout identifier {}. Ignoring the 
slot request timeout call.",
-                                       timeoutIdentifier, 
pendingSlotRequest.getTimeoutIdentifier());
                        }
-               } else {
-                       LOG.debug("Could not find pending slot request with 
allocation id {}. Ignoring the slot request timeout call.", allocationId);
                }
        }
 
@@ -846,6 +889,12 @@ public class SlotManager implements AutoCloseable {
        // Internal utility methods
        // 
---------------------------------------------------------------------------------------------
 
+       private void internalUnregisterTaskManager(TaskManagerRegistration 
taskManagerRegistration) {
+               Preconditions.checkNotNull(taskManagerRegistration);
+
+               removeSlots(taskManagerRegistration.getSlots());
+       }
+
        private boolean checkDuplicateRequest(AllocationID allocationId) {
                return pendingSlotRequests.containsKey(allocationId) || 
fulfilledSlotRequests.containsKey(allocationId);
        }
@@ -853,38 +902,18 @@ public class SlotManager implements AutoCloseable {
        private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) {
 
                if (null != slotsToCheck) {
-                       boolean idle = true;
-
                        for (SlotID slotId : slotsToCheck) {
                                TaskManagerSlot taskManagerSlot = 
slots.get(slotId);
 
                                if (null != taskManagerSlot) {
-                                       idle &= taskManagerSlot.isFree();
+                                       if (taskManagerSlot.isAllocated()) {
+                                               return true;
+                                       }
                                }
                        }
-
-                       return !idle;
-               } else {
-                       return false;
                }
-       }
-
-       private void registerTaskManagerTimeout(final TaskManagerRegistration 
taskManagerRegistration) {
-               final UUID timeoutIdentifier = UUID.randomUUID();
-
-               ScheduledFuture<?> timeoutFuture = 
scheduledExecutor.schedule(new Runnable() {
-                       @Override
-                       public void run() {
-                               mainThreadExecutor.execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               
timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier);
-                                       }
-                               });
-                       }
-               }, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-               taskManagerRegistration.registerTimeout(timeoutFuture, 
timeoutIdentifier);
+               return false;
        }
 
        private void checkInit() {
@@ -911,11 +940,11 @@ public class SlotManager implements AutoCloseable {
        }
 
        @VisibleForTesting
-       boolean hasTimeoutRegistered(InstanceID instanceId) {
+       boolean isTaskManagerIdle(InstanceID instanceId) {
                TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceId);
 
                if (null != taskManagerRegistration) {
-                       return taskManagerRegistration.getTimeoutIdentifier() 
!= null;
+                       return taskManagerRegistration.isIdle();
                } else {
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 3a15cb3..7d3764c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
 
 public class TaskManagerRegistration {
 
@@ -34,9 +32,8 @@ public class TaskManagerRegistration {
 
        private final HashSet<SlotID> slots;
 
-       private UUID timeoutIdentifier;
-
-       private ScheduledFuture<?> timeoutFuture;
+       /** Timestamp when the last time becoming idle. Otherwise 
Long.MAX_VALUE. */
+       private long idleSince;
 
        public TaskManagerRegistration(
                TaskExecutorConnection taskManagerConnection,
@@ -47,8 +44,7 @@ public class TaskManagerRegistration {
 
                this.slots = new HashSet<>(slots);
 
-               timeoutIdentifier = null;
-               timeoutFuture = null;
+               idleSince = Long.MAX_VALUE;
        }
 
        public TaskExecutorConnection getTaskManagerConnection() {
@@ -59,31 +55,27 @@ public class TaskManagerRegistration {
                return taskManagerConnection.getInstanceID();
        }
 
-       public UUID getTimeoutIdentifier() {
-               return timeoutIdentifier;
-       }
-
        public Iterable<SlotID> getSlots() {
                return slots;
        }
 
-       public boolean containsSlot(SlotID slotId) {
-               return slots.contains(slotId);
+       public long getIdleSince() {
+               return idleSince;
        }
 
-       public void cancelTimeout() {
-               if (null != timeoutFuture) {
-                       timeoutFuture.cancel(false);
+       public boolean isIdle() {
+               return idleSince != Long.MAX_VALUE;
+       }
 
-                       timeoutFuture = null;
-                       timeoutIdentifier = null;
-               }
+       public void markIdle() {
+               idleSince = System.currentTimeMillis();
        }
 
-       public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID 
newTimeoutIdentifier) {
-               cancelTimeout();
+       public void markUsed() {
+               idleSince = Long.MAX_VALUE;
+       }
 
-               timeoutFuture = newTimeoutFuture;
-               timeoutIdentifier = newTimeoutIdentifier;
+       public boolean containsSlot(SlotID slotId) {
+               return slots.contains(slotId);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 41c2e16..c740518 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -488,13 +488,17 @@ public class ResourceManagerTest extends TestLogger {
                final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
                final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
 
-               final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                final MetricRegistry metricRegistry = 
mock(MetricRegistry.class);
                final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),
                        Time.minutes(5L));
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+               final SlotManager slotManager = new SlotManager(
+                       TestingUtils.defaultScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime());
 
                try {
                        final StandaloneResourceManager resourceManager = new 
StandaloneResourceManager(
@@ -504,7 +508,7 @@ public class ResourceManagerTest extends TestLogger {
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
-                               slotManagerFactory,
+                               slotManager,
                                metricRegistry,
                                jobLeaderIdService,
                                testingFatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fff2829..39c5f25 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,10 +25,9 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.*;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
-import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -46,9 +44,11 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -648,7 +648,7 @@ public class SlotManagerTest extends TestLogger {
         */
        @Test
        public void testTaskManagerTimeout() throws Exception {
-               final long tmTimeout = 50L;
+               final long tmTimeout = 500L;
 
                final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
                final UUID leaderId = UUID.randomUUID();
@@ -661,7 +661,7 @@ public class SlotManagerTest extends TestLogger {
                final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
                final SlotReport slotReport = new SlotReport(slotStatus);
 
-               final Executor mainThreadExecutor = mock(Executor.class);
+               final Executor mainThreadExecutor = 
TestingUtils.defaultExecutor();
 
                try (SlotManager slotManager = new SlotManager(
                        TestingUtils.defaultScheduledExecutor(),
@@ -671,24 +671,21 @@ public class SlotManagerTest extends TestLogger {
 
                        slotManager.start(leaderId, mainThreadExecutor, 
resourceManagerActions);
 
-                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
-
-                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-
-                       verify(mainThreadExecutor, timeout(tmTimeout * 
10L)).execute(runnableArgumentCaptor.capture());
+                       mainThreadExecutor.execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       
slotManager.registerTaskManager(taskManagerConnection, slotReport);
+                               }
+                       });
 
-                       // the only runnable being executed by the main thread 
executor should be the timeout runnable
-                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
-
-                       timeoutRunnable.run();
-
-                       verify(resourceManagerActions, 
times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
+                       verify(resourceManagerActions, timeout(100L * 
tmTimeout).times(1))
+                               
.releaseResource(eq(taskManagerConnection.getInstanceID()));
                }
        }
 
        /**
         * Tests that slot requests time out after the specified request 
timeout. If a slot request
-        * times out, then the request is cancelled, removed from the slot 
manager and the resourc
+        * times out, then the request is cancelled, removed from the slot 
manager and the resource
         * manager is notified about the failed allocation.
         */
        @Test
@@ -703,7 +700,7 @@ public class SlotManagerTest extends TestLogger {
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
                final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
 
-               final Executor mainThreadExecutor = mock(Executor.class);
+               final Executor mainThreadExecutor = 
TestingUtils.defaultExecutor();
 
                try (SlotManager slotManager = new SlotManager(
                        TestingUtils.defaultScheduledExecutor(),
@@ -713,21 +710,27 @@ public class SlotManagerTest extends TestLogger {
 
                        slotManager.start(leaderId, mainThreadExecutor, 
resourceManagerActions);
 
-                       
assertTrue(slotManager.registerSlotRequest(slotRequest));
-
-                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-
-                       verify(mainThreadExecutor, timeout(allocationTimeout * 
10L)).execute(runnableArgumentCaptor.capture());
-
-                       // the only runnable being executed by the main thread 
executor should be the timeout runnable
-                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
+                       final AtomicReference<Exception> atomicException = new 
AtomicReference<>(null);
 
-                       timeoutRunnable.run();
+                       mainThreadExecutor.execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
assertTrue(slotManager.registerSlotRequest(slotRequest));
+                                       } catch (Exception e) {
+                                               
atomicException.compareAndSet(null, e);
+                                       }
+                               }
+                       });
 
-                       verify(resourceManagerActions, 
times(1)).notifyAllocationFailure(
+                       verify(resourceManagerActions, timeout(100L * 
allocationTimeout).times(1)).notifyAllocationFailure(
                                eq(jobId),
                                eq(allocationId),
                                any(TimeoutException.class));
+
+                       if (atomicException.get() != null) {
+                               throw atomicException.get();
+                       }
                }
        }
 
@@ -815,6 +818,7 @@ public class SlotManagerTest extends TestLogger {
        @Test
        @SuppressWarnings("unchecked")
        public void testSlotReportWhileActiveSlotRequest() throws Exception {
+               final long verifyTimeout = 1000L;
                final UUID leaderId = UUID.randomUUID();
                final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
@@ -842,20 +846,37 @@ public class SlotManagerTest extends TestLogger {
                final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
                final SlotReport slotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-               // we have to manually trigger the future call backs to 
simulate the main thread executor behaviour
-               final Executor mainThreadExecutorMock = mock(Executor.class);
+               final Executor mainThreadExecutor = 
TestingUtils.defaultExecutor();
 
-               try (SlotManager slotManager = new SlotManager(
+               try (final SlotManager slotManager = new SlotManager(
                        TestingUtils.defaultScheduledExecutor(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime())) {
 
-                       slotManager.start(leaderId, mainThreadExecutorMock, 
resourceManagerActions);
-
-                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+                       slotManager.start(leaderId, mainThreadExecutor, 
resourceManagerActions);
 
-                       slotManager.registerSlotRequest(slotRequest);
+                       Future<Void> registrationFuture = 
FlinkFuture.supplyAsync(new Callable<Void>() {
+                               @Override
+                               public Void call() throws Exception {
+                                       
slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+                                       return null;
+                               }
+                       }, mainThreadExecutor)
+                       .thenAccept(new AcceptFunction<Void>() {
+                               @Override
+                               public void accept(Void value) {
+                                       try {
+                                               
slotManager.registerSlotRequest(slotRequest);
+                                       } catch (SlotManagerException e) {
+                                               throw new 
RuntimeException("Could not register slots.", e);
+                                       }
+                               }
+                       });
+
+                       // check that no exception has been thrown
+                       registrationFuture.get();
 
                        ArgumentCaptor<SlotID> slotIdCaptor = 
ArgumentCaptor.forClass(SlotID.class);
 
@@ -867,26 +888,33 @@ public class SlotManagerTest extends TestLogger {
                                eq(leaderId),
                                any(Time.class));
 
-                       final SlotID requestedSlotdId = slotIdCaptor.getValue();
-                       final SlotID freeSlotId = 
requestedSlotdId.equals(slotId1) ? slotId2 : slotId1;
+                       final SlotID requestedSlotId = slotIdCaptor.getValue();
+                       final SlotID freeSlotId = 
requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
+
+                       Future<Boolean> freeSlotFuture = 
FlinkFuture.supplyAsync(new Callable<Boolean>() {
+                               @Override
+                               public Boolean call() throws Exception {
+                                       return 
slotManager.getSlot(freeSlotId).isFree();
+                               }
+                       }, mainThreadExecutor);
 
-                       assertTrue(slotManager.getSlot(freeSlotId).isFree());
+                       assertTrue(freeSlotFuture.get());
 
                        final SlotStatus newSlotStatus1 = new 
SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new 
AllocationID());
                        final SlotStatus newSlotStatus2 = new 
SlotStatus(freeSlotId, resourceProfile);
                        final SlotReport newSlotReport = new 
SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
 
-                       // this should update the slot with the pending slot 
request triggering the reassignment of it
-                       
slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), 
newSlotReport);
+                       FlinkFuture.supplyAsync(new Callable<Void>() {
+                               @Override
+                               public Void call() throws Exception {
+                                       // this should update the slot with the 
pending slot request triggering the reassignment of it
+                                       
slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), 
newSlotReport);
 
-                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-                       
verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture());
+                                       return null;
+                               }
+                       }, mainThreadExecutor);
 
-                       Runnable requestFailureRunnable = 
runnableArgumentCaptor.getValue();
-
-                       requestFailureRunnable.run();
-
-                       verify(taskExecutorGateway, times(2)).requestSlot(
+                       verify(taskExecutorGateway, 
timeout(verifyTimeout).times(2)).requestSlot(
                                slotIdCaptor.capture(),
                                eq(jobId),
                                eq(allocationId),
@@ -894,16 +922,18 @@ public class SlotManagerTest extends TestLogger {
                                eq(leaderId),
                                any(Time.class));
 
-                       verify(mainThreadExecutorMock, 
times(2)).execute(runnableArgumentCaptor.capture());
-                       Runnable requestSuccessRunnable = 
runnableArgumentCaptor.getValue();
+                       final SlotID requestedSlotId2 = slotIdCaptor.getValue();
 
-                       requestSuccessRunnable.run();
-
-                       final SlotID requestedSlotId = slotIdCaptor.getValue();
+                       assertEquals(slotId2, requestedSlotId2);
 
-                       assertEquals(slotId2, requestedSlotId);
+                       Future<TaskManagerSlot> requestedSlotFuture = 
FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
+                               @Override
+                               public TaskManagerSlot call() throws Exception {
+                                       return 
slotManager.getSlot(requestedSlotId2);
+                               }
+                       }, mainThreadExecutor);
 
-                       TaskManagerSlot slot = 
slotManager.getSlot(requestedSlotId);
+                       TaskManagerSlot slot = requestedSlotFuture.get();
 
                        assertTrue(slot.isAllocated());
                        assertEquals(allocationId, slot.getAllocationId());
@@ -916,11 +946,12 @@ public class SlotManagerTest extends TestLogger {
         */
        @Test
        public void testTimeoutForUnusedTaskManager() throws Exception {
-               final long taskManagerTimeout = 123456L;
+               final long taskManagerTimeout = 50L;
+               final long verifyTimeout = taskManagerTimeout * 10L;
 
                final UUID leaderId = UUID.randomUUID();
                final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
-               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
+               final ScheduledExecutor scheduledExecutor = 
TestingUtils.defaultScheduledExecutor();
 
                final ResourceID resourceId = ResourceID.generate();
 
@@ -946,21 +977,34 @@ public class SlotManagerTest extends TestLogger {
                final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
                final SlotReport initialSlotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-               try (SlotManager slotManager = new SlotManager(
+               final Executor mainThreadExecutor = 
TestingUtils.defaultExecutor();
+
+               try (final SlotManager slotManager = new SlotManager(
                        scheduledExecutor,
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
 
-                       slotManager.start(leaderId, Executors.directExecutor(), 
resourceManagerActions);
-
-                       slotManager.registerSlotRequest(slotRequest);
+                       slotManager.start(leaderId, mainThreadExecutor, 
resourceManagerActions);
 
-                       slotManager.registerTaskManager(taskManagerConnection, 
initialSlotReport);
+                       FlinkFuture.supplyAsync(new Callable<Void>() {
+                               @Override
+                               public Void call() throws Exception {
+                                       
slotManager.registerSlotRequest(slotRequest);
+
+                                       return null;
+                               }
+                       }, mainThreadExecutor)
+                       .thenAccept(new AcceptFunction<Void>() {
+                               @Override
+                               public void accept(Void value) {
+                                       
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
+                               }
+                       });
 
                        ArgumentCaptor<SlotID> slotIdArgumentCaptor = 
ArgumentCaptor.forClass(SlotID.class);
 
-                       verify(taskExecutorGateway).requestSlot(
+                       verify(taskExecutorGateway, 
timeout(verifyTimeout)).requestSlot(
                                slotIdArgumentCaptor.capture(),
                                eq(jobId),
                                eq(allocationId),
@@ -968,103 +1012,48 @@ public class SlotManagerTest extends TestLogger {
                                eq(leaderId),
                                any(Time.class));
 
-                       
assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
-
-                       SlotID slotId = slotIdArgumentCaptor.getValue();
-                       TaskManagerSlot slot = slotManager.getSlot(slotId);
-
-                       assertTrue(slot.isAllocated());
-                       assertEquals(allocationId, slot.getAllocationId());
-
-                       slotManager.freeSlot(slotId, allocationId);
+                       Future<Boolean> idleFuture = 
FlinkFuture.supplyAsync(new Callable<Boolean>() {
+                               @Override
+                               public Boolean call() throws Exception {
+                                       return 
slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
+                               }
+                       }, mainThreadExecutor);
 
-                       
assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+                       // check that the TaskManaer is not idle
+                       assertFalse(idleFuture.get());
 
-                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       final SlotID slotId = slotIdArgumentCaptor.getValue();
 
-                       // filter out the schedule call for the task manager 
which will be registered using the
-                       // taskManagerTimeout value
-                       
verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), 
eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS));
+                       Future<TaskManagerSlot> slotFuture = 
FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
+                               @Override
+                               public TaskManagerSlot call() throws Exception {
+                                       return slotManager.getSlot(slotId);
+                               }
+                       }, mainThreadExecutor);
 
-                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
+                       TaskManagerSlot slot = slotFuture.get();
 
-                       timeoutRunnable.run();
-
-                       verify(resourceManagerActions, 
times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
-               }
-       }
-
-       /**
-        * Tests that the slot manager re-registers a timeout for a rejected 
slot request.
-        */
-       @Test
-       public void testTimeoutForRejectedSlotRequest() throws Exception {
-
-               final long slotRequestTimeout = 1337L;
-               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
-
-               final ResourceID resourceId = ResourceID.generate();
-               final SlotID slotId = new SlotID(resourceId, 0);
-               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
-               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
-               final SlotReport slotReport = new SlotReport(slotStatus);
-
-               final UUID leaderId = UUID.randomUUID();
-               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
-
-               final JobID jobId = new JobID();
-               final AllocationID allocationId = new AllocationID();
-               final AllocationID allocationId2 = new AllocationID();
-               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
-
-               CompletableFuture<Acknowledge> requestFuture = new 
FlinkCompletableFuture<>();
-
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       eq(slotId),
-                       eq(jobId),
-                       eq(allocationId),
-                       anyString(),
-                       eq(leaderId),
-                       any(Time.class))).thenReturn(requestFuture);
-
-               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
-
-               try (SlotManager slotManager = new SlotManager(
-                       scheduledExecutor,
-                       TestingUtils.infiniteTime(),
-                       Time.milliseconds(slotRequestTimeout),
-                       TestingUtils.infiniteTime())) {
-
-                       slotManager.start(leaderId, Executors.directExecutor(), 
resourceManagerActions);
-
-                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
-
-                       slotManager.registerSlotRequest(slotRequest);
-
-                       verify(taskExecutorGateway).requestSlot(
-                               eq(slotId),
-                               eq(jobId),
-                               eq(allocationId),
-                               anyString(),
-                               eq(leaderId),
-                               any(Time.class));
-
-                       requestFuture.completeExceptionally(new 
SlotOccupiedException("Slot is already occupied", allocationId2));
-
-                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-                       
verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), 
eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS));
+                       assertTrue(slot.isAllocated());
+                       assertEquals(allocationId, slot.getAllocationId());
 
-                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
+                       Future<Boolean> idleFuture2 = 
FlinkFuture.supplyAsync(new Callable<Void>() {
+                               @Override
+                               public Void call() throws Exception {
+                                       slotManager.freeSlot(slotId, 
allocationId);
 
-                       timeoutRunnable.run();
+                                       return null;
+                               }
+                       }, mainThreadExecutor)
+                       .thenApply(new ApplyFunction<Void, Boolean>() {
+                               @Override
+                               public Boolean apply(Void value) {
+                                       return 
slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
+                               }
+                       });
 
-                       
verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), 
eq(allocationId), any(Exception.class));
+                       assertTrue(idleFuture2.get());
 
-                       TaskManagerSlot slot = slotManager.getSlot(slotId);
-
-                       assertTrue(slot.isAllocated());
-                       assertEquals(allocationId2, slot.getAllocationId());
+                       verify(resourceManagerActions, 
timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index c09316c..a1ab1ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -76,7 +76,6 @@ public class SlotProtocolTest extends TestLogger {
        @Test
        public void testSlotsUnavailableRequest() throws Exception {
                final JobID jobID = new JobID();
-               final ResourceID jmResourceId = new ResourceID(jmAddress);
 
                final UUID rmLeaderID = UUID.randomUUID();
 
@@ -133,7 +132,6 @@ public class SlotProtocolTest extends TestLogger {
        @Test
        public void testSlotAvailableRequest() throws Exception {
                final JobID jobID = new JobID();
-               final ResourceID jmResourceId = new ResourceID(jmAddress);
 
                final UUID rmLeaderID = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties 
b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..812a256 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=DEBUG, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 03b5172..876e26b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -82,7 +82,7 @@ object TestingUtils {
   def getDefaultTestingActorSystemConfig = testConfig
 
   def infiniteTime: Time = {
-    Time.milliseconds(Long.MaxValue);
+    Time.milliseconds(Integer.MAX_VALUE);
   }
   
 
@@ -113,7 +113,7 @@ object TestingUtils {
   def defaultExecutor: ScheduledExecutorService = {
     synchronized {
       if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) 
{
-        sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor()
+        sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor();
       }
 
       sharedExecutorInstance

Reply via email to