No problem :)

Piotrek

On Wed, 20 Jan 2021 at 02:12, Kazunori Shinhira <k.shinhira.1...@gmail.com>
wrote:

> Hi,
>
>
> Thank you for your explanation.
>
> I now understand the need for the checkpoint lock :)
>
>
>
> Best,
>
> On Tue, 19 Jan 2021 at 18:00, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> yes exactly :)
>>
>> > As a result, the source may save a wrong offset and lose records if the job
>> > is recreated at that moment.
>>
>> This is just one of the possible race conditions that could happen. As
>> offsets are probably 64-bit integers, I'm pretty sure corrupted
>> writes/reads can also happen when only half of the bits (for example the
>> lower 32 bits) were updated before checkpointing.
>>
>> Best,
>> Piotrek
>>
>> On Tue, 19 Jan 2021 at 02:22, Kazunori Shinhira <k.shinhira.1...@gmail.com>
>> wrote:
>>
>>> Hi Piotrek,
>>>
>>>
>>>
>>> Thank you for your reply.
>>>
>>>
>>> I understand that synchronizing on the checkpoint lock is needed to make
>>> state modification and checkpointing mutually exclusive.
>>>
>>> In my understanding, for example, a SourceFunction implementation for
>>> Kafka has to enclose both fetching records and updating the current
>>> offset, which is the state of the Kafka SourceFunction, in a
>>> synchronized block.
>>>
>>> If we don't use a synchronized block, `StreamOperator#snapshotState` can
>>> be called concurrently with the state modification.
>>>
>>> As a result, the source may save a wrong offset and lose records if the job
>>> is recreated at that moment.
>>>
>>> Is my understanding correct?
>>>
>>>
>>> Thank you for the information on the new Source interface.
>>>
>>> I’ll look into how to implement it.
>>>
>>>
>>>
>>> Best,
>>>
>>> On Mon, 18 Jan 2021 at 23:45, Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>
>>>> Hi Kazunori,
>>>>
>>>> The checkpoint lock is acquired preemptively inside the
>>>> `SourceContext#collect` call, which covers the case where the source is
>>>> stateless. However, this is not enough if you are implementing a stateful
>>>> `SourceFunction`: if you modify state in your source function outside of
>>>> the checkpoint lock scope, those updates happen concurrently with the
>>>> `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState`
>>>> call and bad things would happen. `StreamOperator#snapshotState` is one
>>>> example of the actions that are executed in the
>>>> `StreamTaskActionExecutor`. Depending on the execution context (whether
>>>> it is running in a source task or not), `StreamTaskActionExecutor` will
>>>> or will not acquire the checkpoint lock.
>>>>
>>>> Also please note that `SourceFunction` is currently (as of Flink
>>>> 1.12.0) being phased out, in favor of the new
>>>> `org.apache.flink.api.connector.source.Source` interface. Amongst other
>>>> improvements, this newer interface has a single thread execution model,
>>>> without a dedicated thread to run the source code, so there is no need for
>>>> the checkpoint lock.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Mon, 18 Jan 2021 at 13:05, Kazunori Shinhira <k.shinhira.1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I have a question about how to implement a SourceFunction.
>>>>>
>>>>> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>>>>>
>>>>> The following javadoc says that a SourceFunction which implements
>>>>> CheckpointedFunction should use a synchronized block to perform
>>>>> checkpointing and emission of elements atomically.
>>>>>
>>>>>
>>>>> See
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>>>>>
>>>>>
>>>>>
>>>>> On the other hand, from the implementation of StreamSourceContexts and
>>>>> StreamTaskActionExecutor, it looks like SourceContext.collect and
>>>>> checkpointing are already mutually exclusive.
>>>>>
>>>>> I’m sorry if I misunderstood.
>>>>>
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>>>>>
>>>>>
>>>>>
>>>>> My question is: is the synchronized block in SourceFunction
>>>>> necessary?
>>>>>
>>>>> And if so, why is it necessary?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> --
>>>>> Kazunori Shinhira
>>>>> Mail : k.shinhira.1...@gmail.com
>>>>>
>>>>
>>>
>>> --
>>> Kazunori Shinhira
>>> Mail : k.shinhira.1...@gmail.com
>>>
>>
>
> --
> Kazunori Shinhira
> Mail : k.shinhira.1...@gmail.com
>
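
For readers of this thread, the locking pattern discussed above can be
sketched in plain Java. This is only an illustration under stated
assumptions, not Flink code: the class `CheckpointLockSketch` and all of its
members are hypothetical names, and a plain `Object` stands in for the lock
that a real `SourceFunction` would obtain from
`SourceContext#getCheckpointLock()`. The sketch emits a record and advances
the offset inside one synchronized block, while a second thread takes
"snapshots" under the same lock and checks that the offset always matches the
number of emitted records.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointLockSketch {
    // Hypothetical stand-in for the lock returned by SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Stand-in for records handed to SourceContext#collect.
    private final List<Long> emitted = new ArrayList<>();
    // The source state: the next offset to read.
    private long offset = 0L;

    // Models the body of SourceFunction#run: emit and update state atomically.
    void runSource(long numRecords) {
        for (long i = 0; i < numRecords; i++) {
            synchronized (checkpointLock) {
                emitted.add(offset); // "collect" the record at the current offset
                offset++;            // advance the state in the same critical section
            }
        }
    }

    // Models snapshotState: under the lock, state and emitted records must agree.
    boolean snapshotIsConsistent() {
        synchronized (checkpointLock) {
            return offset == emitted.size();
        }
    }

    // Runs the source in its own thread and snapshots repeatedly from the caller.
    public static int countInconsistentSnapshots(long numRecords) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread sourceThread = new Thread(() -> source.runSource(numRecords));
        sourceThread.start();
        int inconsistent = 0;
        while (sourceThread.isAlive()) {
            if (!source.snapshotIsConsistent()) {
                inconsistent++;
            }
        }
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (!source.snapshotIsConsistent()) {
            inconsistent++;
        }
        return inconsistent;
    }

    public static void main(String[] args) {
        System.out.println("inconsistent snapshots: " + countInconsistentSnapshots(100_000));
    }
}
```

If the synchronized block in `runSource` were removed, a snapshot could land
between the emit and the offset update, which is the race described in the
thread.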

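Regarding the remark in the thread about 64-bit offsets being only half
updated: the Java Language Specification (section 17.7) allows a write to a
non-volatile `long` to be performed as two separate 32-bit writes. The sketch
below is a hypothetical plain-Java illustration (the class
`VolatileOffsetSketch` and its members are made-up names, not Flink API). It
declares the offset `volatile`, which guarantees that a reader sees either the
old or the new value, never a mix of the two halves.

```java
public class VolatileOffsetSketch {
    // volatile makes reads and writes of this 64-bit value atomic (JLS 17.7).
    private volatile long offset = 0L;

    // A writer flips the offset between 0 and -1 (all bits set) while the
    // caller reads it; any other observed value would be a torn read.
    public static int countTornReads(int writes) {
        VolatileOffsetSketch state = new VolatileOffsetSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < writes; i++) {
                state.offset = (i % 2 == 0) ? -1L : 0L;
            }
        });
        writer.start();
        int torn = 0;
        while (writer.isAlive()) {
            long seen = state.offset;
            if (seen != 0L && seen != -1L) {
                torn++;
            }
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return torn;
    }

    public static void main(String[] args) {
        System.out.println("torn reads: " + countTornReads(1_000_000));
    }
}
```

Note that `volatile` only rules out torn values. It does not make "emit a
record and update the offset" atomic with respect to a snapshot, so a stateful
`SourceFunction` would still need the checkpoint lock for that.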