Hi,
In short, [1] determines whether the job triggers checkpoints at all, and [2] determines which operators act when a checkpoint is triggered. To use ExampleCountSource, add flink-streaming-java as a dependency in pom.xml and import classes such as ListState, ListStateDescriptor, FunctionInitializationContext, FunctionSnapshotContext, CheckpointedFunction and SourceFunction.

By the way, I'm not sure whether this mail will display well, because it's the first time I've written such a formatted one. If not, please let me know.

------------------------------------------------------------------------------------
Detailed reply for question 1:

CheckpointedFunction is not needed to trigger or complete a checkpoint. A job triggers checkpoints when checkpointing has been enabled with code like that in [1], e.g. env.enableCheckpointing(xxx), and all of its tasks are running. Your job in the first mail didn't trigger a checkpoint because the source was not running at the time of the first checkpoint, not because checkpointing was disabled.

However, for some functions and operators checkpoints make no sense. Take the code in the word count demo as an example:

source → flatMap → keyBy → sum → print

Assume the input:

aaa bbb aaa
bbb ccc
aaa
bbb aaa ccc ddd

and assume the job fails because of some error after processing the first 3 lines:

aaa bbb aaa
bbb ccc
aaa
-- job fails --
-- job recovers --
bbb aaa ccc ddd

When the source operator and the sum operator recover from the failure, they need a checkpoint. The source operator needs to know where to resume (the 4th line), because some data had already been processed before the failure. The sum operator needs to know the count of every word before the failure (aaa:3, bbb:2, ccc:1), so that sentences arriving after recovery are counted correctly. The flatMap operator, however, doesn't need a checkpoint at all: whenever a sentence arrives, it just splits it, so it requires nothing from a checkpoint to recover.
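To make the recovery argument concrete, here is a small plain-Java simulation of the pipeline. It does not use Flink at all; WordCountRecoverySketch, Checkpoint, run and splitWords are hypothetical names. It simplifies Flink's behaviour by taking a "checkpoint" after every line (real checkpoints are periodic), and it assumes the remaining input "bbb aaa ccc ddd" is a single 4th line. Only two things are saved: the source's next line and the sum operator's counts; the flatMap step stores nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of source -> flatMap -> keyBy -> sum (no Flink involved).
// All class and method names here are hypothetical.
public class WordCountRecoverySketch {

    // What survives a failure: the source's position and the sum operator's counts.
    public static final class Checkpoint {
        public final int nextLine;                 // source state: index of the next line to read
        public final Map<String, Integer> counts;  // sum state: count per word

        public Checkpoint(int nextLine, Map<String, Integer> counts) {
            this.nextLine = nextLine;
            this.counts = new TreeMap<>(counts);   // copy, sorted for readable printing
        }
    }

    // Plays the role of flatMap: stateless, so nothing about it is stored in a Checkpoint.
    static List<String> splitWords(String sentence) {
        List<String> words = new ArrayList<>();
        for (String w : sentence.split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    // Resumes from restoreFrom, snapshots after every line (a simplification of
    // Flink's periodic checkpoints) and simulates a crash once failAfter lines
    // have been consumed (pass -1 to run to the end). Returns the last checkpoint.
    public static Checkpoint run(List<String> lines, Checkpoint restoreFrom, int failAfter) {
        int next = restoreFrom.nextLine;
        Map<String, Integer> counts = new TreeMap<>(restoreFrom.counts);
        Checkpoint last = restoreFrom;
        while (next < lines.size()) {
            for (String word : splitWords(lines.get(next))) {
                counts.merge(word, 1, Integer::sum);   // running sum per word (the keyed sum)
            }
            next++;
            last = new Checkpoint(next, counts);       // snapshot source position + sum state
            if (next == failAfter) {
                return last;   // crash: in-memory state is lost, only the last checkpoint survives
            }
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("aaa bbb aaa", "bbb ccc", "aaa", "bbb aaa ccc ddd");

        // First attempt fails after the first 3 lines.
        Checkpoint cp = run(lines, new Checkpoint(0, Map.of()), 3);
        System.out.println("resume at index " + cp.nextLine + " with " + cp.counts);

        // Recovery restores both the source position and the sum state.
        Checkpoint done = run(lines, cp, -1);
        System.out.println("final counts " + done.counts);
    }
}
```

If only the counts were restored and the source restarted from index 0, e.g. run(lines, new Checkpoint(0, cp.counts), -1), the first three lines would be counted twice (aaa would end at 7 instead of 4). That is why the source needs its position in the checkpoint, just as the sum operator needs its counts.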
CheckpointedFunction in [2] marks these stateful functions, distinguishing them from stateless ones (it's not the only way, but it's the most flexible one). See [3] and [4] for more information.

------------------------------------------------------------------------------------
Detailed reply for question 2:

Here's my sample code for ExampleCountSource.java:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {

        private long count = 0L;
        private volatile boolean isRunning = true;

        // operator state that holds the current count in each checkpoint
        private transient ListState<Long> checkpointedCount;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning && count < 1000) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an atomic operation
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.checkpointedCount = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));

            // on recovery, continue counting from the value stored in the last checkpoint
            if (context.isRestored()) {
                for (Long count : this.checkpointedCount.get()) {
                    this.count = count;
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // called for every checkpoint: replace the stored count with the current one
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }
    }

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html

Regards,
Smile