[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584855#comment-15584855 ]
ASF GitHub Bot commented on FLINK-4715: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83800845 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1251,33 +1289,113 @@ public void run() { catch (InterruptedException e) { // we can ignore this } + } + catch (Throwable t) { + LOG.error("Error in the task canceler", t); + } - // it is possible that the user code does not react immediately. for that - // reason, we spawn a separate thread that repeatedly interrupts the user code until - // it exits - while (executer.isAlive()) { - // build the stack trace of where the thread is stuck, for the log - StringBuilder bld = new StringBuilder(); - StackTraceElement[] stack = executer.getStackTrace(); - for (StackTraceElement e : stack) { - bld.append(e).append('\n'); - } + System.out.println("Canceler done"); + } + } + + /** + * Watchdog for the cancellation. If the task is stuck in cancellation, + * we notify the task manager about a fatal error. + */ + private static class TaskCancellationWatchDog extends TimerTask { + + /** Thread executing the Task. */ + private final Thread executor; + + /** Interrupt interval. */ + private final long interruptInterval; + + /** Timeout after which a fatal error notification happens. */ + private final long interruptTimeout; + + /** TaskManager to notify about a timeout */ + private final TaskManagerConnection taskManager; - logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", + /** Task name (for logging and error messages). */ + private final String taskName; + + /** Synchronization with the {@link TaskCanceler} thread. 
*/ + private final CountDownLatch taskCancellerLatch; + + public TaskCancellationWatchDog( + Thread executor, + long interruptInterval, + long interruptTimeout, + TaskManagerConnection taskManager, + String taskName, + CountDownLatch taskCancellerLatch) { + + this.executor = checkNotNull(executor); + this.interruptInterval = checkNotNull(interruptInterval); + this.interruptTimeout = checkNotNull(interruptTimeout); + this.taskManager = checkNotNull(taskManager); + this.taskName = checkNotNull(taskName); + this.taskCancellerLatch = checkNotNull(taskCancellerLatch); + } + + @Override + public void run() { + try { + // Synchronize with task canceler + if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) { + return; // Did not return + } + } catch (InterruptedException e) { + return; + } + + long deadline = System.currentTimeMillis() + interruptTimeout; --- End diff -- Using `System.nanoTime()` is more robust than `System.currentTimeMillis()` for measuring timeouts: `nanoTime()` is monotonic, whereas the wall-clock time returned by `currentTimeMillis()` can jump forwards or backwards when the system clock is adjusted (e.g. by NTP). It would be good to use it here, especially since these timeouts may end up killing the process. > TaskManager should commit suicide after cancellation failure > ------------------------------------------------------------ > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager > Affects Versions: 1.2.0 > Reporter: Till Rohrmann > Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)