Hi Kazunori,

The checkpoint lock is acquired preemptively inside the `SourceContext#collect` call, which is sufficient when the source is stateless. However, this is not enough if you are implementing a stateful `SourceFunction`: if you modify state in your source function outside of the checkpoint lock scope, those updates happen concurrently with the `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState` call, and the snapshot could capture state that is inconsistent with the records already emitted. `StreamOperator#snapshotState` is one example of the actions executed through the `StreamTaskActionExecutor`. Depending on the execution context (whether or not it is running in a source task), the `StreamTaskActionExecutor` either acquires the checkpoint lock or does not.
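To illustrate why the lock matters for a stateful source, here is a minimal sketch in plain Java with no Flink dependency. The class `CheckpointLockSketch` and its methods are hypothetical names made up for this example: `checkpointLock` stands in for the object returned by `SourceContext#getCheckpointLock()`, `emitNext()` for one iteration of your `run()` loop, and `snapshot()` for the snapshot that Flink takes under the same lock. It only models the pattern and is not how Flink itself is implemented.

```java
class CheckpointLockSketch {
    // Stands in for the object returned by SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();

    // Source state: the offset to read next and the number of records emitted.
    // After every complete emission these two values must be equal.
    private long nextOffset = 0L;
    private long emittedCount = 0L;

    // One iteration of a hypothetical run() loop: emit a record and advance
    // the offset atomically with respect to snapshots.
    void emitNext() {
        synchronized (checkpointLock) {
            emittedCount++; // models ctx.collect(record)
            nextOffset++;   // models the state update that belongs to that record
        }
    }

    // Models a snapshot taken by the checkpointing thread under the same lock.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {nextOffset, emittedCount};
        }
    }

    // Runs an emitter thread while this thread keeps taking snapshots.
    // Returns true if every snapshot saw offset and emitted count in agreement.
    static boolean runDemo(int records) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                source.emitNext();
            }
        });
        emitter.start();

        boolean consistent = true;
        while (emitter.isAlive()) {
            long[] snap = source.snapshot();
            consistent &= (snap[0] == snap[1]);
        }
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for emitter", e);
        }

        long[] last = source.snapshot();
        return consistent && last[0] == records && last[1] == records;
    }

    public static void main(String[] args) {
        System.out.println("snapshots consistent: " + runDemo(100_000));
    }
}
```

If the `synchronized` block in `emitNext()` were removed, a snapshot could land between the emission and the offset update, which is the kind of inconsistency the javadoc is warning about.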
Also, please note that `SourceFunction` is currently (as of Flink 1.12.0) being phased out in favor of the new `org.apache.flink.api.connector.source.Source` interface. Among other improvements, this newer interface has a single-threaded execution model, with no dedicated thread running the source code, so there is no need for the checkpoint lock.

Best,
Piotrek

On Mon, 18 Jan 2021 at 13:05, Kazunori Shinhira <k.shinhira.1...@gmail.com> wrote:

> Hi,
>
> I have a question about how to implement a SourceFunction.
>
> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>
> The following javadoc says that a SourceFunction which implements
> CheckpointedFunction should use a synchronized block to perform checkpointing
> and emission of elements atomically.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
> On the other hand, from the implementation of StreamSourceContexts and
> StreamTaskActionExecutor, it looks like SourceContext.collect and the
> checkpointing process are mutually exclusive.
>
> I'm sorry if I misunderstood.
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>
> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>
> My question is: is the synchronized block in SourceFunction necessary?
>
> And why is it necessary?
>
> Best regards,
>
> --
> Kazunori Shinhira
> Mail : k.shinhira.1...@gmail.com