This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new ce7c78c [FLINK-21181][runtime] Wait for Invokable cancellation before releasing network resources ce7c78c is described below commit ce7c78ca72ce86df0b4a28fbd3233b89da3238e1 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Wed Apr 28 00:02:33 2021 +0200 [FLINK-21181][runtime] Wait for Invokable cancellation before releasing network resources --- .../program/PerJobMiniClusterFactoryTest.java | 7 +- .../runtime/webmonitor/WebFrontendITCase.java | 15 ++- .../iterative/task/AbstractIterativeTask.java | 12 +- .../runtime/iterative/task/IterationHeadTask.java | 1 + .../iterative/task/IterationIntermediateTask.java | 61 +++++----- .../task/IterationSynchronizationSinkTask.java | 9 ++ .../runtime/iterative/task/IterationTailTask.java | 81 +++++++------- .../flink/runtime/iterative/task/Terminable.java | 2 + .../runtime/jobgraph/tasks/AbstractInvokable.java | 6 +- .../apache/flink/runtime/operators/BatchTask.java | 7 +- .../flink/runtime/operators/DataSinkTask.java | 9 +- .../flink/runtime/operators/DataSourceTask.java | 8 +- .../org/apache/flink/runtime/taskmanager/Task.java | 31 ++++-- .../jobmaster/TestingAbstractInvokables.java | 4 +- .../CoordinatorEventsExactlyOnceITCase.java | 3 +- .../TaskExecutorOperatorEventHandlingTest.java | 4 +- .../apache/flink/runtime/taskmanager/TaskTest.java | 18 ++- .../testtasks/OnceBlockingNoOpInvokable.java | 5 +- .../runtime/testutils/CancelableInvokable.java | 21 +++- .../apache/flink/runtime/jobmanager/Tasks.scala | 8 +- .../flink/streaming/runtime/tasks/StreamTask.java | 10 +- .../streaming/runtime/tasks/StreamTaskTest.java | 123 +++++++++++++++++++++ 22 files changed, 343 insertions(+), 102 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java index f97395c..7061f64 100644 --- 
a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java @@ -34,7 +34,9 @@ import org.junit.After; import org.junit.Test; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; import static org.hamcrest.CoreMatchers.is; @@ -185,7 +187,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger { } @Override - public void invoke() throws Exception { + public void doInvoke() throws Exception { synchronized (lock) { while (running) { lock.wait(); @@ -194,11 +196,12 @@ public class PerJobMiniClusterFactoryTest extends TestLogger { } @Override - public void cancel() { + public Future<Void> cancel() { synchronized (lock) { running = false; lock.notifyAll(); } + return CompletableFuture.completedFuture(null); } } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index ec65969..d1c3f87 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -60,7 +60,9 @@ import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.containsString; @@ -443,6 +445,8 @@ public class WebFrontendITCase extends TestLogger { private volatile boolean isRunning = true; + private final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>(); + public BlockingInvokable(Environment environment) { super(environment); } @@ -450,14 +454,19 @@ public class WebFrontendITCase extends TestLogger { @Override public void invoke() throws Exception { latch.countDown(); - while (isRunning) { - Thread.sleep(100); + try { + while (isRunning) { + Thread.sleep(100); + } + } finally { + terminationFuture.complete(null); } } @Override - public void cancel() { + public Future<Void> cancel() { this.isRunning = false; + return terminationFuture; } public static void reset() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java index a6d6e2d..b4c1ea3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java @@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; /** The abstract base class for all tasks able to participate in an iteration. 
*/ @@ -88,6 +89,8 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc private volatile boolean terminationRequested; + private final CompletableFuture<Void> terminationCompletionFuture = new CompletableFuture<>(); + // -------------------------------------------------------------------------------------------- /** @@ -311,9 +314,14 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc } @Override - public void cancel() throws Exception { + public void terminationCompleted() { + this.terminationCompletionFuture.complete(null); + } + + @Override + public Future<Void> cancel() throws Exception { requestTermination(); - super.cancel(); + return this.terminationCompletionFuture; } // ----------------------------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java index 2309630..2655154 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java @@ -446,6 +446,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte if (solutionSet != null) { solutionSet.close(); } + terminationCompleted(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java index 2de3067..65b27c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java @@ -101,45 +101,50 @@ public class IterationIntermediateTask<S extends Function, 
OT> @Override public void run() throws Exception { - SuperstepKickoffLatch nextSuperstepLatch = - SuperstepKickoffLatchBroker.instance().get(brokerKey()); + try { + SuperstepKickoffLatch nextSuperstepLatch = + SuperstepKickoffLatchBroker.instance().get(brokerKey()); - while (this.running && !terminationRequested()) { + while (this.running && !terminationRequested()) { - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + currentIteration() + "]")); - } + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() + "]")); + } - super.run(); + super.run(); - // check if termination was requested - verifyEndOfSuperstepState(); + // check if termination was requested + verifyEndOfSuperstepState(); - if (isWorksetUpdate && isWorksetIteration) { - long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); - worksetAggregator.aggregate(numCollected); - } + if (isWorksetUpdate && isWorksetIteration) { + long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); + worksetAggregator.aggregate(numCollected); + } - if (log.isInfoEnabled()) { - log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); + } - // let the successors know that the end of this superstep data is reached - sendEndOfSuperstep(); + // let the successors know that the end of this superstep data is reached + sendEndOfSuperstep(); - if (isWorksetUpdate) { - // notify iteration head if responsible for workset update - worksetBackChannel.notifyOfEndOfSuperstep(); - } + if (isWorksetUpdate) { + // notify iteration head if responsible for workset update + worksetBackChannel.notifyOfEndOfSuperstep(); + } - boolean terminated = - nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + boolean terminated = + 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination( + currentIteration() + 1); - if (terminated) { - requestTermination(); - } else { - incrementIterationCounter(); + if (terminated) { + requestTermination(); + } else { + incrementIterationCounter(); + } } + } finally { + terminationCompleted(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java index bf49c7d..8a2903a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -73,6 +74,8 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen private final AtomicBoolean terminated = new AtomicBoolean(false); + private final CompletableFuture<Void> terminationCompletionFuture = new CompletableFuture<>(); + // -------------------------------------------------------------------------------------------- /** @@ -175,6 +178,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen currentIteration++; } } + terminationCompleted(); } private boolean checkForConvergence() { @@ -275,4 +279,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen public void requestTermination() { terminated.set(true); } + + @Override + public void terminationCompleted() { + terminationCompletionFuture.complete(null); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java index fe9cff6..a6aec7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java @@ -115,45 +115,50 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative @Override public void run() throws Exception { - SuperstepKickoffLatch nextSuperStepLatch = - SuperstepKickoffLatchBroker.instance().get(brokerKey()); - - while (this.running && !terminationRequested()) { - - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + currentIteration() + "]")); - } - - super.run(); - - // check if termination was requested - verifyEndOfSuperstepState(); - - if (isWorksetUpdate && isWorksetIteration) { - // aggregate workset update element count - long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); - worksetAggregator.aggregate(numCollected); - } - - if (log.isInfoEnabled()) { - log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } - - if (isWorksetUpdate) { - // notify iteration head if responsible for workset update - worksetBackChannel.notifyOfEndOfSuperstep(); - } else if (isSolutionSetUpdate) { - // notify iteration head if responsible for solution set update - solutionSetUpdateBarrier.notifySolutionSetUpdate(); - } - - boolean terminate = - nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); - if (terminate) { - requestTermination(); - } else { - incrementIterationCounter(); + try { + SuperstepKickoffLatch nextSuperStepLatch = + SuperstepKickoffLatchBroker.instance().get(brokerKey()); + + while (this.running && !terminationRequested()) { + + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() + "]")); + } + + super.run(); + + // check if termination was requested + 
verifyEndOfSuperstepState(); + + if (isWorksetUpdate && isWorksetIteration) { + // aggregate workset update element count + long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); + worksetAggregator.aggregate(numCollected); + } + + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); + } + + if (isWorksetUpdate) { + // notify iteration head if responsible for workset update + worksetBackChannel.notifyOfEndOfSuperstep(); + } else if (isSolutionSetUpdate) { + // notify iteration head if responsible for solution set update + solutionSetUpdateBarrier.notifySolutionSetUpdate(); + } + + boolean terminate = + nextSuperStepLatch.awaitStartOfSuperstepOrTermination( + currentIteration() + 1); + if (terminate) { + requestTermination(); + } else { + incrementIterationCounter(); + } } + } finally { + terminationCompleted(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java index ee06eb9..1293682 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java @@ -26,4 +26,6 @@ public interface Terminable { boolean terminationRequested(); void requestTermination(); + + void terminationCompleted(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 6651523..453bd85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -32,6 +32,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import 
java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -104,9 +105,12 @@ public abstract class AbstractInvokable { * execution failure. It can be overwritten to respond to shut down the user code properly. * * @throws Exception thrown if any exception occurs during the execution of the user code + * @return a future that is completed when this {@link AbstractInvokable} is fully terminated. + * Note that it may never complete if the invokable is stuck. */ - public void cancel() throws Exception { + public Future<Void> cancel() throws Exception { // The default implementation does nothing. + return CompletableFuture.completedFuture(null); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index a282ec1d..85c5e9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -74,6 +74,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import static java.util.Collections.emptyList; @@ -186,6 +188,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable protected Map<String, Accumulator<?, ?>> accumulatorMap; private OperatorMetricGroup metrics; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); // -------------------------------------------------------------------------------------------- // Constructor @@ -361,6 +364,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable clearReaders(inputReaders); clearWriters(eventualOutputs); + terminationFuture.complete(null); } if (this.running) { @@ -375,7 
+379,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable } @Override - public void cancel() throws Exception { + public Future<Void> cancel() throws Exception { this.running = false; if (LOG.isDebugEnabled()) { @@ -389,6 +393,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable } finally { closeLocalStrategiesAndCaches(); } + return terminationFuture; } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 9f3a427..15d9ca8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -54,6 +54,9 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + /** * DataSinkTask which is executed by a task manager. The task hands the data to an output format. * @@ -88,6 +91,8 @@ public class DataSinkTask<IT> extends AbstractInvokable { private volatile boolean cleanupCalled; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + /** * Create an Invokable task and set its environment. 
* @@ -289,6 +294,7 @@ public class DataSinkTask<IT> extends AbstractInvokable { } BatchTask.clearReaders(new MutableReader<?>[] {inputReader}); + terminationFuture.complete(null); } if (!this.taskCanceled) { @@ -299,7 +305,7 @@ public class DataSinkTask<IT> extends AbstractInvokable { } @Override - public void cancel() throws Exception { + public Future<Void> cancel() throws Exception { this.taskCanceled = true; OutputFormat<IT> format = this.format; if (format != null) { @@ -320,6 +326,7 @@ public class DataSinkTask<IT> extends AbstractInvokable { } LOG.debug(getLogString("Cancelling data sink operator")); + return terminationFuture; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index eb05e73..41ecc15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -54,6 +54,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; /** * DataSourceTask which is executed by a task manager. The task reads data and uses an {@link @@ -85,6 +87,8 @@ public class DataSourceTask<OT> extends AbstractInvokable { // cancel flag private volatile boolean taskCanceled = false; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + /** * Create an Invokable task and set its environment. * @@ -251,6 +255,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { ((RichInputFormat) this.format).closeInputFormat(); LOG.debug(getLogString("Rich Source detected. 
Closing the InputFormat.")); } + terminationFuture.complete(null); } if (!this.taskCanceled) { @@ -261,9 +266,10 @@ public class DataSourceTask<OT> extends AbstractInvokable { } @Override - public void cancel() throws Exception { + public Future<Void> cancel() throws Exception { this.taskCanceled = true; LOG.debug(getLogString("Cancelling data source operator")); + return terminationFuture; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 8056ca4..ce22e9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -102,9 +102,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; @@ -1173,6 +1176,10 @@ public class Task new TaskCanceler( LOG, this::closeNetworkResources, + taskCancellationTimeout > 0 + ? taskCancellationTimeout + : TaskManagerOptions.TASK_CANCELLATION_TIMEOUT + .defaultValue(), invokable, executingThread, taskNameWithSubtask); @@ -1550,6 +1557,9 @@ public class Task private final Logger logger; private final Runnable networkResourcesCloser; + /** Time to wait after cancellation and interruption before releasing network resources. 
*/ + private final long taskCancellationTimeout; + private final AbstractInvokable invokable; private final Thread executer; private final String taskName; @@ -1557,11 +1567,13 @@ public class Task TaskCanceler( Logger logger, Runnable networkResourcesCloser, + long taskCancellationTimeout, AbstractInvokable invokable, Thread executer, String taskName) { this.logger = logger; this.networkResourcesCloser = networkResourcesCloser; + this.taskCancellationTimeout = taskCancellationTimeout; this.invokable = invokable; this.executer = executer; this.taskName = taskName; @@ -1573,7 +1585,17 @@ public class Task // the user-defined cancel method may throw errors. // we need do continue despite that try { - invokable.cancel(); + Future<Void> cancellationFuture = invokable.cancel(); + // Wait for any active actions to complete (e.g. timers, mailbox actions) + // Before that, interrupt to notify them about cancellation + if (invokable.shouldInterruptOnCancel()) { + executer.interrupt(); + } + try { + cancellationFuture.get(taskCancellationTimeout, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException | InterruptedException e) { + logger.debug("Error while waiting the task to terminate {}.", taskName, e); + } } catch (Throwable t) { ExceptionUtils.rethrowIfFatalError(t); logger.error("Error while canceling the task {}.", taskName, t); @@ -1583,15 +1605,8 @@ public class Task // in order to unblock async Threads, which produce/consume the // intermediate streams outside of the main Task Thread (like // the Kafka consumer). - // - // Don't do this before cancelling the invokable. Otherwise we - // will get misleading errors in the logs. 
networkResourcesCloser.run(); - // send the initial interruption signal, if requested - if (invokable.shouldInterruptOnCancel()) { - executer.interrupt(); - } } catch (Throwable t) { ExceptionUtils.rethrowIfFatalError(t); logger.error("Error in the task canceler for task {}.", taskName, t); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java index db8de8a..44b6084 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.types.IntValue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; /** {@link AbstractInvokable} for testing purposes. */ public class TestingAbstractInvokables { @@ -106,8 +107,9 @@ public class TestingAbstractInvokables { } @Override - public void cancel() { + public Future<Void> cancel() { gotCanceledFuture.complete(true); + return CompletableFuture.completedFuture(null); } public static void resetGotCanceledFuture() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index 2337115..196b546 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -586,8 +586,9 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { } @Override - public void cancel() throws Exception { + public Future<Void> cancel() throws 
Exception { running = false; + return CompletableFuture.completedFuture(null); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java index f84d513..f5c1029 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java @@ -197,7 +197,7 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger { } @Override - public void invoke() throws InterruptedException { + public void doInvoke() throws InterruptedException { waitUntilCancelled(); } @@ -216,7 +216,7 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger { } @Override - public void invoke() throws Exception { + public void doInvoke() throws Exception { getEnvironment() .getOperatorCoordinatorEventGateway() .sendOperatorEventToCoordinator( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index d812508..d7eecb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -1144,8 +1145,9 @@ public class TaskTest extends TestLogger { public void invoke() {} @Override - public void cancel() { + public Future<Void> cancel() { fail("This should not be called"); + 
return null; } } @@ -1191,7 +1193,9 @@ public class TaskTest extends TestLogger { } @Override - public void cancel() {} + public Future<Void> cancel() { + return CompletableFuture.completedFuture(null); + } } private static final class InvokableBlockingWithTrigger extends AbstractInvokable { @@ -1313,11 +1317,12 @@ public class TaskTest extends TestLogger { } @Override - public void cancel() throws Exception { + public Future<Void> cancel() throws Exception { synchronized (this) { triggerLatch.trigger(); wait(); } + return CompletableFuture.completedFuture(null); } } @@ -1339,11 +1344,12 @@ public class TaskTest extends TestLogger { } @Override - public void cancel() { + public Future<Void> cancel() { synchronized (lock) { // do nothing but a placeholder triggerLatch.trigger(); } + return CompletableFuture.completedFuture(null); } } @@ -1367,7 +1373,9 @@ public class TaskTest extends TestLogger { } @Override - public void cancel() {} + public Future<Void> cancel() { + return CompletableFuture.completedFuture(null); + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java index c053104..fee64ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.testtasks; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; /** @@ -62,11 +64,12 @@ public class OnceBlockingNoOpInvokable extends AbstractInvokable { 
} @Override - public void cancel() throws Exception { + public Future<Void> cancel() throws Exception { synchronized (lock) { running = false; lock.notifyAll(); } + return CompletableFuture.completedFuture(null); } public static void waitUntilOpsAreRunning() throws InterruptedException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java index bca5fc1..b4ac0a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java @@ -21,6 +21,9 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + /** * An {@link AbstractInvokable} that blocks at some point until cancelled. 
* @@ -31,13 +34,29 @@ public abstract class CancelableInvokable extends AbstractInvokable { private volatile boolean canceled; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + protected CancelableInvokable(Environment environment) { super(environment); } @Override - public void cancel() { + public void invoke() throws Exception { + try { + doInvoke(); + terminationFuture.complete(null); + } catch (Exception e) { + terminationFuture.completeExceptionally(e); + throw e; + } + } + + protected abstract void doInvoke() throws Exception; + + @Override + public Future<Void> cancel() { canceled = true; + return terminationFuture; } protected void waitUntilCancelled() throws InterruptedException { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 84f166b..56f0c5e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -34,7 +34,7 @@ object Tasks { getEnvironment.getInputGate(0), classOf[IntValue], getEnvironment.getTaskManagerInfo.getTmpDirectories) - + val writer = new RecordWriterBuilder[IntValue]().build( getEnvironment.getWriter(0)) @@ -77,7 +77,7 @@ object Tasks { getEnvironment.getInputGate(0), classOf[IntValue], getEnvironment.getTaskManagerInfo.getTmpDirectories) - + val reader2 = new RecordReader[IntValue]( getEnvironment.getInputGate(1), classOf[IntValue], @@ -98,12 +98,12 @@ object Tasks { env.getInputGate(0), classOf[IntValue], getEnvironment.getTaskManagerInfo.getTmpDirectories) - + val reader2 = new RecordReader[IntValue]( env.getInputGate(1), classOf[IntValue], getEnvironment.getTaskManagerInfo.getTmpDirectories) - + val reader3 = new RecordReader[IntValue]( env.getInputGate(2), classOf[IntValue], diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index fedd2cb..9998743 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -250,6 +250,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab private long latestAsyncCheckpointStartDelayNanos; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + // ------------------------------------------------------------------------ /** @@ -759,7 +761,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab suppressedException = runAndSuppressThrowable(mailboxProcessor::close, suppressedException); - if (suppressedException != null) { + if (suppressedException == null) { + terminationFuture.complete(null); + } else { + terminationFuture.completeExceptionally(suppressedException); throw suppressedException; } } @@ -769,7 +774,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab } @Override - public final void cancel() throws Exception { + public final Future<Void> cancel() throws Exception { isRunning = false; canceled = true; @@ -793,6 +798,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab } }); } + return terminationFuture; } public MailboxExecutorFactory getMailboxExecutorFactory() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 9c46bfe..835151c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.ExecutionState; @@ -59,6 +60,7 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -90,6 +92,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TestTaskBuilder; import org.apache.flink.runtime.util.FatalExitExceptionHandler; +import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -139,6 +142,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.io.StreamCorruptedException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; 
import java.util.List; @@ -157,6 +161,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE; @@ -193,6 +198,31 @@ public class StreamTaskTest extends TestLogger { @Rule public final Timeout timeoutPerTest = Timeout.seconds(30); @Test + public void testCancellationWaitsForActiveTimers() throws Exception { + StreamTaskWithBlockingTimer.reset(); + ResultPartitionDeploymentDescriptor descriptor = + new ResultPartitionDeploymentDescriptor( + PartitionDescriptorBuilder.newBuilder().build(), + NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), + 1, + false); + Task task = + new TestTaskBuilder(new NettyShuffleEnvironmentBuilder().build()) + .setInvokable(StreamTaskWithBlockingTimer.class) + .setResultPartitions(singletonList(descriptor)) + .build(); + task.startTaskThread(); + + StreamTaskWithBlockingTimer.timerStarted.join(); + task.cancelExecution(); + + task.getTerminationFuture().join(); + // explicitly check for exceptions as they are ignored after cancellation + StreamTaskWithBlockingTimer.timerFinished.join(); + checkState(task.getExecutionState() == ExecutionState.CANCELED); + } + + @Test public void testSavepointSuspendCompleted() throws Exception { testSyncSavepointWithEndInput( StreamTask::notifyCheckpointCompleteAsync, CheckpointType.SAVEPOINT_SUSPEND, false); @@ -2605,4 +2635,97 @@ public class StreamTaskTest extends TestLogger { @Override public void processElement(StreamRecord<T> element) throws Exception {} } + + /** + * A {@link StreamTask} that registers a single timer that waits for a cancellation and then + * emits some data. 
The assumption is that output remains available until the future returned + * from {@link AbstractInvokable#cancel()} is completed. Public access to allow reflection in + * {@link Task}. + */ + public static class StreamTaskWithBlockingTimer extends StreamTask { + static volatile CompletableFuture<Void> timerStarted; + static volatile CompletableFuture<Void> timerFinished; + static volatile CompletableFuture<Void> invokableCancelled; + + public static void reset() { + timerStarted = new CompletableFuture<>(); + timerFinished = new CompletableFuture<>(); + invokableCancelled = new CompletableFuture<>(); + } + + // public access to allow reflection in Task + public StreamTaskWithBlockingTimer(Environment env) throws Exception { + super(env); + super.inputProcessor = getInputProcessor(); + getProcessingTimeServiceFactory() + .createProcessingTimeService(mainMailboxExecutor) + .registerTimer(0, unused -> onProcessingTime()); + } + + @Override + protected void cancelTask() throws Exception { + super.cancelTask(); + invokableCancelled.complete(null); + } + + private void onProcessingTime() { + try { + timerStarted.complete(null); + waitForCancellation(); + emit(); + timerFinished.complete(null); + } catch (Throwable e) { // assertion is Error + timerFinished.completeExceptionally(e); + } + } + + private void waitForCancellation() { + invokableCancelled.join(); + // allow network resources to be closed mistakenly + for (int i = 0; i < 10; i++) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // ignore: can be interrupted by TaskCanceller/Interrupter + } + } + } + + private void emit() throws IOException { + checkState(getEnvironment().getAllWriters().length > 0); + for (ResultPartitionWriter writer : getEnvironment().getAllWriters()) { + assertFalse(writer.isReleased()); + assertFalse(writer.isFinished()); + writer.emitRecord(ByteBuffer.allocate(10), 0); + } + } + + @Override + protected void init() {} + + 
private static StreamInputProcessor getInputProcessor() { + return new StreamInputProcessor() { + + @Override + public InputStatus processInput() { + return InputStatus.NOTHING_AVAILABLE; + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, long checkpointId) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<?> getAvailableFuture() { + return new CompletableFuture<>(); + } + + @Override + public void close() {} + }; + } + } }