Repository: flink Updated Branches: refs/heads/master 6e123d287 -> 84672c22f
[FLINK-4690] Use direct executor to run slot allocation future handler Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84672c22 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84672c22 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84672c22 Branch: refs/heads/master Commit: 84672c22f8088a70caf35b54d74eee458bf600dd Parents: 7b88f1a Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Sep 27 15:33:07 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Sep 27 18:39:36 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/concurrent/Executors.java | 52 +++++++++++++++++ .../flink/runtime/executiongraph/Execution.java | 61 ++++++++------------ .../runtime/jobmanager/scheduler/Scheduler.java | 15 +++-- 3 files changed, 84 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java new file mode 100644 index 0000000..1832d70 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import java.util.concurrent.Executor; + +/** + * Collection of {@link Executor} implementations. NOTE(review): this holder only exposes static members but declares no private constructor; consider making it final with a private constructor so it cannot be instantiated. + */ +public class Executors { + + /** + * Returns a direct executor. The direct executor directly executes the runnable in the calling + * thread. + * + * @return Direct executor + */ + public static Executor directExecutor() { + return DirectExecutor.INSTANCE; + } + + /** + * Direct executor implementation which runs each command synchronously in the calling thread. + */ + private static class DirectExecutor implements Executor { + + static final DirectExecutor INSTANCE = new DirectExecutor(); + + private DirectExecutor() {} + + @Override + public void execute(Runnable command) { + command.run(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 8c02e1b..912ff10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.clusterframework.types.ResourceID; import
org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; @@ -52,7 +53,6 @@ import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import scala.concurrent.ExecutionContext; -import scala.concurrent.ExecutionContext$; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; @@ -297,49 +297,38 @@ public class Execution { // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that - final Future<SimpleSlot> future = slotProvider.allocateSlot(toSchedule, queued); + final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); - if (queued) { - future.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() { - @Override - public Void apply(SimpleSlot simpleSlot, Throwable throwable) { - if (simpleSlot != null) { + // IMPORTANT: We have to use the direct executor here so that we directly deploy the tasks + // if the slot allocation future is completed.
This is necessary for immediate deployment. + final Future<Void> deploymentFuture = slotAllocationFuture.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() { + @Override + public Void apply(SimpleSlot simpleSlot, Throwable throwable) { + if (simpleSlot != null) { + try { + deployToSlot(simpleSlot); + } catch (Throwable t) { try { - deployToSlot(simpleSlot); - } catch (Throwable t) { - try { - simpleSlot.releaseSlot(); - } finally { - markFailed(t); - } + simpleSlot.releaseSlot(); + } finally { + markFailed(t); } } - else { - markFailed(throwable); - } - return null; } - }, ExecutionContext$.MODULE$.global()); - } - else { - SimpleSlot slot = null; - try { - // when queued is not allowed, we will get a slot or NoResourceAvailableException will be - // thrown earlier (when allocateSlot). - slot = checkNotNull(future.getNow(null)); - deployToSlot(slot); - } - catch (Throwable t) { - try { - if (slot != null) { - slot.releaseSlot(); - } - } finally { - markFailed(t); + else { + markFailed(throwable); } + return null; } - } + }, Executors.directExecutor()); + // if tasks have to be scheduled immediately, check that the task has been deployed. NOTE(review): nothing here cancels slotAllocationFuture, so if it completes after markFailed the handler above may still call deployToSlot; IllegalStateException would also describe this condition better than IllegalArgumentException. + if (!queued) { + if (!deploymentFuture.isDone()) { + markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet.")); + } + } + return true; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index ce2f6f7..b839e0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -39,6 +39,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.instance.SlotProvider; @@ -140,9 +141,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl final Object ret = scheduleTask(task, allowQueued); if (ret instanceof SimpleSlot) { - FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); - future.complete((SimpleSlot) ret); - return future; + return FlinkCompletableFuture.completed((SimpleSlot) ret); } else if (ret instanceof Future) { return (Future) ret; @@ -153,7 +152,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl } /** - * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}. + * Returns either a {@link SimpleSlot}, if one can be allocated right away, or a {@link Future} for a request that was queued because no resource was available.
*/ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException { if (task == null) { @@ -316,7 +315,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl else { // no resource available now, so queue the request if (queueIfNoResource) { - FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); this.taskQueue.add(new QueuedTask(task, future)); return future; } @@ -833,10 +832,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl private final ScheduledUnit task; - private final FlinkCompletableFuture<SimpleSlot> future; + private final CompletableFuture<SimpleSlot> future; - public QueuedTask(ScheduledUnit task, FlinkCompletableFuture<SimpleSlot> future) { + public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) { this.task = task; this.future = future; } @@ -845,7 +844,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl return task; } - public FlinkCompletableFuture<SimpleSlot> getFuture() { + public CompletableFuture<SimpleSlot> getFuture() { return future; } }