Hi,

yes exactly :)

> As a result, the source may save a wrong offset and lose records if the job
> is restarted at that moment.

This is just one of the race conditions that could happen. As offsets are
probably 64-bit integers, I'm pretty sure corrupted writes/reads can also
happen, for example when only half of the bits (say, the lower 32 bits)
have been updated by the time the checkpoint reads the value.
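
To make the race concrete, here is a minimal, Flink-free sketch in plain
Java (not actual connector code). The `checkpointLock` field stands in for
the object returned by `SourceContext#getCheckpointLock()`, `emitNext()` for
one iteration of the `run()` loop, and `snapshot()` for `snapshotState`; the
class and method names are made up for illustration. Because the
emit-and-advance step and the snapshot share one lock, a snapshot should
never see an offset that disagrees with the number of emitted records.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointLockSketch {
    // Stand-in for the object returned by SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Hypothetical source state: the next offset to read.
    private long offset = 0L;
    // Stand-in for records handed downstream via SourceContext#collect.
    private final List<Long> emitted = new ArrayList<>();

    // One iteration of the source loop: emit the record and advance the
    // offset as a single step under the lock.
    void emitNext() {
        synchronized (checkpointLock) {
            emitted.add(offset);
            offset++;
        }
    }

    // Stand-in for snapshotState: takes the same lock, so it can only run
    // between two complete emit-and-advance steps.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {offset, emitted.size()};
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread sourceThread = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                source.emitNext();
            }
        });
        sourceThread.start();

        // "Checkpoint" repeatedly from another thread while the source runs.
        boolean consistent = true;
        for (int i = 0; i < 1_000; i++) {
            long[] s = source.snapshot();
            consistent &= (s[0] == s[1]);
        }
        sourceThread.join();
        System.out.println("snapshots consistent: " + consistent);
    }
}
```

In a real `SourceFunction` the same shape applies: wrap `ctx.collect(...)`
and the offset update together in
`synchronized (ctx.getCheckpointLock()) { ... }`.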

Best,
Piotrek

On Tue, 19 Jan 2021 at 02:22, Kazunori Shinhira <k.shinhira.1...@gmail.com>
wrote:

> Hi Piotrek,
>
>
>
> Thank you for your reply.
>
>
> I understand that synchronizing on the checkpoint lock is needed to make
> state modification and checkpointing mutually exclusive.
>
> In my understanding, for example, in an implementation of SourceFunction for
> Kafka, it is necessary to enclose the process of fetching records and
> updating the current offset, which is the state of the Kafka SourceFunction,
> within a synchronized block.
>
> If we don't use a synchronized block, `StreamOperator#snapshotState` will be
> called concurrently with state modification.
>
> As a result, the source may save a wrong offset and lose records if the job
> is restarted at that moment.
>
> Is my understanding correct?
>
>
> Thank you for the information on the new Source interface.
>
> I’ll look into how to implement it.
>
>
>
> Best,
>
> On Mon, 18 Jan 2021 at 23:45, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi Kazunori,
>>
>> The checkpoint lock is acquired preemptively inside the
>> `SourceContext#collect` call, which is enough if the source is stateless.
>> However, this is not enough if you are implementing a stateful
>> `SourceFunction`: if you modify state in your source function
>> outside of the checkpoint lock scope, those updates happen
>> concurrently with the
>> `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState`
>> call, and the snapshot may capture inconsistent state.
>> `StreamOperator#snapshotState` is one example of an action that is
>> executed in the `StreamTaskActionExecutor`. Depending on the execution
>> context (whether it runs in a source task or not), the
>> `StreamTaskActionExecutor` will or will not acquire the checkpoint lock.
>>
>> Also please note that `SourceFunction` is currently (as of Flink 1.12.0)
>> being phased out, in favor of the new
>> `org.apache.flink.api.connector.source.Source` interface. Amongst other
>> improvements, this newer interface has a single-threaded execution model,
>> without a dedicated thread to run the source code, so there is no need for
>> the checkpoint lock.
>>
>> Best,
>> Piotrek
>>
>> On Mon, 18 Jan 2021 at 13:05, Kazunori Shinhira <k.shinhira.1...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> I have a question about how to implement a SourceFunction.
>>>
>>> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>>>
>>> The following javadoc says that a SourceFunction which implements
>>> CheckpointedFunction should use a synchronized block to perform
>>> checkpointing and emission of elements atomically.
>>>
>>>
>>> See
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>>>
>>>
>>>
>>> On the other hand, from the implementation of StreamSourceContexts and
>>> StreamTaskActionExecutor, it looks like SourceContext.collect and
>>> checkpointing are already mutually exclusive.
>>>
>>> I’m sorry if I misunderstood.
>>>
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>>>
>>>
>>> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>>>
>>>
>>>
>>> My question is: is the synchronized block in SourceFunction
>>> necessary?
>>>
>>> And if so, why is it necessary?
>>>
>>>
>>>
>>>
>>> Best regards,
>>>
>>> --
>>> Kazunori Shinhira
>>> Mail : k.shinhira.1...@gmail.com
>>>
>>
>
> --
> Kazunori Shinhira
> Mail : k.shinhira.1...@gmail.com
>
