[ 
https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-21990:
--------------------------------------

    Assignee: Anton Kalashnikov

> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState 
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: ming li
>            Assignee: Anton Kalashnikov
>            Priority: Critical
>              Labels: pull-request-available
>
> If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and 
> an exception is thrown in the snapshotState method, then the 
> {{SourceStreamTask}} will always hang.
> The main reason is that the checkpoint is executed in the mailbox. When the 
> {{CheckpointedFunction#snapshotState}}  of the source throws an exception, 
> the StreamTask#cleanUpInvoke will be called, where it will wait for the end 
> of the {{LegacySourceFunctionThread}} of the source. However, the source 
> thread does not end by itself (this requires the user to control it), the 
> {{Task}} will hang at this time, and the JobMaster has no perception of this 
> behavior.
> {code:java}
> protected void cleanUpInvoke() throws Exception {
>     getCompletionFuture().exceptionally(unused -> null).join(); //wait for 
> the end of the source
>     // clean up everything we initialized
>     isRunning = false;
>     ...
> }{code}
> I think we should call the cancel method of the source first, and then wait 
> for the end.
> The following is my test code, the test branch is Flink's master branch.
> {code:java}
> @Test
> public void testSourceFailure() throws Exception {
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(2000L);
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
>     JobGraph jobGraph = 
> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
>     try {
>         // assert that the job only execute checkpoint once and only failed 
> once.
>         TestUtils.submitJobAndWaitForResult(
>                 cluster.getClusterClient(), jobGraph, 
> getClass().getClassLoader());
>     } catch (JobExecutionException jobException) {
>         Optional<FlinkRuntimeException> throwable =
>                 ExceptionUtils.findThrowable(jobException, 
> FlinkRuntimeException.class);
>         Assert.assertTrue(throwable.isPresent());
>         Assert.assertEquals(
>                 
> CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
>                 throwable.get().getMessage());
>     }
>     // assert that the job only failed once.
>     Assert.assertEquals(1, 
> StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
> }
> private static class FailedSource extends RichParallelSourceFunction<String>
>         implements CheckpointedFunction {
>     private transient boolean running;
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         running = true;
>     }
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         while (running) {
>             ctx.collect("test");
>         }
>     }
>     @Override
>     public void cancel() {
>         running = false;
>     }
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>         throw new RuntimeException("source failed");
>     }
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws 
> Exception {}
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to