[FLINK-4690] Replace SlotAllocationFuture with flink's own future This closes #2552.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b88f1a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b88f1a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b88f1a7 Branch: refs/heads/master Commit: 7b88f1a75ea92f6b26624a7358e7fcafa3e9506f Parents: f8138f4 Author: Kurt Young <ykt...@gmail.com> Authored: Tue Sep 27 12:10:08 2016 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Sep 27 18:39:36 2016 +0200 ---------------------------------------------------------------------- .../runtime/concurrent/impl/FlinkFuture.java | 1 - .../flink/runtime/executiongraph/Execution.java | 55 ++--- .../flink/runtime/instance/SlotProvider.java | 6 +- .../runtime/jobmanager/scheduler/Scheduler.java | 24 ++- .../scheduler/SlotAllocationFuture.java | 146 -------------- .../scheduler/SlotAllocationFutureAction.java | 34 ---- .../ExecutionGraphMetricsTest.java | 9 +- .../ExecutionVertexSchedulingTest.java | 19 +- .../scheduler/SchedulerIsolatedTasksTest.java | 31 ++- .../scheduler/SlotAllocationFutureTest.java | 200 ------------------- 10 files changed, 80 insertions(+), 445 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java index 361cd3d..3f2c5e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -104,7 +104,6 @@ public class FlinkFuture<T> implements Future<T> { @Override public T getNow(T valueIfAbsent) throws ExecutionException { Preconditions.checkNotNull(scalaFuture); - Preconditions.checkNotNull(valueIfAbsent); Option<Try<T>> value = scalaFuture.value(); http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 6826365..8c02e1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -20,18 +20,18 @@ package org.apache.flink.runtime.executiongraph; import akka.dispatch.OnComplete; import akka.dispatch.OnFailure; - import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; @@ -41,20 +41,18 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; -import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; - import org.slf4j.Logger; import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; +import scala.concurrent.ExecutionContext$; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; @@ -299,32 +297,43 @@ public class Execution { // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that - final SlotAllocationFuture future = slotProvider.allocateSlot(toSchedule, queued); + final Future<SimpleSlot> future = slotProvider.allocateSlot(toSchedule, queued); + if (queued) { - future.setFutureAction(new SlotAllocationFutureAction() { + future.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() { @Override - public void slotAllocated(SimpleSlot slot) { - try { - deployToSlot(slot); - } - catch (Throwable t) { + public Void apply(SimpleSlot simpleSlot, Throwable throwable) { + if (simpleSlot != null) { try { - slot.releaseSlot(); - } finally { - markFailed(t); + deployToSlot(simpleSlot); + } catch (Throwable t) { + try { + simpleSlot.releaseSlot(); + } finally { + markFailed(t); + } } } + else { + markFailed(throwable); + } + return null; } - }); + }, ExecutionContext$.MODULE$.global()); } else { - SimpleSlot slot = future.get(); + SimpleSlot slot = null; try { + // when queued is not allowed, we will get a slot or NoResourceAvailableException will be + // thrown earlier (when allocateSlot). + slot = checkNotNull(future.getNow(null)); deployToSlot(slot); } catch (Throwable t) { try { - slot.releaseSlot(); + if (slot != null) { + slot.releaseSlot(); + } } finally { markFailed(t); } @@ -394,7 +403,7 @@ public class Execution { final ActorGateway gateway = slot.getTaskManagerActorGateway(); - final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout); + final scala.concurrent.Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout); deployAction.onComplete(new OnComplete<Object>(){ @@ -436,7 +445,7 @@ public class Execution { if (slot != null) { final ActorGateway gateway = slot.getTaskManagerActorGateway(); - Future<Object> stopResult = gateway.retry( + scala.concurrent.Future<Object> stopResult = gateway.retry( new StopTask(attemptId), NUM_STOP_CALL_TRIES, timeout, @@ -916,7 +925,7 @@ public class Execution { final ActorGateway gateway = slot.getTaskManagerActorGateway(); - Future<Object> cancelResult = gateway.retry( + scala.concurrent.Future<Object> cancelResult = gateway.retry( new CancelTask(attemptId), NUM_CANCEL_CALL_TRIES, timeout, @@ -965,7 +974,7 @@ public class Execution { final ActorGateway gateway = consumerSlot.getTaskManagerActorGateway(); final TaskManagerLocation taskManagerLocation = consumerSlot.getTaskManagerLocation(); - Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout); + scala.concurrent.Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout); futureUpdate.onFailure(new OnFailure() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java index b2c23a5..49e6d9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.instance; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; /** * The slot provider is responsible for preparing slots for ready-to-run tasks. @@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; * <p>It supports two allocating modes: * <ul> * <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call - * {@link SlotAllocationFuture#get()} to get the allocated slot.</li> + * {@link Future#getNow(Object)} to get the allocated slot.</li> * <li>Queued allocating: A request for a task slot is queued and returns a future that will be * fulfilled as soon as a slot becomes available.</li> * </ul> @@ -44,5 +44,5 @@ public interface SlotProvider { * * @throws NoResourceAvailableException */ - SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException; + Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException; } http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index c9cdd00..ce2f6f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -39,6 +39,8 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -133,15 +135,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl @Override - public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) + public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException { final Object ret = scheduleTask(task, allowQueued); if (ret instanceof SimpleSlot) { - return new SlotAllocationFuture((SimpleSlot) ret); + FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + future.complete((SimpleSlot) ret); + return future; } - else if (ret instanceof SlotAllocationFuture) { - return (SlotAllocationFuture) ret; + else if (ret instanceof Future) { + return (Future) ret; } else { throw new RuntimeException(); @@ -149,7 +153,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl } /** - * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}. + * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}. */ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException { if (task == null) { @@ -312,7 +316,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl else { // no resource available now, so queue the request if (queueIfNoResource) { - SlotAllocationFuture future = new SlotAllocationFuture(); + FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); this.taskQueue.add(new QueuedTask(task, future)); return future; } @@ -560,7 +564,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl taskQueue.poll(); if (queued.getFuture() != null) { try { - queued.getFuture().setSlot(newSlot); + queued.getFuture().complete(newSlot); } catch (Throwable t) { LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t); @@ -829,10 +833,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl private final ScheduledUnit task; - private final SlotAllocationFuture future; + private final FlinkCompletableFuture<SimpleSlot> future; - public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) { + public QueuedTask(ScheduledUnit task, FlinkCompletableFuture<SimpleSlot> future) { this.task = task; this.future = future; } @@ -841,7 +845,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl return task; } - public SlotAllocationFuture getFuture() { + public FlinkCompletableFuture<SimpleSlot> getFuture() { return future; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java deleted file mode 100644 index 36e4072..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.runtime.instance.SimpleSlot; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -/** - * - */ -public class SlotAllocationFuture { - - private final Object monitor = new Object(); - - private volatile SimpleSlot slot; - - private volatile SlotAllocationFutureAction action; - - // -------------------------------------------------------------------------------------------- - - /** - * Creates a future that is uncompleted. - */ - public SlotAllocationFuture() {} - - /** - * Creates a future that is immediately completed. - * - * @param slot The task slot that completes the future. - */ - public SlotAllocationFuture(SimpleSlot slot) { - this.slot = slot; - } - - // -------------------------------------------------------------------------------------------- - - public SimpleSlot waitTillCompleted() throws InterruptedException { - synchronized (monitor) { - while (slot == null) { - monitor.wait(); - } - return slot; - } - } - - public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { - checkArgument(timeout >= 0, "timeout may not be negative"); - checkNotNull(timeUnit, "timeUnit"); - - if (timeout == 0) { - return waitTillCompleted(); - } else { - final long deadline = System.nanoTime() + timeUnit.toNanos(timeout); - long millisToWait; - - synchronized (monitor) { - while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) { - monitor.wait(millisToWait); - } - - if (slot != null) { - return slot; - } else { - throw new TimeoutException(); - } - } - } - } - - /** - * Gets the slot from this future. This method throws an exception, if the future has not been completed. - * This method never blocks. - * - * @return The slot with which this future was completed. - * @throws IllegalStateException Thrown, if this method is called before the future is completed. - */ - public SimpleSlot get() { - final SimpleSlot slot = this.slot; - if (slot != null) { - return slot; - } else { - throw new IllegalStateException("The future is not complete - not slot available"); - } - } - - public void setFutureAction(SlotAllocationFutureAction action) { - checkNotNull(action); - - synchronized (monitor) { - checkState(this.action == null, "Future already has an action registered."); - - this.action = action; - - if (this.slot != null) { - action.slotAllocated(this.slot); - } - } - } - - /** - * Completes the future with a slot. - */ - public void setSlot(SimpleSlot slot) { - checkNotNull(slot); - - synchronized (monitor) { - checkState(this.slot == null, "The future has already been assigned a slot."); - - this.slot = slot; - monitor.notifyAll(); - - if (action != null) { - action.slotAllocated(slot); - } - } - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return slot == null ? "PENDING" : "DONE"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java deleted file mode 100644 index f9d032f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.runtime.instance.SimpleSlot; - -/** - * An action that is invoked once a {@link SlotAllocationFuture} is triggered. - */ -public interface SlotAllocationFutureAction { - - /** - * This method is called as soon as the SlotAllocationFuture is triggered. - * - * @param slot The slot that has been allocated. - */ - void slotAllocated(SimpleSlot slot); -} http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index aa5925f..a58d910 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -29,11 +29,11 @@ import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Slot; @@ -136,10 +136,9 @@ public class ExecutionGraphMetricsTest extends TestLogger { when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true); when(simpleSlot.getRoot()).thenReturn(rootSlot); - when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())) - .thenReturn(new SlotAllocationFuture(simpleSlot)); - - + FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + future.complete(simpleSlot); + when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future); when(rootSlot.getSlotNumber()).thenReturn(0); http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index c576ce5..104f4ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.Instance; @@ -26,7 +27,6 @@ import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; @@ -58,8 +58,9 @@ public class ExecutionVertexSchedulingTest { assertTrue(slot.isReleased()); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())) - .thenReturn(new SlotAllocationFuture(slot)); + FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + future.complete(slot); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot @@ -88,7 +89,7 @@ public class ExecutionVertexSchedulingTest { slot.releaseSlot(); assertTrue(slot.isReleased()); - final SlotAllocationFuture future = new SlotAllocationFuture(); + final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); Scheduler scheduler = mock(Scheduler.class); when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); @@ -100,7 +101,10 @@ public class ExecutionVertexSchedulingTest { // future has not yet a slot assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); - future.setSlot(slot); + future.complete(slot); + + // wait a second for future's future action be executed + Thread.sleep(1000); // will have failed assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); @@ -122,8 +126,9 @@ public class ExecutionVertexSchedulingTest { final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())) - .thenReturn(new SlotAllocationFuture(slot)); + FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + future.complete(slot); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index d78f551..9c21533 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.jobmanager.scheduler; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -194,7 +196,7 @@ public class SchedulerIsolatedTasksTest { final int totalSlots = scheduler.getNumberOfAvailableSlots(); // all slots we ever got. - List<SlotAllocationFuture> allAllocatedSlots = new ArrayList<SlotAllocationFuture>(); + List<Future<SimpleSlot>> allAllocatedSlots = new ArrayList<>(); // slots that need to be released final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>(); @@ -202,17 +204,6 @@ public class SchedulerIsolatedTasksTest { // flag to track errors in the concurrent thread final AtomicBoolean errored = new AtomicBoolean(false); - - SlotAllocationFutureAction action = new SlotAllocationFutureAction() { - @Override - public void slotAllocated(SimpleSlot slot) { - synchronized (toRelease) { - toRelease.add(slot); - toRelease.notifyAll(); - } - } - }; - // thread to asynchronously release slots Runnable disposer = new Runnable() { @@ -244,8 +235,16 @@ public class SchedulerIsolatedTasksTest { disposeThread.start(); for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) { - SlotAllocationFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true); - future.setFutureAction(action); + Future<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true); + future.thenAcceptAsync(new AcceptFunction<SimpleSlot>() { + @Override + public void accept(SimpleSlot slot) { + synchronized (toRelease) { + toRelease.add(slot); + toRelease.notifyAll(); + } + } + }, TestingUtils.defaultExecutionContext()); allAllocatedSlots.add(future); } @@ -254,8 +253,8 @@ public class SchedulerIsolatedTasksTest { assertFalse("The slot releasing thread caused an error.", errored.get()); List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>(); - for (SlotAllocationFuture future : allAllocatedSlots) { - slotsAfter.add(future.waitTillCompleted()); + for (Future<SimpleSlot> future : allAllocatedSlots) { + slotsAfter.add(future.get()); } assertEquals("All instances should have available slots.", NUM_INSTANCES, http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java deleted file mode 100644 index ea0d2cc..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.SimpleSlot; - -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - -public class SlotAllocationFutureTest { - - @Test - public void testInvalidActions() { - try { - final SlotAllocationFuture future = new SlotAllocationFuture(); - - SlotAllocationFutureAction action = new SlotAllocationFutureAction() { - @Override - public void slotAllocated(SimpleSlot slot) {} - }; - - future.setFutureAction(action); - try { - future.setFutureAction(action); - fail(); - } catch (IllegalStateException e) { - // expected - } - - final Instance instance1 = SchedulerTestUtils.getRandomInstance(1); - final Instance instance2 = SchedulerTestUtils.getRandomInstance(1); - - final SimpleSlot slot1 = new SimpleSlot(new JobID(), instance1, - instance1.getTaskManagerLocation(), 0, instance1.getActorGateway(), null, null); - final SimpleSlot slot2 = new SimpleSlot(new JobID(), instance2, - instance2.getTaskManagerLocation(), 0, instance2.getActorGateway(), null, null); - - future.setSlot(slot1); - try { - future.setSlot(slot2); - fail(); - } catch (IllegalStateException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void setWithAction() { - try { - - // action before the slot - { - final AtomicInteger invocations = new AtomicInteger(); - - final Instance instance = SchedulerTestUtils.getRandomInstance(1); - - final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); - - SlotAllocationFuture future = new SlotAllocationFuture(); - - future.setFutureAction(new SlotAllocationFutureAction() { - @Override - public void slotAllocated(SimpleSlot slot) { - assertEquals(thisSlot, slot); - invocations.incrementAndGet(); - } - }); - - future.setSlot(thisSlot); - - assertEquals(1, invocations.get()); - } - - // slot before action - { - final AtomicInteger invocations = new AtomicInteger(); - final Instance instance = SchedulerTestUtils.getRandomInstance(1); - - final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); - - SlotAllocationFuture future = new SlotAllocationFuture(); - future.setSlot(thisSlot); - - future.setFutureAction(new SlotAllocationFutureAction() { - @Override - public void slotAllocated(SimpleSlot slot) { - assertEquals(thisSlot, slot); - invocations.incrementAndGet(); - } - }); - - assertEquals(1, invocations.get()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void setSync() { - try { - // sync before setting the slot - { - final AtomicInteger invocations = new AtomicInteger(); - final AtomicBoolean error = new AtomicBoolean(); - - final Instance instance = SchedulerTestUtils.getRandomInstance(1); - - final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); - - final SlotAllocationFuture future = new SlotAllocationFuture(); - - - Runnable r = new Runnable() { - @Override - public void run() { - try { - SimpleSlot syncSlot = future.waitTillCompleted(); - if (syncSlot == null || syncSlot != thisSlot) { - error.set(true); - return; - } - invocations.incrementAndGet(); - } - catch (Throwable t) { - error.set(true); - } - } - }; - - Thread syncer = new Thread(r); - syncer.start(); - - // wait, and give the sync thread a chance to sync - Thread.sleep(10); - future.setSlot(thisSlot); - - syncer.join(); - - assertFalse(error.get()); - assertEquals(1, invocations.get()); - } - - // setting slot before syncing - { - final Instance instance = SchedulerTestUtils.getRandomInstance(1); - - final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); - final SlotAllocationFuture future = new SlotAllocationFuture(); - - future.setSlot(thisSlot); - - SimpleSlot retrieved = future.waitTillCompleted(); - - assertNotNull(retrieved); - assertEquals(thisSlot, retrieved); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -}