Repository: flink
Updated Branches:
  refs/heads/master 6e123d287 -> 84672c22f


[FLINK-4690] Use direct executor to run slot allocation future handler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84672c22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84672c22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84672c22

Branch: refs/heads/master
Commit: 84672c22f8088a70caf35b54d74eee458bf600dd
Parents: 7b88f1a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Sep 27 15:33:07 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/Executors.java     | 52 +++++++++++++++++
 .../flink/runtime/executiongraph/Execution.java | 61 ++++++++------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 15 +++--
 3 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
new file mode 100644
index 0000000..1832d70
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Collection of {@link Executor} implementations
+ */
+public class Executors {
+
+       /**
+        * Return a direct executor. The direct executor directly executes the 
runnable in the calling
+        * thread.
+        *
+        * @return Direct executor
+        */
+       public static Executor directExecutor() {
+               return DirectExecutor.INSTANCE;
+       }
+
+       /**
+        * Direct executor implementation.
+        */
+       private static class DirectExecutor implements Executor {
+
+               static final DirectExecutor INSTANCE = new DirectExecutor();
+
+               private DirectExecutor() {}
+
+               @Override
+               public void execute(Runnable command) {
+                       command.run();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8c02e1b..912ff10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -52,7 +53,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -297,49 +297,38 @@ public class Execution {
 
                        // IMPORTANT: To prevent leaks of cluster resources, we 
need to make sure that slots are returned
                        //     in all cases where the deployment failed. we use 
many try {} finally {} clauses to assure that
-                       final Future<SimpleSlot> future = 
slotProvider.allocateSlot(toSchedule, queued);
+                       final Future<SimpleSlot> slotAllocationFuture = 
slotProvider.allocateSlot(toSchedule, queued);
 
-                       if (queued) {
-                               future.handleAsync(new BiFunction<SimpleSlot, 
Throwable, Void>() {
-                                       @Override
-                                       public Void apply(SimpleSlot 
simpleSlot, Throwable throwable) {
-                                               if (simpleSlot != null) {
+                       // IMPORTANT: We have to use the direct executor here 
so that we directly deploy the tasks
+                       // if the slot allocation future is completed. This is 
necessary for immediate deployment
+                       final Future<Void> deploymentFuture = 
slotAllocationFuture.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
+                               @Override
+                               public Void apply(SimpleSlot simpleSlot, 
Throwable throwable) {
+                                       if (simpleSlot != null) {
+                                               try {
+                                                       
deployToSlot(simpleSlot);
+                                               } catch (Throwable t) {
                                                        try {
-                                                               
deployToSlot(simpleSlot);
-                                                       } catch (Throwable t) {
-                                                               try {
-                                                                       
simpleSlot.releaseSlot();
-                                                               } finally {
-                                                                       
markFailed(t);
-                                                               }
+                                                               
simpleSlot.releaseSlot();
+                                                       } finally {
+                                                               markFailed(t);
                                                        }
                                                }
-                                               else {
-                                                       markFailed(throwable);
-                                               }
-                                               return null;
                                        }
-                               }, ExecutionContext$.MODULE$.global());
-                       }
-                       else {
-                               SimpleSlot slot = null;
-                               try {
-                                       // when queued is not allowed, we will 
get a slot or NoResourceAvailableException will be
-                                       // thrown earlier (when allocateSlot).
-                                       slot = 
checkNotNull(future.getNow(null));
-                                       deployToSlot(slot);
-                               }
-                               catch (Throwable t) {
-                                       try {
-                                               if (slot != null) {
-                                                       slot.releaseSlot();
-                                               }
-                                       } finally {
-                                               markFailed(t);
+                                       else {
+                                               markFailed(throwable);
                                        }
+                                       return null;
                                }
-                       }
+                       }, Executors.directExecutor());
 
+                       // if tasks have to scheduled immediately check that 
the task has been deployed
+                       if (!queued) {
+                               if (!deploymentFuture.isDone()) {
+                                       markFailed(new 
IllegalArgumentException("The slot allocation future has not been completed 
yet."));
+                               }
+                       }
+                       
                        return true;
                }
                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index ce2f6f7..b839e0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -140,9 +141,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
                final Object ret = scheduleTask(task, allowQueued);
                if (ret instanceof SimpleSlot) {
-                       FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
-                       future.complete((SimpleSlot) ret);
-                       return future;
+                       return FlinkCompletableFuture.completed((SimpleSlot) 
ret);
                }
                else if (ret instanceof Future) {
                        return (Future) ret;
@@ -153,7 +152,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
        }
 
        /**
-        * Returns either a {@link 
org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}.
+        * Returns either a {@link SimpleSlot}, or a {@link Future}.
         */
        private Object scheduleTask(ScheduledUnit task, boolean 
queueIfNoResource) throws NoResourceAvailableException {
                if (task == null) {
@@ -316,7 +315,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                                else {
                                        // no resource available now, so queue 
the request
                                        if (queueIfNoResource) {
-                                               
FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+                                               CompletableFuture<SimpleSlot> 
future = new FlinkCompletableFuture<>();
                                                this.taskQueue.add(new 
QueuedTask(task, future));
                                                return future;
                                        }
@@ -833,10 +832,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                
                private final ScheduledUnit task;
                
-               private final FlinkCompletableFuture<SimpleSlot> future;
+               private final CompletableFuture<SimpleSlot> future;
                
                
-               public QueuedTask(ScheduledUnit task, 
FlinkCompletableFuture<SimpleSlot> future) {
+               public QueuedTask(ScheduledUnit task, 
CompletableFuture<SimpleSlot> future) {
                        this.task = task;
                        this.future = future;
                }
@@ -845,7 +844,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                        return task;
                }
 
-               public FlinkCompletableFuture<SimpleSlot> getFuture() {
+               public CompletableFuture<SimpleSlot> getFuture() {
                        return future;
                }
        }

Reply via email to