[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197216#comment-17197216 ]

Jun Qin commented on FLINK-15467:
---------------------------------

Great! Thanks [~AHeise]

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> In the new mailbox model, SourceStreamTask starts a source thread to run
> user methods, and the current execution thread will block on
> mailbox.takeMail(). When a task cancels, the TaskCanceler thread will cancel
> the task and interrupt the execution thread. Therefore, the execution thread
> of SourceStreamTask will throw InterruptedException, then cancel the task
> again, and throw an exception.
> {code:java}
> // code placeholder
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>    // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>    // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>    sourceThread.start();
>    // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>    try {
>       runAlternativeMailboxLoop();
>    } catch (Exception mailboxEx) {
>       // We cancel the source function if some runtime exception escaped the mailbox.
>       if (!isCanceled()) {
>          cancelTask();
>       }
>       throw mailboxEx;
>    }
>    sourceThread.join();
>    if (!isFinished) {
>       sourceThread.checkThrowSourceExecutionException();
>    }
>    context.allActionsCompleted();
> }
> {code}
> When all tasks of this TaskExecutor are canceled, the blob file will be
> cleaned up. But the real source thread is not finished at this time, which
> will cause a ClassNotFoundException when loading a new class. In this case,
> the source thread may not be able to properly clean up and release resources
> (such as closing child threads, cleaning up local files, etc.). Therefore, I
> think we should mark this task canceled or finished after the execution of
> the source thread is completed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
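The control flow described in the report can be reproduced outside Flink. The following is a minimal sketch, assuming plain Java threads: `perform` only mirrors the shape of the quoted method, while `mailboxLoop`, `workerAliveAfterPerform` and the sleeping worker are hypothetical stand-ins and not Flink code. It shows that when the loop throws and the catch block rethrows, the `join()` placed after the try/catch is never reached, so the worker thread is still running when the method exits.

```java
public class SkippedJoinSketch {

    // Hypothetical stand-in for runAlternativeMailboxLoop(): fails when asked to,
    // the way the blocked execution thread fails once it is interrupted.
    static void mailboxLoop(boolean fail) throws InterruptedException {
        if (fail) {
            throw new InterruptedException("execution thread interrupted");
        }
    }

    // Same shape as the quoted method: start, loop, rethrow on failure, then join.
    static void perform(Thread worker, boolean loopFails) throws Exception {
        worker.start();
        try {
            mailboxLoop(loopFails);
        } catch (Exception e) {
            // The quoted code cancels the task here and then rethrows.
            throw e;
        }
        // Only reached when the loop returned normally.
        worker.join();
    }

    // Reports whether the worker is still alive right after perform() exits.
    static boolean workerAliveAfterPerform(boolean loopFails) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(300); // simulated user code that is still busy
            } catch (InterruptedException ignored) {
                // nothing to do in this sketch
            }
        });
        try {
            perform(worker, loopFails);
        } catch (Exception expected) {
            // the failure itself is not interesting here
        }
        boolean alive = worker.isAlive();
        try {
            worker.join(); // tidy up so the sketch leaves no thread behind
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("loop failed,   worker still alive: " + workerAliveAfterPerform(true));
        System.out.println("loop returned, worker still alive: " + workerAliveAfterPerform(false));
    }
}
```

Whatever runs after `perform` in the failing case (in the report, task cleanup) therefore overlaps with the worker.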
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197200#comment-17197200 ]

Arvid Heise commented on FLINK-15467:
-------------------------------------

Merged to release-1.10 a20fb4a796d8df91e4ebbcdc0b35cdd75145c574.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192936#comment-17192936 ]

Jun Qin commented on FLINK-15467:
---------------------------------

[~pnowojski] as discussed, could you please backport this fix to Flink 1.10, as it is impacting Flink 1.10 customers?

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168532#comment-17168532 ]

Roman Khachatryan commented on FLINK-15467:
-------------------------------------------

Thanks for the clarification [~Ming Li].

I think the fix covers this case too. In fact, I checked it manually and it fixed the problem. But of course, a double check from your side would be highly appreciated.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168528#comment-17168528 ]

Piotr Nowojski commented on FLINK-15467:
----------------------------------------

That was also our understanding [~Ming Li]. Either way the problem should be gone, as {{SourceStreamTask}} wouldn't finish until {{sourceThread}} is done and the life cycle of the Task's class loader is bound to the Task's life.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168520#comment-17168520 ]

ming li commented on FLINK-15467:
---------------------------------

Thanks for this fix. I want to correct the cause of this problem: it is not that the blob file is deleted, but that the userClassloader is closed when the task ends. Later, when the source thread tries to load a class through this classloader (when no classloader is specified, the classloader of the calling class is used), a ClassNotFoundException is thrown.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
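The corrected cause in ming li's comment (a class loader that is closed while a thread still needs it) can be observed with the JDK alone. This is a minimal sketch under stated assumptions: a plain `java.net.URLClassLoader` stands in for Flink's user-code class loader, the class names `Early` and `Late` and the helper methods are hypothetical, and a JDK is assumed to be present because the sketch compiles two empty classes at run time. It loads one class, closes the loader, and then asks it for a class it has not loaded yet.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class ClosedLoaderSketch {

    // Compiles an empty public class with the given name into dir.
    static void compileEmptyClass(Path dir, String name) throws IOException {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("a JDK (not a bare JRE) is required");
        }
        Path source = dir.resolve(name + ".java");
        Files.write(source, ("public class " + name + " {}").getBytes(StandardCharsets.UTF_8));
        int exitCode = compiler.run(null, null, null, "-d", dir.toString(), source.toString());
        if (exitCode != 0) {
            throw new IllegalStateException("compilation of " + name + " failed");
        }
    }

    // Returns "loaded" or the simple name of the exception raised after close().
    static String loadAfterClose() {
        try {
            Path dir = Files.createTempDirectory("user-code");
            compileEmptyClass(dir, "Early");
            compileEmptyClass(dir, "Late");

            // Parent is null so that only this loader can see the two classes.
            URLClassLoader loader = new URLClassLoader(new URL[] {dir.toUri().toURL()}, null);
            loader.loadClass("Early"); // works while the loader is open
            loader.close();            // stands in for "the task ended"

            try {
                loader.loadClass("Late"); // a class that was never loaded before close()
                return "loaded";
            } catch (ClassNotFoundException e) {
                return e.getClass().getSimpleName();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Early should be loadable before close()", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("loading a new class after close(): " + loadAfterClose());
    }
}
```

The class file for `Late` is still on disk when the second `loadClass` call runs, which matches the correction in the comment: the failure comes from the closed loader, not from a deleted file. The sketch leaves its temporary directory behind.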
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139948#comment-17139948 ]

Roman Khachatryan commented on FLINK-15467:
-------------------------------------------

Hi [~Ming Li], thanks for the clarification!

I was able to reproduce the issue and will submit a PR to fix it.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.0
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138977#comment-17138977 ]

ming li commented on FLINK-15467:
---------------------------------

Hi [~roman_khachatryan], I ran my test code on a standalone cluster. Once the job was running, I clicked cancel in the web UI to trigger cancellation.

To simulate a source thread that takes a long time to clean up, the test code sleeps for 3 seconds and then loads a class that the JVM has not loaded yet. At that point the exception above occurs.

When I load the new class without sleeping first, the problem does not occur; I think this is because the cleanup of the BlobLibrary has not finished yet.

I also think we should join the source thread in SourceStreamTask.cancelTask to fix this problem.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.0
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138723#comment-17138723 ]

Roman Khachatryan commented on FLINK-15467:
-------------------------------------------

I tried to run the program above with some corrections (to stop the job after some elements and load other classes). I couldn't reproduce the issue: the class was loaded successfully after stopping all services.

[~Ming Li], are you running it on a cluster and then stopping via JM? If so, here is what I think is happening in your case:
# Job is canceled in JM
# JM sends RPC to TM to cancel the task
# TM calls Task.cancelExecution which starts TaskCanceler and TaskInterrupter
# Source thread ignores the interrupt
# Main task thread proceeds to finally block in Task.doRun, where it calls cancelInvokable and notifyFinalState
## TaskExecutor.unregisterTaskAndNotifyFinalState -> taskSlotTable.removeTask
## TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources, which calls closeJob -> BlobLibraryCacheManager.release

If this is the case, then just joining the source thread inside cancelInvokable would solve the problem (specifically, in SourceStreamTask.cancelTask).

SourceStreamTask.cancelTask is called from:
* RPC notification about checkpoint completion - fine, done asynchronously
* TaskCanceler - can delay subsequent networkResourcesCloser.run() - which is probably also fine
* Task.cancelInvokable - what we want

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.12.0
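The ordering problem in the steps above, and the effect of joining the source thread before resources are released, can be sketched without Flink. This is a minimal illustration assuming plain Java threads: `TaskResource` and `SourceWorker` are hypothetical stand-ins (not Flink classes), the 200 ms sleep plays the role of slow user cleanup, and the sketch is not the fix that was eventually merged.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelJoinSketch {

    // Hypothetical stand-in for something the task owns, such as its class loader.
    static final class TaskResource {
        private final AtomicBoolean open = new AtomicBoolean(true);

        boolean isOpen() {
            return open.get();
        }

        void release() {
            open.set(false);
        }
    }

    // Hypothetical stand-in for the source thread: loops until cancelled,
    // then performs slow cleanup that still needs the resource.
    static final class SourceWorker extends Thread {
        private final TaskResource resource;
        private volatile boolean running = true;
        private volatile boolean cleanupSucceeded;

        SourceWorker(TaskResource resource) {
            this.resource = resource;
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Thread.sleep(10); // simulated record emission
                }
                Thread.sleep(200); // simulated slow cleanup
            } catch (InterruptedException ignored) {
                // nobody interrupts the worker in this sketch
            } finally {
                cleanupSucceeded = resource.isOpen();
            }
        }

        void cancel() {
            running = false;
        }

        boolean cleanupSucceeded() {
            return cleanupSucceeded;
        }
    }

    // Cancels the worker and releases the resource, optionally joining in between.
    // Returns whether the worker's cleanup still saw an open resource.
    static boolean runScenario(boolean joinBeforeRelease) {
        try {
            TaskResource resource = new TaskResource();
            SourceWorker worker = new SourceWorker(resource);
            worker.start();
            Thread.sleep(50);
            worker.cancel();
            if (joinBeforeRelease) {
                worker.join(); // wait for the worker's cleanup to finish
            }
            resource.release();
            worker.join(); // only so that the result can be read safely
            return worker.cleanupSucceeded();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("release without join, cleanup ok: " + runScenario(false));
        System.out.println("join, then release,   cleanup ok: " + runScenario(true));
    }
}
```

As the comment above notes for TaskCanceler, a join on the cancellation path delays whatever the caller does next, so where the join is placed matters.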
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124822#comment-17124822 ]

Piotr Nowojski commented on FLINK-15467:
----------------------------------------

Thanks for coming back and providing a way to reproduce the problem. I haven't run your code, but it looks like you are right. I'm not entirely sure how to fix this off the top of my head; someone would have to investigate this further.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Priority: Critical
>             Fix For: 1.12.0
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17123730#comment-17123730 ]

ming li commented on FLINK-15467:
---------------------------------

Below is my test code:
{code:java}
// code placeholder
package com.flink.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by liming on 2020/6/2.
 */
public class TestJobClean {
    private final Logger logger = LoggerFactory.getLogger(TestLoad.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource())
                .addSink(new DiscardingSink<>());
        env.execute("testJobCancel");
    }

    private static class CustomSource implements ParallelSourceFunction<String> {
        private final Logger logger = LoggerFactory.getLogger(CustomSource.class);
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            try {
                while (running) {
                    Thread.sleep(1000);
                    sourceContext.collect("test");
                }
            } finally {
                try {
                    Thread.sleep(3000); // Simulate preparatory work
                } catch (InterruptedException e) {
                    Thread.sleep(3000); // Simulate preparatory work
                }
                try {
                    logger.info("try to load clean class");
                    Class.forName("com.flink.test.TestLoad"); // try to load some class to do clean
                    logger.info("load clean class success");
                } catch (Exception e) {
                    logger.error("load clean class failed", e);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
{code}
The logs are as follows:

2020-06-02 20:34:24,368 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from RUNNING to CANCELING.
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from CANCELING to CANCELED.
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,372 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) [CANCELED]
2020-06-02 20:34:24,373 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Sink: Unnamed (1/1) 10ee411efd563d83214fd6f947dcad1e.
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile\{cpuCores=1., taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: e42db2155c2aa58e98c1dce598ecb7f5, jobId: 36242552141952de9bc8f3867f9d6f6d).
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job 36242552141952de9bc8f3867f9d6f6d from job leader monitoring.
2020-06-02 20:34:24,384 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,386 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,387 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job 36242552141952de9bc8f3867f9d6f6d because it is not registered.
2020-06-02 20:34:27,375 INFO com.flink.test.TestJobClean$CustomSource - try to load clean class
2020-06-02 20:34:27,376 ERROR com.flink.test.TestJobClean$CustomSource - load clean class failed
java.lang.ClassNotFoundException: com.flink.test.TestLoad
at
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17123726#comment-17123726 ]

ming li commented on FLINK-15467:
---------------------------------

Hi [~pnowojski]. I tried the latest release (1.10.1) and the same problem still exists: when the job is canceled, I try to load certain classes, but the jar file has already been cleaned up. Therefore, I think we still need to wait for the sourceThread to complete before cleaning up the jar file.

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1
>            Reporter: ming li
>            Priority: Major
>
> In the new mailbox model, SourceStreamTask starts a source thread to run
> user methods, and the current execution thread blocks on mailbox.takeMail().
> When a task is canceled, the TaskCanceler thread cancels the task and
> interrupts the execution thread. As a result, the execution thread of
> SourceStreamTask throws an InterruptedException, cancels the task again,
> and rethrows the exception.
> {code:java}
> // code placeholder
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>    // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>    // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>    sourceThread.start();
>    // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>    try {
>       runAlternativeMailboxLoop();
>    } catch (Exception mailboxEx) {
>       // We cancel the source function if some runtime exception escaped the mailbox.
>       if (!isCanceled()) {
>          cancelTask();
>       }
>       throw mailboxEx;
>    }
>    sourceThread.join();
>    if (!isFinished) {
>       sourceThread.checkThrowSourceExecutionException();
>    }
>    context.allActionsCompleted();
> }
> {code}
> When all tasks of this TaskExecutor are canceled, the blob file is cleaned
> up. But the real source thread has not finished at this point, so loading a
> new class causes a ClassNotFoundException. In this case, the source thread
> may not be able to properly clean up and release resources (such as closing
> child threads, cleaning up local files, etc.). Therefore, I think we should
> mark this task as canceled or finished only after the source thread has
> finished executing.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
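The ordering problem described in this issue can be illustrated with a minimal plain-Java sketch. It does not use any Flink API and is not the actual fix: the class `SourceThreadJoinSketch`, the method `cleanupSucceeds`, and the flag `jarAvailable` (standing in for the blob/jar file) are all hypothetical names chosen for illustration. The latch only makes the ordering deterministic; it models a source thread whose shutdown path runs after cancellation has been triggered.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java sketch (no Flink APIs): join a worker thread before releasing what it needs. */
public class SourceThreadJoinSketch {

    /**
     * Simulates a task cancellation and returns true if the worker's cleanup
     * still found the shared resource available.
     *
     * @param joinBeforeRelease if true, wait for the worker before releasing the resource
     */
    static boolean cleanupSucceeds(boolean joinBeforeRelease) {
        // Hypothetical stand-in for the user-code jar that is deleted after cancellation.
        AtomicBoolean jarAvailable = new AtomicBoolean(true);
        AtomicBoolean cleanupOk = new AtomicBoolean(false);
        // Released once the worker is allowed to run its shutdown path.
        CountDownLatch cleanupMayRun = new CountDownLatch(1);

        Thread sourceThread = new Thread(() -> {
            try {
                cleanupMayRun.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // The shutdown path needs the resource (for example, to load a class).
            cleanupOk.set(jarAvailable.get());
        }, "hypothetical-source-thread");
        sourceThread.start();

        try {
            if (joinBeforeRelease) {
                // Proposed ordering: let the worker finish, then release the resource.
                cleanupMayRun.countDown();
                sourceThread.join();
                jarAvailable.set(false);
            } else {
                // Reported ordering: the resource is released while the worker is still alive.
                jarAvailable.set(false);
                cleanupMayRun.countDown();
                sourceThread.join(); // joined here only so the result can be read safely
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return cleanupOk.get();
    }

    public static void main(String[] args) {
        System.out.println("join before release: " + cleanupSucceeds(true));  // true
        System.out.println("release before join: " + cleanupSucceeds(false)); // false
    }
}
```

In this sketch the only difference between the two branches is whether `join()` comes before or after the resource is released, which is the ordering the issue asks Flink to guarantee for the source thread.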
[jira] [Commented] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020179#comment-17020179 ]

Piotr Nowojski commented on FLINK-15467:
----------------------------------------

Could you [~Ming Li] elaborate what's the problem? Which method invocations? Also this code has changed quite a lot, so is this issue still valid in the current master?