[ https://issues.apache.org/jira/browse/FLINK-9640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527141#comment-16527141 ]
Hai Zhou commented on FLINK-9640:
---------------------------------

CC [~StephanEwen], what do you think about this ticket?

> Checkpointing is always aborted if any task has been finished
> -------------------------------------------------------------
>
>                 Key: FLINK-9640
>                 URL: https://issues.apache.org/jira/browse/FLINK-9640
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0
>            Reporter: Hai Zhou
>            Assignee: Hai Zhou
>            Priority: Major
>             Fix For: 1.6.0
>
>
> Steps to reproduce:
> 1. Build a standalone Flink cluster.
> 2. Submit a test job like the one below:
> {code:java}
> import java.util.UUID;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class DemoJob {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.disableOperatorChaining();
>         env.setParallelism(4);
>         env.enableCheckpointing(3000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());
>         inputStream.map(String::hashCode).print();
>         env.execute();
>     }
>
>     public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {
>         private static final Logger LOG =
>                 LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
>         private volatile boolean running = true;
>         private int index;
>         private int subtask_nums;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             index = getRuntimeContext().getIndexOfThisSubtask();
>             subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
>         }
>
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             while (running) {
>                 String data = UUID.randomUUID().toString();
>                 ctx.collect(data);
>                 LOG.info("subtask_index = {}, emit string = {}", index, data);
>                 Thread.sleep(50);
>                 // Only the subtask with index subtask_nums / 2 (index 2 at parallelism 4)
>                 // stops emitting, so exactly one source task switches to FINISHED.
>                 if (index == subtask_nums / 2) {
>                     running = false;
>                     LOG.info("subtask_index = {}, finished.", index);
>                 }
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
> }
> {code}
> 3. Observe the TaskManager and JobManager logs:
> *taskmanager.log*
> {code:java}
> 2018-06-21 17:05:54,144 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
> 2018-06-21 17:05:54,151 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
> 2018-06-21 17:05:54,195 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 2, finished.
> 2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
> 2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
> 2018-06-21 17:05:54,211 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
> {code}
> *jobmanager.log*
> {code:java}
> 2018-06-21 17:05:52,682 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:52,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:54,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:55,069 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
> 2018-06-21 17:05:58,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
> 2018-06-21 17:06:01,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
> 2018-06-21 17:06:04,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
> {code}
> -------------------------------------------------------------------------------------------------
> I think we should filter out {{FINISHED}} tasks both from the tasks that must trigger a checkpoint and from the tasks that must acknowledge it. For more details, see the {{org.apache.flink.runtime.checkpoint.CheckpointCoordinator}} class.
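A minimal sketch of the filtering proposed above, in plain Java. It does not use Flink's real {{CheckpointCoordinator}}, {{Execution}}, or {{ExecutionState}} classes: {{FinishedTaskFilterSketch}}, {{TaskState}}, {{Task}}, {{currentCanTrigger}}, {{proposedTriggerTasks}}, and {{proposedTasksToAck}} are hypothetical stand-ins. They model only how the trigger and acknowledge sets are chosen and ignore everything else the coordinator does (pending-checkpoint bookkeeping, timeouts, barrier handling in downstream tasks). The task states mirror the logs: subtask 3/4 of Source, Map, and Sink has FINISHED, and the source subtasks are the trigger tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the proposal; none of these types are Flink classes.
public class FinishedTaskFilterSketch {

    // Simplified stand-in for a task's execution state (not Flink's ExecutionState).
    enum TaskState { DEPLOYING, RUNNING, FINISHED }

    static final class Task {
        final String name;
        final TaskState state;

        Task(String name, TaskState state) {
            this.name = name;
            this.state = state;
        }
    }

    // Behavior seen in jobmanager.log: any trigger task that is not RUNNING aborts the checkpoint.
    static boolean currentCanTrigger(List<Task> triggerTasks) {
        for (Task t : triggerTasks) {
            if (t.state != TaskState.RUNNING) {
                return false;
            }
        }
        return true;
    }

    // Proposed trigger set: skip FINISHED tasks; any other non-RUNNING state still aborts.
    // Returns null when the checkpoint should be aborted.
    static List<Task> proposedTriggerTasks(List<Task> triggerTasks) {
        List<Task> toTrigger = new ArrayList<>();
        for (Task t : triggerTasks) {
            if (t.state == TaskState.FINISHED) {
                continue; // finished: not asked to trigger
            }
            if (t.state != TaskState.RUNNING) {
                return null; // e.g. still DEPLOYING: abort, as before
            }
            toTrigger.add(t);
        }
        // If every trigger task has finished, there is nothing left to trigger.
        return toTrigger.isEmpty() ? null : toTrigger;
    }

    // Proposed ack set: do not wait for acknowledgements from FINISHED tasks.
    static List<Task> proposedTasksToAck(List<Task> allTasks) {
        List<Task> toAck = new ArrayList<>();
        for (Task t : allTasks) {
            if (t.state != TaskState.FINISHED) {
                toAck.add(t);
            }
        }
        return toAck;
    }

    public static void main(String[] args) {
        // Mirrors the logs: subtask 3/4 of Source, Map, and Sink is FINISHED, the rest RUNNING.
        List<Task> allTasks = new ArrayList<>();
        for (String op : Arrays.asList("Source", "Map", "Sink")) {
            for (int i = 1; i <= 4; i++) {
                TaskState state = (i == 3) ? TaskState.FINISHED : TaskState.RUNNING;
                allTasks.add(new Task(op + " (" + i + "/4)", state));
            }
        }
        List<Task> sources = allTasks.subList(0, 4); // the source subtasks trigger checkpoints

        System.out.println("current: trigger allowed = " + currentCanTrigger(sources));
        // prints "current: trigger allowed = false"
        System.out.println("proposed: tasks to trigger = " + proposedTriggerTasks(sources).size());
        // prints "proposed: tasks to trigger = 3"
        System.out.println("proposed: tasks to ack = " + proposedTasksToAck(allTasks).size());
        // prints "proposed: tasks to ack = 9"
    }
}
```

The sketch takes the narrowest reading of the proposal: only FINISHED tasks are skipped, any other non-RUNNING state (for example DEPLOYING) still aborts the checkpoint as it does today, and a checkpoint is also aborted when every trigger task has finished.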