[
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585332#comment-15585332
]
ASF GitHub Bot commented on FLINK-4715:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2652#discussion_r83841451
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+ }
+ catch (Throwable t) {
+ logger.error("Error in the task canceler", t);
+ }
+ }
+ }
+
+ /**
+ * Watchdog for the cancellation. If the task is stuck in cancellation,
+ * we notify the task manager about a fatal error.
+ */
+ private static class TaskCancellationWatchDog extends TimerTask {
+
+ /**
+ * Pass logger in order to prevent that the compiler needs to
inject static bridge methods
+ * to access it.
+ */
+ private final Logger logger;
+
+ /** Thread executing the Task. */
+ private final Thread executor;
+
+ /** Interrupt interval. */
+ private final long interruptInterval;
+
+ /** Timeout after which a fatal error notification happens. */
+ private final long interruptTimeout;
+
+ /** TaskManager to notify about a timeout */
+ private final TaskManagerConnection taskManager;
+
+ /** Task name (for logging and error messages). */
+ private final String taskName;
+
+ /** Synchronization with the {@link TaskCanceler} thread. */
+ private final CountDownLatch taskCancellerLatch;
+
+ public TaskCancellationWatchDog(
+ Logger logger,
+ Thread executor,
+ long interruptInterval,
+ long interruptTimeout,
+ TaskManagerConnection taskManager,
+ String taskName,
+ CountDownLatch taskCancellerLatch) {
+
+ this.logger = checkNotNull(logger);
+ this.executor = checkNotNull(executor);
+ this.interruptInterval =
checkNotNull(interruptInterval);
+ this.interruptTimeout = checkNotNull(interruptTimeout);
+ this.taskManager = checkNotNull(taskManager);
+ this.taskName = checkNotNull(taskName);
+ this.taskCancellerLatch =
checkNotNull(taskCancellerLatch);
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Synchronize with task canceler
+ taskCancellerLatch.await();
+ } catch (Exception e) {
+ String msg = String.format("Exception while
waiting on task " +
+ "canceller to cancel task
'%s'.", taskName);
+ taskManager.notifyFatalError(msg, e);
+ return;
+ }
+
+ long intervalNanos =
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+ long timeoutNanos =
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+ long deadline = System.nanoTime() + timeoutNanos;
+
+ try {
+ // Initial wait before interrupting periodically
+ Thread.sleep(interruptInterval);
+ } catch (InterruptedException ignored) {
+ }
+
+ // It is possible that the user code does not react to
the task canceller.
+ // for that reason, we spawn this separate thread that
repeatedly interrupts
+ // the user code until it exits. If the user code
does not exit within
+ // the timeout, we notify the task manager about a fatal
error.
+ while (executor.isAlive()) {
+ long now = System.nanoTime();
+
+ // build the stack trace of where the thread is
stuck, for the log
+ StringBuilder bld = new StringBuilder();
+ StackTraceElement[] stack =
executor.getStackTrace();
+ for (StackTraceElement e : stack) {
+ bld.append(e).append('\n');
+ }
- // it is possible that the user code does not
react immediately. for that
- // reason, we spawn a separate thread that
repeatedly interrupts the user code until
- // it exits
- while (executer.isAlive()) {
- // build the stack trace of where the
thread is stuck, for the log
- StringBuilder bld = new StringBuilder();
- StackTraceElement[] stack =
executer.getStackTrace();
- for (StackTraceElement e : stack) {
- bld.append(e).append('\n');
- }
+ if (now >= deadline) {
+ long duration =
TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+ String msg = String.format("Task '%s'
did not react to cancelling signal in " +
+ "the last %d seconds,
but is stuck in method:\n %s",
+ taskName,
+ duration,
+ bld.toString());
+
+ taskManager.notifyFatalError(msg, null);
+ return; // done, don't forget to leave
the loop
+ } else {
logger.warn("Task '{}' did not react to
cancelling signal, but is stuck in method:\n {}",
taskName,
bld.toString());
- executer.interrupt();
+ executor.interrupt();
try {
-
executer.join(taskCancellationIntervalMillis);
- }
- catch (InterruptedException e) {
- // we can ignore this
+ long timeLeftNanos =
Math.min(intervalNanos, deadline - now - intervalNanos);
--- End diff --
No, your suggestion is correct.
> TaskManager should commit suicide after cancellation failure
> ------------------------------------------------------------
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
> Issue Type: Improvement
> Components: TaskManager
> Affects Versions: 1.2.0
> Reporter: Till Rohrmann
> Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a
> given time, the {{TaskManager}} should kill itself. That way we guarantee
> that there is no resource leak.
> This behaviour acts as a safety-net against faulty user code.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)