[FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future

In order to properly give back an allocated slot to the SlotPool, one must not
complete the result future of Execution#allocateAndAssignSlotForExecution.
This commit changes the behaviour in Execution#scheduleForExecution accordingly.

This closes #6385.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f24e840
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f24e840
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f24e840

Branch: refs/heads/release-1.5
Commit: 7f24e840bbf4beb09826ecc32c05fcf9b470982a
Parents: a0ca64d
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Jul 22 21:38:42 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Jul 23 17:22:52 2018 +0200

----------------------------------------------------------------------
 .../concurrent/FutureConsumerWithException.java | 43 +++++++++++
 .../flink/runtime/executiongraph/Execution.java | 24 +++----
 .../ExecutionGraphSchedulingTest.java           |  2 +-
 .../runtime/executiongraph/ExecutionTest.java   | 75 ++++++++++++++++++++
 4 files changed, 129 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
new file mode 100644
index 0000000..c49d7dc
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface which rethrows
+ * exceptions wrapped in a {@link CompletionException}.
+ *
+ * @param <T> type of the first argument
+ * @param <E> type of the thrown exception
+ */
+public interface FutureConsumerWithException<T, E extends Throwable> extends 
Consumer<T> {
+
+       void acceptWithException(T value) throws E;
+
+       @Override
+       default void accept(T value) {
+               try {
+                       acceptWithException(value);
+               } catch (Throwable t) {
+                       throw new CompletionException(t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index f8419d3..801f35a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
 
 import org.slf4j.Logger;
 
@@ -413,24 +414,19 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
                        // that we directly deploy the tasks if the slot 
allocation future is completed. This is
                        // necessary for immediate deployment.
-                       final CompletableFuture<Void> deploymentFuture = 
allocationFuture.handle(
-                               (Execution ignored, Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               
markFailed(ExceptionUtils.stripCompletionException(throwable));
-                                       } else {
-                                               try {
-                                                       deploy();
-                                               } catch (Throwable t) {
-                                                       
markFailed(ExceptionUtils.stripCompletionException(t));
-                                               }
+                       final CompletableFuture<Void> deploymentFuture = 
allocationFuture.thenAccept(
+                               (FutureConsumerWithException<Execution, 
Exception>) value -> deploy());
+
+                       deploymentFuture.whenComplete(
+                               (Void ignored, Throwable failure) -> {
+                                       if (failure != null) {
+                                               
markFailed(ExceptionUtils.stripCompletionException(failure));
                                        }
-                                       return null;
-                               }
-                       );
+                               });
 
                        // if tasks have to scheduled immediately check that 
the task has been deployed
                        if (!queued && !deploymentFuture.isDone()) {
-                               allocationFuture.completeExceptionally(new 
IllegalArgumentException("The slot allocation future has not been completed 
yet."));
+                               deploymentFuture.completeExceptionally(new 
IllegalArgumentException("The slot allocation future has not been completed 
yet."));
                        }
 
                        return deploymentFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 6092f52..6680c9e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -640,7 +640,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
        }
 
        @Nonnull
-       private SingleLogicalSlot createSingleLogicalSlot(TestingSlotOwner 
slotOwner, SimpleAckingTaskManagerGateway taskManagerGateway, SlotRequestId 
slotRequestId) {
+       static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, 
TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
                TaskManagerLocation location = new TaskManagerLocation(
                        ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 12345);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index d3e88e1..56fd7e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -50,6 +51,8 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -419,6 +422,78 @@ public class ExecutionTest extends TestLogger {
                assertThat(execution.getTaskRestore(), is(nullValue()));
        }
 
+       @Test
+       public void testEagerSchedulingFailureReturnsSlot() throws Exception {
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
+
+               final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+               final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
+
+               final CompletableFuture<SlotRequestId> slotRequestIdFuture = 
new CompletableFuture<>();
+               final CompletableFuture<SlotRequestId> returnedSlotFuture = new 
CompletableFuture<>();
+
+               final TestingSlotProvider slotProvider = new 
TestingSlotProvider(
+                       (SlotRequestId slotRequestId) -> {
+                               slotRequestIdFuture.complete(slotRequestId);
+                               return new CompletableFuture<>();
+                       });
+
+               slotProvider.setSlotCanceller(returnedSlotFuture::complete);
+               slotOwner.getReturnedSlotFuture().thenAccept(
+                       (LogicalSlot logicalSlot) -> 
returnedSlotFuture.complete(logicalSlot.getSlotRequestId()));
+
+               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+               final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
+
+               taskManagerGateway.setCancelConsumer(
+                       executionAttemptID -> {
+                               if 
(execution.getAttemptId().equals(executionAttemptID)) {
+                                       execution.cancelingComplete();
+                               }
+                       }
+               );
+
+               final ExecutorService executorService = 
Executors.newFixedThreadPool(1);
+
+               try {
+                       slotRequestIdFuture.thenAcceptAsync(
+                               (SlotRequestId slotRequestId) -> {
+                                       final SingleLogicalSlot 
singleLogicalSlot = ExecutionGraphSchedulingTest.createSingleLogicalSlot(
+                                               slotOwner,
+                                               taskManagerGateway,
+                                               slotRequestId);
+                                       slotProvider.complete(slotRequestId, 
singleLogicalSlot);
+                               },
+                               executorService);
+
+                       final CompletableFuture<Void> schedulingFuture = 
execution.scheduleForExecution(
+                               slotProvider,
+                               false,
+                               LocationPreferenceConstraint.ANY);
+
+                       try {
+                               schedulingFuture.get();
+                               // cancel the execution in case we could 
schedule the execution
+                               execution.cancel();
+                       } catch (ExecutionException ignored) {
+                       }
+
+                       assertThat(returnedSlotFuture.get(), 
is(equalTo(slotRequestIdFuture.get())));
+               } finally {
+                       executorService.shutdownNow();
+               }
+       }
+
        @Nonnull
        private JobVertex createNoOpJobVertex() {
                final JobVertex jobVertex = new JobVertex("Test vertex", new 
JobVertexID());

Reply via email to