[ https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski reassigned FLINK-21990:
--------------------------------------

    Assignee: Anton Kalashnikov

> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: ming li
>            Assignee: Anton Kalashnikov
>            Priority: Critical
>              Labels: pull-request-available
>
> If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and
> its {{snapshotState}} method throws an exception, the {{SourceStreamTask}}
> will always hang.
> The main reason is that the checkpoint is executed in the mailbox. When the
> {{CheckpointedFunction#snapshotState}} of the source throws an exception,
> {{StreamTask#cleanUpInvoke}} is called, which waits for the source's
> {{LegacySourceFunctionThread}} to finish. However, the source thread does not
> end by itself (the user's source function controls when it stops), so the
> {{Task}} hangs at this point, and the JobMaster is not aware of it.
> {code:java}
> protected void cleanUpInvoke() throws Exception {
>     // wait for the end of the source
>     getCompletionFuture().exceptionally(unused -> null).join();
>     // clean up everything we initialized
>     isRunning = false;
>     ...
> }{code}
> I think we should call the cancel method of the source first, and then wait
> for the source thread to end.
> The following is my test code; I tested against Flink's master branch.
> {code:java}
> @Test
> public void testSourceFailure() throws Exception {
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(2000L);
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
>     JobGraph jobGraph =
>             StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
>     try {
>         // assert that the job only executes the checkpoint once and only
>         // fails once.
>         TestUtils.submitJobAndWaitForResult(
>                 cluster.getClusterClient(), jobGraph,
>                 getClass().getClassLoader());
>     } catch (JobExecutionException jobException) {
>         Optional<FlinkRuntimeException> throwable =
>                 ExceptionUtils.findThrowable(jobException,
>                         FlinkRuntimeException.class);
>         Assert.assertTrue(throwable.isPresent());
>         Assert.assertEquals(
>                 CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
>                 throwable.get().getMessage());
>     }
>     // assert that the job only failed once.
>     Assert.assertEquals(1,
>             StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
> }
>
> private static class FailedSource extends RichParallelSourceFunction<String>
>         implements CheckpointedFunction {
>     // volatile: cancel() is called from a different thread than run()
>     private transient volatile boolean running;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         running = true;
>     }
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         // emit records until cancel() is called
>         while (running) {
>             ctx.collect("test");
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws
>             Exception {
>         // fail every checkpoint to trigger the hang
>         throw new RuntimeException("source failed");
>     }
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws
>             Exception {}
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)