[FLINK-4690] Replace SlotAllocationFuture with flink's own future

This closes #2552.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b88f1a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b88f1a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b88f1a7

Branch: refs/heads/master
Commit: 7b88f1a75ea92f6b26624a7358e7fcafa3e9506f
Parents: f8138f4
Author: Kurt Young <ykt...@gmail.com>
Authored: Tue Sep 27 12:10:08 2016 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200

----------------------------------------------------------------------
 .../runtime/concurrent/impl/FlinkFuture.java    |   1 -
 .../flink/runtime/executiongraph/Execution.java |  55 ++---
 .../flink/runtime/instance/SlotProvider.java    |   6 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  24 ++-
 .../scheduler/SlotAllocationFuture.java         | 146 --------------
 .../scheduler/SlotAllocationFutureAction.java   |  34 ----
 .../ExecutionGraphMetricsTest.java              |   9 +-
 .../ExecutionVertexSchedulingTest.java          |  19 +-
 .../scheduler/SchedulerIsolatedTasksTest.java   |  31 ++-
 .../scheduler/SlotAllocationFutureTest.java     | 200 -------------------
 10 files changed, 80 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 361cd3d..3f2c5e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -104,7 +104,6 @@ public class FlinkFuture<T> implements Future<T> {
        @Override
        public T getNow(T valueIfAbsent) throws ExecutionException {
                Preconditions.checkNotNull(scalaFuture);
-               Preconditions.checkNotNull(valueIfAbsent);
 
                Option<Try<T>> value = scalaFuture.value();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6826365..8c02e1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,18 +20,18 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
-
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -41,20 +41,18 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
+import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -299,32 +297,43 @@ public class Execution {
 
                        // IMPORTANT: To prevent leaks of cluster resources, we 
need to make sure that slots are returned
                        //     in all cases where the deployment failed. we use 
many try {} finally {} clauses to assure that
-                       final SlotAllocationFuture future = 
slotProvider.allocateSlot(toSchedule, queued);
+                       final Future<SimpleSlot> future = 
slotProvider.allocateSlot(toSchedule, queued);
+
                        if (queued) {
-                               future.setFutureAction(new 
SlotAllocationFutureAction() {
+                               future.handleAsync(new BiFunction<SimpleSlot, 
Throwable, Void>() {
                                        @Override
-                                       public void slotAllocated(SimpleSlot 
slot) {
-                                               try {
-                                                       deployToSlot(slot);
-                                               }
-                                               catch (Throwable t) {
+                                       public Void apply(SimpleSlot 
simpleSlot, Throwable throwable) {
+                                               if (simpleSlot != null) {
                                                        try {
-                                                               
slot.releaseSlot();
-                                                       } finally {
-                                                               markFailed(t);
+                                                               
deployToSlot(simpleSlot);
+                                                       } catch (Throwable t) {
+                                                               try {
+                                                                       
simpleSlot.releaseSlot();
+                                                               } finally {
+                                                                       
markFailed(t);
+                                                               }
                                                        }
                                                }
+                                               else {
+                                                       markFailed(throwable);
+                                               }
+                                               return null;
                                        }
-                               });
+                               }, ExecutionContext$.MODULE$.global());
                        }
                        else {
-                               SimpleSlot slot = future.get();
+                               SimpleSlot slot = null;
                                try {
+                                       // when queuing is not allowed, we either 
get a slot here or allocateSlot has already
+                                       // thrown a NoResourceAvailableException.
+                                       slot = 
checkNotNull(future.getNow(null));
                                        deployToSlot(slot);
                                }
                                catch (Throwable t) {
                                        try {
-                                               slot.releaseSlot();
+                                               if (slot != null) {
+                                                       slot.releaseSlot();
+                                               }
                                        } finally {
                                                markFailed(t);
                                        }
@@ -394,7 +403,7 @@ public class Execution {
                        
                        final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
-                       final Future<Object> deployAction = gateway.ask(new 
SubmitTask(deployment), timeout);
+                       final scala.concurrent.Future<Object> deployAction = 
gateway.ask(new SubmitTask(deployment), timeout);
 
                        deployAction.onComplete(new OnComplete<Object>(){
 
@@ -436,7 +445,7 @@ public class Execution {
                if (slot != null) {
                        final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
-                       Future<Object> stopResult = gateway.retry(
+                       scala.concurrent.Future<Object> stopResult = 
gateway.retry(
                                new StopTask(attemptId),
                                NUM_STOP_CALL_TRIES,
                                timeout,
@@ -916,7 +925,7 @@ public class Execution {
 
                        final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
-                       Future<Object> cancelResult = gateway.retry(
+                       scala.concurrent.Future<Object> cancelResult = 
gateway.retry(
                                new CancelTask(attemptId),
                                NUM_CANCEL_CALL_TRIES,
                                timeout,
@@ -965,7 +974,7 @@ public class Execution {
                        final ActorGateway gateway = 
consumerSlot.getTaskManagerActorGateway();
                        final TaskManagerLocation taskManagerLocation = 
consumerSlot.getTaskManagerLocation();
 
-                       Future<Object> futureUpdate = 
gateway.ask(updatePartitionInfo, timeout);
+                       scala.concurrent.Future<Object> futureUpdate = 
gateway.ask(updatePartitionInfo, timeout);
 
                        futureUpdate.onFailure(new OnFailure() {
                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index b2c23a5..49e6d9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.concurrent.Future;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 
 /**
  * The slot provider is responsible for preparing slots for ready-to-run tasks.
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
  * <p>It supports two allocating modes:
  * <ul>
  *     <li>Immediate allocating: A request for a task slot immediately gets 
satisfied, we can call
- *         {@link SlotAllocationFuture#get()} to get the allocated slot.</li>
+ *         {@link Future#getNow(Object)} to get the allocated slot.</li>
  *     <li>Queued allocating: A request for a task slot is queued and returns 
a future that will be
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
@@ -44,5 +44,5 @@ public interface SlotProvider {
         * 
         * @throws NoResourceAvailableException
         */
-       SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean 
allowQueued) throws NoResourceAvailableException;
+       Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean 
allowQueued) throws NoResourceAvailableException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index c9cdd00..ce2f6f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,8 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -133,15 +135,17 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
 
        @Override
-       public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean 
allowQueued) 
+       public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean 
allowQueued)
                        throws NoResourceAvailableException {
 
                final Object ret = scheduleTask(task, allowQueued);
                if (ret instanceof SimpleSlot) {
-                       return new SlotAllocationFuture((SimpleSlot) ret);
+                       FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+                       future.complete((SimpleSlot) ret);
+                       return future;
                }
-               else if (ret instanceof SlotAllocationFuture) {
-                       return (SlotAllocationFuture) ret;
+               else if (ret instanceof Future) {
+                       return (Future) ret;
                }
                else {
                        throw new RuntimeException();
@@ -149,7 +153,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
        }
 
        /**
-        * Returns either a {@link 
org.apache.flink.runtime.instance.SimpleSlot}, or a {@link 
SlotAllocationFuture}.
+        * Returns either a {@link 
org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}.
         */
        private Object scheduleTask(ScheduledUnit task, boolean 
queueIfNoResource) throws NoResourceAvailableException {
                if (task == null) {
@@ -312,7 +316,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                                else {
                                        // no resource available now, so queue 
the request
                                        if (queueIfNoResource) {
-                                               SlotAllocationFuture future = 
new SlotAllocationFuture();
+                                               
FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                                                this.taskQueue.add(new 
QueuedTask(task, future));
                                                return future;
                                        }
@@ -560,7 +564,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                                                taskQueue.poll();
                                                if (queued.getFuture() != null) 
{
                                                        try {
-                                                               
queued.getFuture().setSlot(newSlot);
+                                                               
queued.getFuture().complete(newSlot);
                                                        }
                                                        catch (Throwable t) {
                                                                
LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), 
t);
@@ -829,10 +833,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                
                private final ScheduledUnit task;
                
-               private final SlotAllocationFuture future;
+               private final FlinkCompletableFuture<SimpleSlot> future;
                
                
-               public QueuedTask(ScheduledUnit task, SlotAllocationFuture 
future) {
+               public QueuedTask(ScheduledUnit task, 
FlinkCompletableFuture<SimpleSlot> future) {
                        this.task = task;
                        this.future = future;
                }
@@ -841,7 +845,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                        return task;
                }
 
-               public SlotAllocationFuture getFuture() {
+               public FlinkCompletableFuture<SimpleSlot> getFuture() {
                        return future;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
deleted file mode 100644
index 36e4072..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * 
- */
-public class SlotAllocationFuture {
-
-       private final Object monitor = new Object();
-
-       private volatile SimpleSlot slot;
-
-       private volatile SlotAllocationFutureAction action;
-
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a future that is uncompleted.
-        */
-       public SlotAllocationFuture() {}
-
-       /**
-        * Creates a future that is immediately completed.
-        * 
-        * @param slot The task slot that completes the future.
-        */
-       public SlotAllocationFuture(SimpleSlot slot) {
-               this.slot = slot;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public SimpleSlot waitTillCompleted() throws InterruptedException {
-               synchronized (monitor) {
-                       while (slot == null) {
-                               monitor.wait();
-                       }
-                       return slot;
-               }
-       }
-
-       public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) 
throws InterruptedException, TimeoutException {
-               checkArgument(timeout >= 0, "timeout may not be negative");
-               checkNotNull(timeUnit, "timeUnit");
-
-               if (timeout == 0) {
-                       return waitTillCompleted();
-               } else {
-                       final long deadline = System.nanoTime() + 
timeUnit.toNanos(timeout);
-                       long millisToWait;
-
-                       synchronized (monitor) {
-                               while (slot == null && (millisToWait = 
(deadline - System.nanoTime()) / 1_000_000) > 0) {
-                                       monitor.wait(millisToWait);
-                               }
-
-                               if (slot != null) {
-                                       return slot;
-                               } else {
-                                       throw new TimeoutException();
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Gets the slot from this future. This method throws an exception, if 
the future has not been completed.
-        * This method never blocks.
-        * 
-        * @return The slot with which this future was completed.
-        * @throws IllegalStateException Thrown, if this method is called 
before the future is completed.
-        */
-       public SimpleSlot get() {
-               final SimpleSlot slot = this.slot;
-               if (slot != null) {
-                       return slot;
-               } else {
-                       throw new IllegalStateException("The future is not 
complete - not slot available");
-               }
-       }
-
-       public void setFutureAction(SlotAllocationFutureAction action) {
-               checkNotNull(action);
-
-               synchronized (monitor) {
-                       checkState(this.action == null, "Future already has an 
action registered.");
-
-                       this.action = action;
-
-                       if (this.slot != null) {
-                               action.slotAllocated(this.slot);
-                       }
-               }
-       }
-
-       /**
-        * Completes the future with a slot.
-        */
-       public void setSlot(SimpleSlot slot) {
-               checkNotNull(slot);
-
-               synchronized (monitor) {
-                       checkState(this.slot == null, "The future has already 
been assigned a slot.");
-
-                       this.slot = slot;
-                       monitor.notifyAll();
-
-                       if (action != null) {
-                               action.slotAllocated(slot);
-                       }
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return slot == null ? "PENDING" : "DONE";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
deleted file mode 100644
index f9d032f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-/**
- * An action that is invoked once a {@link SlotAllocationFuture} is triggered.
- */
-public interface SlotAllocationFutureAction {
-
-       /**
-        * This method is called as soon as the SlotAllocationFuture is 
triggered.
-        * 
-        * @param slot The slot that has been allocated.
-        */
-       void slotAllocated(SimpleSlot slot);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index aa5925f..a58d910 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -29,11 +29,11 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
@@ -136,10 +136,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
                        when(simpleSlot.getRoot()).thenReturn(rootSlot);
 
-                       when(scheduler.allocateSlot(any(ScheduledUnit.class), 
anyBoolean()))
-                                       .thenReturn(new 
SlotAllocationFuture(simpleSlot));
-
-                       
+                       FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+                       future.complete(simpleSlot);
+                       when(scheduler.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
 
                        when(rootSlot.getSlotNumber()).thenReturn(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index c576ce5..104f4ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
@@ -26,7 +27,6 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
@@ -58,8 +58,9 @@ public class ExecutionVertexSchedulingTest {
                        assertTrue(slot.isReleased());
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
-                               .thenReturn(new SlotAllocationFuture(slot));
+                       FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+                       future.complete(slot);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
@@ -88,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
                        slot.releaseSlot();
                        assertTrue(slot.isReleased());
 
-                       final SlotAllocationFuture future = new 
SlotAllocationFuture();
+                       final FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
 
                        Scheduler scheduler = mock(Scheduler.class);
                        
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
@@ -100,7 +101,10 @@ public class ExecutionVertexSchedulingTest {
                        // future has not yet a slot
                        assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
 
-                       future.setSlot(slot);
+                       future.complete(slot);
+
+                       // wait a second for the future's asynchronous action to be executed
+                       Thread.sleep(1000);
 
                        // will have failed
                        assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
@@ -122,8 +126,9 @@ public class ExecutionVertexSchedulingTest {
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        Scheduler scheduler = mock(Scheduler.class);
-                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
-                               .thenReturn(new SlotAllocationFuture(slot));
+                       FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+                       future.complete(slot);
+                       
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), 
anyBoolean())).thenReturn(future);
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index d78f551..9c21533 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -194,7 +196,7 @@ public class SchedulerIsolatedTasksTest {
                        final int totalSlots = 
scheduler.getNumberOfAvailableSlots();
 
                        // all slots we ever got.
-                       List<SlotAllocationFuture> allAllocatedSlots = new 
ArrayList<SlotAllocationFuture>();
+                       List<Future<SimpleSlot>> allAllocatedSlots = new 
ArrayList<>();
 
                        // slots that need to be released
                        final Set<SimpleSlot> toRelease = new 
HashSet<SimpleSlot>();
@@ -202,17 +204,6 @@ public class SchedulerIsolatedTasksTest {
                        // flag to track errors in the concurrent thread
                        final AtomicBoolean errored = new AtomicBoolean(false);
 
-
-                       SlotAllocationFutureAction action = new 
SlotAllocationFutureAction() {
-                               @Override
-                               public void slotAllocated(SimpleSlot slot) {
-                                       synchronized (toRelease) {
-                                               toRelease.add(slot);
-                                               toRelease.notifyAll();
-                                       }
-                               }
-                       };
-
                        // thread to asynchronously release slots
                        Runnable disposer = new Runnable() {
 
@@ -244,8 +235,16 @@ public class SchedulerIsolatedTasksTest {
                        disposeThread.start();
 
                        for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-                               SlotAllocationFuture future = 
scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
-                               future.setFutureAction(action);
+                               Future<SimpleSlot> future = 
scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+                               future.thenAcceptAsync(new 
AcceptFunction<SimpleSlot>() {
+                                       @Override
+                                       public void accept(SimpleSlot slot) {
+                                               synchronized (toRelease) {
+                                                       toRelease.add(slot);
+                                                       toRelease.notifyAll();
+                                               }
+                                       }
+                               }, TestingUtils.defaultExecutionContext());
                                allAllocatedSlots.add(future);
                        }
 
@@ -254,8 +253,8 @@ public class SchedulerIsolatedTasksTest {
                        assertFalse("The slot releasing thread caused an 
error.", errored.get());
 
                        List<SimpleSlot> slotsAfter = new 
ArrayList<SimpleSlot>();
-                       for (SlotAllocationFuture future : allAllocatedSlots) {
-                               slotsAfter.add(future.waitTillCompleted());
+                       for (Future<SimpleSlot> future : allAllocatedSlots) {
+                               slotsAfter.add(future.get());
                        }
 
                        assertEquals("All instances should have available 
slots.", NUM_INSTANCES,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
deleted file mode 100644
index ea0d2cc..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public class SlotAllocationFutureTest {
-
-       @Test
-       public void testInvalidActions() {
-               try {
-                       final SlotAllocationFuture future = new 
SlotAllocationFuture();
-                       
-                       SlotAllocationFutureAction action = new 
SlotAllocationFutureAction() {
-                               @Override
-                               public void slotAllocated(SimpleSlot slot) {}
-                       };
-                       
-                       future.setFutureAction(action);
-                       try {
-                               future.setFutureAction(action);
-                               fail();
-                       } catch (IllegalStateException e) {
-                               // expected
-                       }
-
-                       final Instance instance1 = 
SchedulerTestUtils.getRandomInstance(1);
-                       final Instance instance2 = 
SchedulerTestUtils.getRandomInstance(1);
-
-                       final SimpleSlot slot1 = new SimpleSlot(new JobID(), 
instance1,
-                                       instance1.getTaskManagerLocation(), 0, 
instance1.getActorGateway(), null, null);
-                       final SimpleSlot slot2 = new SimpleSlot(new JobID(), 
instance2,
-                                       instance2.getTaskManagerLocation(), 0, 
instance2.getActorGateway(), null, null);
-                       
-                       future.setSlot(slot1);
-                       try {
-                               future.setSlot(slot2);
-                               fail();
-                       } catch (IllegalStateException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void setWithAction() {
-               try {
-                       
-                       // action before the slot
-                       {
-                               final AtomicInteger invocations = new 
AtomicInteger();
-
-                               final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
-
-                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance,
-                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-                               
-                               SlotAllocationFuture future = new 
SlotAllocationFuture();
-                               
-                               future.setFutureAction(new 
SlotAllocationFutureAction() {
-                                       @Override
-                                       public void slotAllocated(SimpleSlot 
slot) {
-                                               assertEquals(thisSlot, slot);
-                                               invocations.incrementAndGet();
-                                       }
-                               });
-                               
-                               future.setSlot(thisSlot);
-                               
-                               assertEquals(1, invocations.get());
-                       }
-                       
-                       // slot before action
-                       {
-                               final AtomicInteger invocations = new 
AtomicInteger();
-                               final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
-                               
-                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance,
-                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-                               
-                               SlotAllocationFuture future = new 
SlotAllocationFuture();
-                               future.setSlot(thisSlot);
-                               
-                               future.setFutureAction(new 
SlotAllocationFutureAction() {
-                                       @Override
-                                       public void slotAllocated(SimpleSlot 
slot) {
-                                               assertEquals(thisSlot, slot);
-                                               invocations.incrementAndGet();
-                                       }
-                               });
-                               
-                               assertEquals(1, invocations.get());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void setSync() {
-               try {
-                       // sync before setting the slot
-                       {
-                               final AtomicInteger invocations = new 
AtomicInteger();
-                               final AtomicBoolean error = new AtomicBoolean();
-
-                               final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
-
-                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance,
-                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-                               
-                               final SlotAllocationFuture future = new 
SlotAllocationFuture();
-                               
-                               
-                               Runnable r = new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       SimpleSlot syncSlot = 
future.waitTillCompleted();
-                                                       if (syncSlot == null || 
syncSlot != thisSlot) {
-                                                               error.set(true);
-                                                               return;
-                                                       }
-                                                       
invocations.incrementAndGet();
-                                               }
-                                               catch (Throwable t) {
-                                                       error.set(true);
-                                               }
-                                       }
-                               };
-                               
-                               Thread syncer = new Thread(r);
-                               syncer.start();
-                               
-                               // wait, and give the sync thread a chance to 
sync
-                               Thread.sleep(10);
-                               future.setSlot(thisSlot);
-                               
-                               syncer.join();
-                               
-                               assertFalse(error.get());
-                               assertEquals(1, invocations.get());
-                       }
-                       
-                       // setting slot before syncing
-                       {
-                               final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
-
-                               final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance, 
-                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-                               final SlotAllocationFuture future = new 
SlotAllocationFuture();
-
-                               future.setSlot(thisSlot);
-                               
-                               SimpleSlot retrieved = 
future.waitTillCompleted();
-                               
-                               assertNotNull(retrieved);
-                               assertEquals(thisSlot, retrieved);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

Reply via email to