[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197200#comment-17197200
 ] 

Arvid Heise commented on FLINK-15467:
-------------------------------------

Merged into release-1.10 as a20fb4a796d8df91e4ebbcdc0b35cdd75145c574.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
>      In the new mailbox model, SourceStreamTask starts a source thread to run 
> user methods, while the task's execution thread blocks on mailbox.takeMail(). 
> When a task is canceled, the TaskCanceler thread cancels the task and 
> interrupts the execution thread. The execution thread of SourceStreamTask 
> therefore throws an InterruptedException, cancels the task again, and 
> rethrows the exception.
> {code:java}
> // code placeholder
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>    // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>    // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>    sourceThread.start();
>    // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>    try {
>       runAlternativeMailboxLoop();
>    } catch (Exception mailboxEx) {
>       // We cancel the source function if some runtime exception escaped the mailbox.
>       if (!isCanceled()) {
>          cancelTask();
>       }
>       throw mailboxEx;
>    }
>    sourceThread.join();
>    if (!isFinished) {
>       sourceThread.checkThrowSourceExecutionException();
>    }
>    context.allActionsCompleted();
> }
> {code}
>    When all tasks of this TaskExecutor are canceled, the blob files are 
> cleaned up. However, the source thread may still be running at that point, so 
> loading a new class in it fails with a ClassNotFoundException. In that case 
> the source thread may not be able to clean up and release its resources 
> properly (for example, closing child threads or deleting local files). 
> Therefore, I think the task should be marked as canceled or finished only 
> after the source thread has completed.
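
The idea in the description above (report the task as done only after the
source thread has ended) can be illustrated with a minimal, self-contained
sketch. This is not the code merged for this issue and it does not use any
Flink classes: SketchSourceTask, cancelAndAwaitSourceThread and the sleep
durations are hypothetical names and values chosen for illustration only. It
shows a task thread that keeps joining the source thread even if it is
interrupted in the meantime, and restores the interrupt flag afterwards.

```java
final class SourceThreadCancellationSketch {

    /** Hypothetical stand-in for a source task; NOT Flink's SourceStreamTask. */
    static final class SketchSourceTask {
        private volatile boolean running = true;
        private volatile boolean cleanedUp = false;
        private final Thread sourceThread =
                new Thread(this::runSource, "sketch-source-thread");

        private void runSource() {
            try {
                while (running) {
                    Thread.sleep(5); // stands in for emitting records
                }
                Thread.sleep(50); // stands in for slow cleanup work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                cleanedUp = true; // resources are released at this point
            }
        }

        void start() {
            sourceThread.start();
        }

        /** Asks the source to stop and returns only after its thread has ended. */
        void cancelAndAwaitSourceThread() {
            running = false; // cooperative cancellation signal
            boolean interrupted = false;
            while (true) {
                try {
                    sourceThread.join();
                    break;
                } catch (InterruptedException e) {
                    // Remember the interrupt, but keep waiting for the source thread.
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag for callers
            }
        }

        boolean isCleanedUp() {
            return cleanedUp;
        }
    }

    /** Returns true if cleanup completed before cancellation returned. */
    static boolean cleanupFinishedBeforeTaskDone() {
        SketchSourceTask task = new SketchSourceTask();
        task.start();
        // Simulate the canceler interrupting the task's execution thread.
        Thread.currentThread().interrupt();
        task.cancelAndAwaitSourceThread();
        boolean interruptPreserved = Thread.interrupted(); // also clears the flag
        return task.isCleanedUp() && interruptPreserved;
    }

    public static void main(String[] args) {
        System.out.println("cleanup finished before task reported done: "
                + cleanupFinishedBeforeTaskDone());
    }
}
```

In this sketch the interrupt is deferred rather than swallowed, so code that
runs after the join can still observe that cancellation was requested. Whether
an unbounded join is acceptable in a real task is a separate design question
(a source that ignores cancellation would block forever), which the sketch
does not address.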



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
