Hi Kazunori,

The checkpoint lock is acquired preemptively inside the
`SourceContext#collect` call to cover the case where the source is
stateless. However, this is not enough if you are implementing a stateful
`SourceFunction`. If you modify state in your source function outside of
the checkpoint lock scope, those updates would happen concurrently with the
`org.apache.flink.streaming.api.operators.StreamOperator#snapshotState`
call. The snapshot could then capture state that is inconsistent with the
records already emitted, for example an offset that is ahead of or behind
what was actually passed to `collect`. `StreamOperator#snapshotState` is one
example of an action executed by the `StreamTaskActionExecutor`. Depending
on the execution context (whether it runs in a source task or not),
`StreamTaskActionExecutor` may or may not acquire the checkpoint lock.
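
To make this concrete, here is a small dependency-free Java model of the
pattern the `SourceFunction` javadoc describes. It is only a sketch, not
Flink code: `CheckpointLockDemo`, `ToySourceContext` and `CountingSource`
are hypothetical names, and only `collect` and `getCheckpointLock` mirror
methods of the real `SourceFunction.SourceContext`. The source's state (the
next offset) is updated in the same `synchronized (ctx.getCheckpointLock())`
block as the `collect` call, and the simulated snapshot, which runs on a
different thread as `snapshotState` would, takes the same lock. As a result
it always sees an offset that matches the number of emitted records.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical stand-ins, not Flink APIs.
public class CheckpointLockDemo {

    /** Hypothetical stand-in for SourceFunction.SourceContext. */
    static final class ToySourceContext {
        private final Object lock = new Object();
        private final List<Long> emitted = new ArrayList<>();

        Object getCheckpointLock() {
            return lock;
        }

        // Like the stateless path described above, collect takes the lock,
        // but only for the duration of the emit itself.
        void collect(long value) {
            synchronized (lock) {
                emitted.add(value);
            }
        }

        int emittedCount() {
            synchronized (lock) {
                return emitted.size();
            }
        }
    }

    /** Hypothetical stateful source; its state is the next offset to emit. */
    static final class CountingSource {
        private long offset = 0L; // guarded by the checkpoint lock
        private volatile boolean running = true;

        void run(ToySourceContext ctx, long limit) {
            while (running && offset < limit) {
                // Emit and advance the offset as one atomic step with respect
                // to snapshots. Java intrinsic locks are reentrant, so the
                // nested lock acquisition inside collect is fine.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(offset);
                    offset++;
                }
            }
        }

        void cancel() {
            running = false;
        }

        /** Runs on another thread, like snapshotState; holds the same lock. */
        long[] snapshot(ToySourceContext ctx) {
            synchronized (ctx.getCheckpointLock()) {
                return new long[] {offset, ctx.emittedCount()};
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToySourceContext ctx = new ToySourceContext();
        CountingSource source = new CountingSource();
        Thread sourceThread = new Thread(() -> source.run(ctx, 200_000L));
        sourceThread.start();

        int inconsistent = 0;
        for (int i = 0; i < 1_000; i++) {
            long[] snap = source.snapshot(ctx); // {stored offset, records emitted}
            if (snap[0] != snap[1]) {
                inconsistent++;
            }
        }
        sourceThread.join();
        System.out.println("inconsistent snapshots: " + inconsistent);
    }
}
```

If you moved `offset++` out of the `synchronized` block, the lock taken
inside `collect` alone would no longer stop a snapshot from landing between
the emit and the state update, which is exactly the race described above.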

Also please note that `SourceFunction` is currently (as of Flink 1.12.0)
being phased out in favor of the new
`org.apache.flink.api.connector.source.Source` interface. Amongst other
improvements, this newer interface has a single-threaded execution model
without a dedicated thread to run the source code, so there is no need for
a checkpoint lock.
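
For comparison, here is a rough sketch of why a single-threaded model
removes the need for the lock. Again this is a toy, not the `Source` or
`SourceReader` API: `MailboxDemo`, `ToyReader`, `ToyTask` and the mailbox
queue are hypothetical names. Other threads may only enqueue checkpoint
requests; the task thread executes them between records, so the reader's
state is only ever touched by one thread and needs no synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model only: these classes are hypothetical, not Flink's Source API.
public class MailboxDemo {

    /** Hypothetical reader: emits offsets and reports its position. */
    static final class ToyReader {
        private long offset = 0L; // plain field, no lock: only the task thread touches it
        private final long limit;
        private final List<Long> output = new ArrayList<>();

        ToyReader(long limit) {
            this.limit = limit;
        }

        /** Emits at most one record; returns false once exhausted. */
        boolean pollNext() {
            if (offset >= limit) {
                return false;
            }
            output.add(offset);
            offset++;
            return true;
        }

        long snapshotState() {
            return offset;
        }

        int emittedCount() {
            return output.size();
        }
    }

    /** Hypothetical task: one thread runs both mail (checkpoints) and records. */
    static final class ToyTask {
        // Other threads may only enqueue work here; they never touch reader state.
        final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
        final List<long[]> snapshots = new ArrayList<>(); // task thread only
        private final ToyReader reader;

        ToyTask(ToyReader reader) {
            this.reader = reader;
        }

        /** Safe to call from any thread: the snapshot itself runs on the task thread. */
        void triggerCheckpoint() {
            mailbox.add(() -> snapshots.add(
                    new long[] {reader.snapshotState(), reader.emittedCount()}));
        }

        private void drainMailbox() {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run(); // runs between records, never in the middle of one
            }
        }

        void run() {
            boolean more = true;
            while (more) {
                drainMailbox();
                more = reader.pollNext();
            }
            drainMailbox(); // handle requests that arrived during the last record
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyTask task = new ToyTask(new ToyReader(100_000L));
        Thread taskThread = new Thread(task::run);
        taskThread.start();
        for (int i = 0; i < 100; i++) {
            task.triggerCheckpoint(); // requested from the main thread
        }
        taskThread.join(); // join makes the task thread's writes visible here
        long bad = task.snapshots.stream().filter(s -> s[0] != s[1]).count();
        System.out.println("inconsistent snapshots: " + bad);
    }
}
```

Running `main` should report zero inconsistent snapshots even though
`triggerCheckpoint` is called from a different thread than the one emitting
records. The number of snapshots taken can vary between runs, since some
requests may arrive after the reader has finished.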

Best,
Piotrek

On Mon, 18 Jan 2021 at 13:05, Kazunori Shinhira <k.shinhira.1...@gmail.com>
wrote:

> Hi,
>
> I have a question about implementing a SourceFunction.
>
> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>
> The following javadoc says that a SourceFunction which implements
> CheckpointedFunction should use a synchronized block to perform
> checkpointing and emission of elements atomically.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
> On the other hand, from the implementation of StreamSourceContexts and
> StreamTaskActionExecutor, it looks like SourceContext.collect and the
> checkpointing process are mutually exclusive.
>
> I’m sorry if I misunderstood.
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>
> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>
> My question is: is the synchronized block in SourceFunction necessary?
>
> And why is it necessary?
>
> Best regards,
>
> --
> Kazunori Shinhira
> Mail : k.shinhira.1...@gmail.com
>
