[FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future
In order to properly give back an allocated slot to the SlotPool, one must not complete the result future of Execution#allocateAndAssignSlotForExecution. This commit changes the behaviour in Execution#scheduleForExecution accordingly. This closes #6385. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f24e840 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f24e840 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f24e840 Branch: refs/heads/release-1.5 Commit: 7f24e840bbf4beb09826ecc32c05fcf9b470982a Parents: a0ca64d Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 22 21:38:42 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 23 17:22:52 2018 +0200 ---------------------------------------------------------------------- .../concurrent/FutureConsumerWithException.java | 43 +++++++++++ .../flink/runtime/executiongraph/Execution.java | 24 +++---- .../ExecutionGraphSchedulingTest.java | 2 +- .../runtime/executiongraph/ExecutionTest.java | 75 ++++++++++++++++++++ 4 files changed, 129 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java new file mode 100644 index 0000000..c49d7dc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +/** + * A checked extension of the {@link Consumer} interface which rethrows + * exceptions wrapped in a {@link CompletionException}. + * + * @param <T> type of the first argument + * @param <E> type of the thrown exception + */ +public interface FutureConsumerWithException<T, E extends Throwable> extends Consumer<T> { + + void acceptWithException(T value) throws E; + + @Override + default void accept(T value) { + try { + acceptWithException(value); + } catch (Throwable t) { + throw new CompletionException(t); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index f8419d3..801f35a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -54,6 +54,7 @@ import 
org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.OptionalFailure; +import org.apache.flink.util.concurrent.FutureConsumerWithException; import org.slf4j.Logger; @@ -413,24 +414,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so // that we directly deploy the tasks if the slot allocation future is completed. This is // necessary for immediate deployment. - final CompletableFuture<Void> deploymentFuture = allocationFuture.handle( - (Execution ignored, Throwable throwable) -> { - if (throwable != null) { - markFailed(ExceptionUtils.stripCompletionException(throwable)); - } else { - try { - deploy(); - } catch (Throwable t) { - markFailed(ExceptionUtils.stripCompletionException(t)); - } + final CompletableFuture<Void> deploymentFuture = allocationFuture.thenAccept( + (FutureConsumerWithException<Execution, Exception>) value -> deploy()); + + deploymentFuture.whenComplete( + (Void ignored, Throwable failure) -> { + if (failure != null) { + markFailed(ExceptionUtils.stripCompletionException(failure)); } - return null; - } - ); + }); // if tasks have to scheduled immediately check that the task has been deployed if (!queued && !deploymentFuture.isDone()) { - allocationFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet.")); + deploymentFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet.")); } return deploymentFuture; http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 6092f52..6680c9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -640,7 +640,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { } @Nonnull - private SingleLogicalSlot createSingleLogicalSlot(TestingSlotOwner slotOwner, SimpleAckingTaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) { + static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) { TaskManagerLocation location = new TaskManagerLocation( ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); http://git-wip-us.apache.org/repos/asf/flink/blob/7f24e840/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index d3e88e1..56fd7e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import 
org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -50,6 +51,8 @@ import java.util.Collections; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -419,6 +422,78 @@ public class ExecutionTest extends TestLogger { assertThat(execution.getTaskRestore(), is(nullValue())); } + @Test + public void testEagerSchedulingFailureReturnsSlot() throws Exception { + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); + + final CompletableFuture<SlotRequestId> slotRequestIdFuture = new CompletableFuture<>(); + final CompletableFuture<SlotRequestId> returnedSlotFuture = new CompletableFuture<>(); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIdFuture.complete(slotRequestId); + return new CompletableFuture<>(); + }); + + slotProvider.setSlotCanceller(returnedSlotFuture::complete); + slotOwner.getReturnedSlotFuture().thenAccept( + (LogicalSlot logicalSlot) -> returnedSlotFuture.complete(logicalSlot.getSlotRequestId())); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + final Execution execution = executionVertex.getCurrentExecutionAttempt(); + + taskManagerGateway.setCancelConsumer( 
+ executionAttemptID -> { + if (execution.getAttemptId().equals(executionAttemptID)) { + execution.cancelingComplete(); + } + } + ); + + final ExecutorService executorService = Executors.newFixedThreadPool(1); + + try { + slotRequestIdFuture.thenAcceptAsync( + (SlotRequestId slotRequestId) -> { + final SingleLogicalSlot singleLogicalSlot = ExecutionGraphSchedulingTest.createSingleLogicalSlot( + slotOwner, + taskManagerGateway, + slotRequestId); + slotProvider.complete(slotRequestId, singleLogicalSlot); + }, + executorService); + + final CompletableFuture<Void> schedulingFuture = execution.scheduleForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ANY); + + try { + schedulingFuture.get(); + // cancel the execution in case we could schedule the execution + execution.cancel(); + } catch (ExecutionException ignored) { + } + + assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get()))); + } finally { + executorService.shutdownNow(); + } + } + @Nonnull private JobVertex createNoOpJobVertex() { final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());