[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-09-16 Thread Jun Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197216#comment-17197216
 ] 

Jun Qin commented on FLINK-15467:
-

Great! Thanks [~AHeise]

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> In the new mailbox model, SourceStreamTask starts a source thread to run 
> user methods, while the current execution thread blocks on 
> mailbox.takeMail(). When a task is canceled, the TaskCanceler thread cancels 
> the task and interrupts the execution thread. The execution thread of 
> SourceStreamTask therefore throws an InterruptedException, cancels the task 
> again, and rethrows the exception.
> {code:java}
> // code placeholder
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>     // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>     // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>     sourceThread.start();
>     // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>     try {
>         runAlternativeMailboxLoop();
>     } catch (Exception mailboxEx) {
>         // We cancel the source function if some runtime exception escaped the mailbox.
>         if (!isCanceled()) {
>             cancelTask();
>         }
>         throw mailboxEx;
>     }
>     sourceThread.join();
>     if (!isFinished) {
>         sourceThread.checkThrowSourceExecutionException();
>     }
>     context.allActionsCompleted();
> }
> {code}
> When all tasks of this TaskExecutor are canceled, the blob file is cleaned 
> up. The source thread, however, has not finished at this point, so loading a 
> new class from it causes a ClassNotFoundException. As a result, the source 
> thread may be unable to clean up and release its resources properly (closing 
> child threads, cleaning up local files, etc.). Therefore, I think we should 
> mark this task as canceled or finished only after the source thread has 
> completed.
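
The control flow described in the issue can be reproduced outside Flink. The following is a minimal sketch, not Flink code: the class name SkippedJoinSketch, the BlockingQueue standing in for the mailbox, and the sleep durations are all assumptions made for illustration. It shows that when the task thread is interrupted while blocked on the mailbox, the exception path leaves the method without ever reaching the join, so the worker is still running when the task thread exits.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the pattern in the issue; none of these names are Flink APIs.
public class SkippedJoinSketch {

    /** Returns true if the worker was still alive when the task thread left its method. */
    static boolean workerAliveAfterTaskExit() throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicBoolean workerAliveAtExit = new AtomicBoolean(false);
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(10);   // stands in for emitting records
                }
                Thread.sleep(300);      // stands in for slow cleanup after cancel
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "source-thread-sketch");

        Thread taskThread = new Thread(() -> {
            worker.start();
            try {
                mailbox.take().run();   // blocks, like the mailbox loop
                worker.join();          // skipped when take() is interrupted
            } catch (InterruptedException e) {
                running.set(false);     // stands in for cancelTask()
            } finally {
                // The task thread is about to exit; record whether the worker is done.
                workerAliveAtExit.set(worker.isAlive());
            }
        }, "task-thread-sketch");

        taskThread.start();
        Thread.sleep(100);              // give the task thread time to block on take()
        taskThread.interrupt();         // stands in for the canceler's interrupt
        taskThread.join();
        worker.join();                  // tidy up so the sketch leaves no thread behind
        return workerAliveAtExit.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker alive after task exit: " + workerAliveAfterTaskExit());
    }
}
```

Because the simulated cleanup takes 300 ms, the worker is expected to be alive at the moment the task thread exits, which corresponds to the window in which the task is already reported as canceled.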



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-09-16 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197200#comment-17197200
 ] 

Arvid Heise commented on FLINK-15467:
-

Merged to release-1.10 a20fb4a796d8df91e4ebbcdc0b35cdd75145c574.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-09-09 Thread Jun Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192936#comment-17192936
 ] 

Jun Qin commented on FLINK-15467:
-

[~pnowojski], as discussed, could you please backport this fix to Flink 1.10? 
It is impacting Flink 1.10 customers.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-07-31 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168532#comment-17168532
 ] 

Roman Khachatryan commented on FLINK-15467:
---

Thanks for the clarification [~Ming Li].

I think the fix covers this case too. In fact, I checked it manually and it 
fixed the problem. But of course, a double check from your side would be highly 
appreciated.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-07-31 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168528#comment-17168528
 ] 

Piotr Nowojski commented on FLINK-15467:


That was also our understanding, [~Ming Li]. Either way, the problem should be 
gone, as {{SourceStreamTask}} won't finish until {{sourceThread}} is done, and 
the life cycle of the Task's class loader is bound to the Task's life.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-07-31 Thread ming li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168520#comment-17168520
 ] 

ming li commented on FLINK-15467:
-

Thanks for this fix. I want to correct the cause of this problem: it is not 
that the blob file is deleted, but that the userClassloader will be closed when 
the task ends. Later, when trying to use this classloader to load a class (when 
the classloader is not specified, the classloader of the caller class will be 
used), a ClassNotFoundException will be thrown.
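
The effect of a closed class loader can be illustrated with a small simulation. This is a sketch under stated assumptions rather than Flink's actual user-code class loader: ClosableLoader and canLoad are hypothetical names, and the loader simply refuses every request once closed. The sketch passes the loader to Class.forName explicitly; the one-argument Class.forName(String) would instead use the class loader of the calling class, which is the case described above.

```java
// Hypothetical simulation; ClosableLoader is not a Flink or JDK class.
public class ClosedLoaderSketch {

    /** A class loader that delegates to its parent until it is closed. */
    static final class ClosableLoader extends ClassLoader {
        private volatile boolean closed;

        ClosableLoader(ClassLoader parent) {
            super(parent);
        }

        void close() {
            closed = true;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (closed) {
                // After the task has ended, no further classes can be resolved.
                throw new ClassNotFoundException(name + " (class loader already closed)");
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Returns true if the given loader can resolve the class name. */
    static boolean canLoad(ClassLoader loader, String name) {
        try {
            Class.forName(name, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClosableLoader loader = new ClosableLoader(ClosedLoaderSketch.class.getClassLoader());
        System.out.println("before close: " + canLoad(loader, "java.util.ArrayList"));
        loader.close(); // stands in for the task ending while a worker is still running
        System.out.println("after close:  " + canLoad(loader, "java.util.LinkedList"));
    }
}
```

The second lookup uses a different class name on purpose: a class that was already resolved through the loader before it was closed may still be returned from the JVM's cache, which matches the observation that only classes not yet loaded trigger the exception.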

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-06-18 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139948#comment-17139948
 ] 

Roman Khachatryan commented on FLINK-15467:
---

Hi [~Ming Li], thanks for the clarification!

I was able to reproduce the issue and will submit a PR to fix it.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-06-17 Thread ming li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138977#comment-17138977
 ] 

ming li commented on FLINK-15467:
-

Hi [~roman_khachatryan]. I ran my test code in a standalone cluster. Once the 
job was running, I clicked cancel in the web UI to trigger cancellation. The 
test code simulates a source thread that takes a long time to clean up: it 
sleeps for 3 seconds and then loads a class that the JVM has not loaded yet. At 
that point, the exception above occurs.

When I load the new class without sleeping first, the problem does not occur. I 
think this is because the cleanup of the BlobLibrary has not completed yet at 
that point.

I also think we should join the source thread in SourceStreamTask.cancelTask to 
fix this problem.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-06-17 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138723#comment-17138723
 ] 

Roman Khachatryan commented on FLINK-15467:
---

I tried to run the program above with some corrections (to stop the job after 
some elements and load other classes).

I couldn't reproduce the issue: the class was loaded successfully after 
stopping all services.

 

[~Ming Li], are you running it on a cluster and then stopping via JM?

 

If yes, this is what I think is happening in your case:
 # Job is canceled in JM
 # JM sends RPC to TM to cancel the task
 # TM calls Task.cancelExecution, which starts TaskCanceler and TaskInterrupter
 # Source thread ignores the interrupt
 # Main task thread proceeds to the finally block in Task.doRun, where it calls 
cancelInvokable and notifyFinalState >>
 # >> TaskExecutor.unregisterTaskAndNotifyFinalState > taskSlotTable.removeTask 
>
 # >> TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources, which calls 
closeJob > BlobLibraryCacheManager.release

If this is the case, then just joining the source thread inside cancelInvokable 
would solve the problem (specifically, in SourceStreamTask.cancelTask).

 

SourceStreamTask.cancelTask is called from:
 * RPC notification about checkpoint completion - fine, done asynchronously
 * TaskCanceler - can delay subsequent networkResourcesCloser.run() - which is 
probably also fine
 * Task.cancelInvokable - what we want
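
The idea of joining the source thread inside the cancel path can be sketched as follows. This is an illustration only, not the actual patch: CancelJoinSketch, its methods, and the event strings are hypothetical, and the 200 ms sleep stands in for slow cleanup in the source. The point is the ordering: because cancel() does not return until the source thread has finished, anything released afterwards is released after the source's cleanup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of "cancel, then join"; none of these names are Flink APIs.
public class CancelJoinSketch {
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;
    private final Thread sourceThread = new Thread(this::runSource, "source-thread-sketch");

    private void runSource() {
        try {
            while (running) {
                Thread.sleep(10);       // stands in for emitting records
            }
            Thread.sleep(200);          // stands in for slow cleanup after cancel
            events.add("source cleanup done");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        sourceThread.start();
    }

    /** Signals the source to stop and waits until its thread has finished. */
    void cancel() throws InterruptedException {
        events.add("cancel requested");
        running = false;
        sourceThread.join();
    }

    /** Stands in for releasing the class loader and other task resources. */
    void releaseResources() {
        events.add("resources released");
    }

    /** Runs one start/cancel/release cycle and returns the recorded event order. */
    static List<String> runOnce() throws InterruptedException {
        CancelJoinSketch task = new CancelJoinSketch();
        task.start();
        task.cancel();
        task.releaseResources();
        return new ArrayList<>(task.events);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce());
    }
}
```

One trade-off, noted in the list of callers above, is that every caller of the cancel method now blocks for as long as the source takes to finish, so a source that never returns would also stall cancellation.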

 

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Assignee: Roman Khachatryan
>Priority: Critical
> Fix For: 1.12.0
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-06-03 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124822#comment-17124822
 ] 

Piotr Nowojski commented on FLINK-15467:


Thanks for coming back and providing a way to reproduce the problem. I haven't 
run your code, but it looks like you are right. I'm not entirely sure how to 
fix this off the top of my head; someone would have to investigate this 
further.

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.9.0, 1.9.1, 1.10.1
>Reporter: ming li
>Priority: Critical
> Fix For: 1.12.0
>


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-06-02 Thread ming li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17123730#comment-17123730
 ] 

ming li commented on FLINK-15467:
-

Below is my test code:
{code:java}
// code placeholder

package com.flink.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by liming on 2020/6/2.
 */
public class TestJobClean {
    private static final Logger logger = LoggerFactory.getLogger(TestJobClean.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource())
                .addSink(new DiscardingSink<>());
        env.execute("testJobCancel");
    }

    private static class CustomSource implements ParallelSourceFunction<String> {
        private static final Logger logger = LoggerFactory.getLogger(CustomSource.class);
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            try {
                while (running) {
                    Thread.sleep(1000);
                    sourceContext.collect("test");
                }
            } finally {
                try {
                    Thread.sleep(3000); // simulate time-consuming cleanup work
                } catch (InterruptedException e) {
                    Thread.sleep(3000); // keep simulating cleanup even if interrupted
                }
                try {
                    logger.info("try to load clean class");
                    // TestLoad is a class in the job jar that the JVM has not loaded yet
                    Class.forName("com.flink.test.TestLoad");
                    logger.info("load clean class success");
                } catch (Exception e) {
                    logger.error("load clean class failed", e);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
{code}
The logs are as follows:
2020-06-02 20:34:24,368 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from RUNNING to CANCELING.
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from CANCELING to CANCELED.
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,372 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) [CANCELED]
2020-06-02 20:34:24,373 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Sink: Unnamed (1/1) 10ee411efd563d83214fd6f947dcad1e.
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile\{cpuCores=1., taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: e42db2155c2aa58e98c1dce598ecb7f5, jobId: 36242552141952de9bc8f3867f9d6f6d).
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job 36242552141952de9bc8f3867f9d6f6d from job leader monitoring.
2020-06-02 20:34:24,384 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,386 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,387 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job 36242552141952de9bc8f3867f9d6f6d because it is not registered.
2020-06-02 20:34:27,375 INFO com.flink.test.TestJobClean$CustomSource - try to load clean class
2020-06-02 20:34:27,376 ERROR com.flink.test.TestJobClean$CustomSource - load clean class failed
java.lang.ClassNotFoundException: com.flink.test.TestLoad
at 

[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-06-02 Thread ming li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17123726#comment-17123726
 ] 

ming li commented on FLINK-15467:
-

Hi [~pnowojski], I tried the latest release (1.10.1) and the same problem
still exists: when the job is canceled, my source tries to load certain
classes, but the jar file has already been removed. Therefore, I think we
still need to wait for the sourceThread to complete before removing the jar
file.
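
The ordering issue described above can be illustrated with a minimal, self-contained sketch in plain Java. This is not Flink code: the class `JoinBeforeRelease`, the method `cancelAndRelease`, and the `resourceAvailable` flag (standing in for the job's jar files) are all hypothetical names chosen for illustration. The sketch only assumes that the worker's cleanup is slow and needs the resource.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names exist in Flink.
class JoinBeforeRelease {

    // Cancels a worker and releases a shared resource. Returns true if the
    // worker's cleanup still saw the resource available.
    static boolean cancelAndRelease(boolean waitForWorker) {
        AtomicBoolean running = new AtomicBoolean(true);
        // Stands in for the job's jar files (assumption for this sketch).
        AtomicBoolean resourceAvailable = new AtomicBoolean(true);
        AtomicBoolean cleanupSawResource = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(10); // simulated source loop
                }
            } catch (InterruptedException ignored) {
                // fall through to cleanup
            } finally {
                sleepQuietly(200); // simulated slow cleanup
                cleanupSawResource.set(resourceAvailable.get());
            }
        }, "worker");
        worker.start();

        running.set(false); // cooperative cancel
        if (waitForWorker) {
            joinQuietly(worker); // wait for cleanup before releasing
        }
        resourceAvailable.set(false); // release the resource
        joinQuietly(worker); // only so the result can be read safely
        return cleanupSawResource.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup saw resource (no wait): " + cancelAndRelease(false));
        System.out.println("cleanup saw resource (join first): " + cancelAndRelease(true));
    }
}
```

With `waitForWorker` set to false the cleanup typically runs after the resource is gone, which mirrors the ClassNotFoundException in the logs; with it set to true the cleanup completes first.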

> Should wait for the end of the source thread during the Task cancellation
> -
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.9.0, 1.9.1
> Reporter: ming li
> Priority: Major
>
> In the new mailbox model, SourceStreamTask starts a source thread to run
> user methods, and the current execution thread blocks on mailbox.takeMail().
> When a task is canceled, the TaskCanceler thread cancels the task and
> interrupts the execution thread. The execution thread of SourceStreamTask
> therefore throws an InterruptedException, cancels the task again, and
> rethrows the exception.
> {code:java}
> // code placeholder
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>     // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>     // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>     sourceThread.start();
>     // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>     try {
>         runAlternativeMailboxLoop();
>     } catch (Exception mailboxEx) {
>         // We cancel the source function if some runtime exception escaped the mailbox.
>         if (!isCanceled()) {
>             cancelTask();
>         }
>         throw mailboxEx;
>     }
>     sourceThread.join();
>     if (!isFinished) {
>         sourceThread.checkThrowSourceExecutionException();
>     }
>     context.allActionsCompleted();
> }
> {code}
> When all tasks of this TaskExecutor are canceled, the blob files are cleaned
> up. However, the actual source thread has not finished at this point, so
> loading a new class causes a ClassNotFoundException. In that case the source
> thread may be unable to clean up and release resources properly (closing
> child threads, deleting local files, etc.). Therefore, I think the task
> should be marked canceled or finished only after the source thread has
> completed.
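
The behavior requested in the description (report the task as finished only after the source thread has completed, even when the blocking mailbox wait is interrupted) can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not Flink's implementation or its actual fix: `MailboxJoinSketch`, the `BlockingQueue` used as a mailbox, and the event names are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a task thread plus a source thread; not Flink code.
class MailboxJoinSketch {

    // Runs the scenario and returns the recorded events in order.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);

        Thread source = new Thread(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(10); // simulated source loop
                }
            } catch (InterruptedException ignored) {
                // fall through to cleanup
            } finally {
                try {
                    Thread.sleep(100); // simulated slow cleanup
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("source cleanup done");
            }
        }, "source");

        Thread task = new Thread(() -> {
            source.start();
            try {
                mailbox.take().run(); // blocks until mail arrives or an interrupt
            } catch (InterruptedException e) {
                events.add("task interrupted");
                running.set(false); // cancel the source cooperatively
            } finally {
                // Wait for the source thread on every exit path.
                joinUninterruptibly(source);
                events.add("task finished");
            }
        }, "task");

        task.start();
        try {
            Thread.sleep(50); // give the task time to block on the mailbox
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        task.interrupt(); // plays the role of the canceler thread
        joinUninterruptibly(task);
        return events;
    }

    // Joins the thread, retrying on interrupts and restoring the flag afterwards.
    private static void joinUninterruptibly(Thread thread) {
        boolean interrupted = false;
        while (true) {
            try {
                thread.join();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The key design choice is the `finally` block: because the join happens on every exit path, "task finished" is recorded only after "source cleanup done", regardless of whether the mailbox wait ended normally or through an interrupt.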



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-01-21 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020179#comment-17020179
 ] 

Piotr Nowojski commented on FLINK-15467:


Could you [~Ming Li] elaborate on what the problem is? Which method
invocations? Also, this code has changed quite a lot, so is this issue still
valid on the current master?
