No problem :)

Piotrek
On Wed, 20 Jan 2021 at 02:12, Kazunori Shinhira <k.shinhira.1...@gmail.com> wrote:
> Hi,
>
> Thank you for your explanation.
> I now understand the need for the checkpoint lock :)
>
> Best,
>
> On Tue, 19 Jan 2021 at 18:00, Piotr Nowojski <pnowoj...@apache.org> wrote:
>> Hi,
>>
>> yes exactly :)
>>
>> > As a result, the source may save a wrong offset and lose records if the
>> > job is recreated at that moment.
>>
>> This is just one of the possible race conditions that could happen. As
>> offsets are probably 64-bit integers, I'm pretty sure corrupted
>> writes/reads can also happen, when only half of the bits (for example the
>> lower 32 bits) were updated before checkpointing.
>>
>> Best,
>> Piotrek
>>
>> On Tue, 19 Jan 2021 at 02:22, Kazunori Shinhira <k.shinhira.1...@gmail.com> wrote:
>>> Hi Piotrek,
>>>
>>> Thank you for your reply.
>>>
>>> I understand that synchronization with the checkpoint lock is needed to
>>> make state modification and checkpointing mutually exclusive.
>>>
>>> In my understanding, for example, in the SourceFunction implementation
>>> for Kafka, the process of fetching records and updating the current
>>> offset (the state of the Kafka SourceFunction) must be enclosed in a
>>> synchronized block.
>>>
>>> If we don't use a synchronized block, `StreamOperator#snapshotState` will
>>> be called concurrently with the state modification.
>>>
>>> As a result, the source may save a wrong offset and lose records if the
>>> job is recreated at that moment.
>>>
>>> Is my understanding correct?
>>>
>>> Thank you for the information on the new Source interface.
>>> I'll look into how to implement it.
>>>
>>> Best,
>>>
>>> On Mon, 18 Jan 2021 at 23:45, Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>> Hi Kazunori,
>>>>
>>>> The checkpoint lock is acquired preemptively inside the
>>>> SourceContext#collect call, for the case where the source is stateless.
>>>> However, this is not enough if you are implementing a stateful
>>>> `SourceFunction`: if you modify state in your source function outside of
>>>> the checkpoint lock scope, those updates happen concurrently with the
>>>> `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState`
>>>> call, and bad things would happen. `StreamOperator#snapshotState` is one
>>>> example of the actions executed through the `StreamTaskActionExecutor`.
>>>> Depending on the execution context (whether it runs in a source task or
>>>> not), the `StreamTaskActionExecutor` may or may not acquire the
>>>> checkpoint lock.
>>>>
>>>> Also please note that `SourceFunction` is currently (as of Flink
>>>> 1.12.0) being phased out in favor of the new
>>>> `org.apache.flink.api.connector.source.Source` interface. Amongst other
>>>> improvements, this newer interface has a single-threaded execution model,
>>>> without a dedicated thread to run the source code, so there is no need for
>>>> the checkpoint lock.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Mon, 18 Jan 2021 at 13:05, Kazunori Shinhira <k.shinhira.1...@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> I have a question about how to implement a SourceFunction.
>>>>> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>>>>>
>>>>> The following Javadoc says that a SourceFunction which implements
>>>>> CheckpointedFunction should use a synchronized block to perform
>>>>> checkpointing and emission of elements atomically.
>>>>>
>>>>> See
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>>>>>
>>>>> On the other hand, from the implementation of StreamSourceContexts and
>>>>> StreamTaskActionExecutor, it looks like SourceContext.collect and the
>>>>> checkpointing process are mutually exclusive.
>>>>> I'm sorry if I misunderstood.
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>>>>>
>>>>> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>>>>>
>>>>> My question is: is the synchronized block in a SourceFunction
>>>>> necessary? And if so, why?
>>>>>
>>>>> Best regards,
>>>>>
>>>>> --
>>>>> Kazunori Shinhira
>>>>> Mail : k.shinhira.1...@gmail.com
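The rule discussed in this thread is that a stateful `SourceFunction` must emit a record and update its offset inside one block synchronized on the checkpoint lock. It can be illustrated with a plain-Java simulation. This is a minimal sketch under stated assumptions, not Flink code. `CheckpointLockDemo`, `runSource`, `snapshotState`, and `simulate` are hypothetical names. The plain `Object` lock stands in for the one a real source gets from `SourceContext#getCheckpointLock()`. A second thread plays the role of the checkpoint (`CheckpointedFunction#snapshotState`) running concurrently with the source loop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (no Flink classes): a source thread emits records and
// advances an offset, while a "checkpoint" thread snapshots that offset.
public class CheckpointLockDemo {

    // Stands in for SourceContext#getCheckpointLock(); here just a plain Object.
    private final Object checkpointLock = new Object();

    // Stands in for the downstream output fed by ctx.collect(record).
    private final List<Long> emitted = new ArrayList<>();

    // Operator state: offset of the next record to read. Guarded by checkpointLock.
    private long offset = 0L;

    // Each snapshot holds {offsetInState, recordsEmittedSoFar}.
    private final List<long[]> snapshots = new ArrayList<>();

    // Mimics SourceFunction#run: emission and offset update share one critical
    // section, so a snapshot can never see one without the other.
    void runSource(int records) {
        for (int i = 0; i < records; i++) {
            synchronized (checkpointLock) {
                emitted.add(offset); // like ctx.collect(record)
                offset++;            // state update inside the same lock scope
            }
        }
    }

    // Mimics snapshotState(): runs on another thread but takes the same lock.
    void snapshotState() {
        synchronized (checkpointLock) {
            snapshots.add(new long[] {offset, emitted.size()});
        }
    }

    List<long[]> getSnapshots() {
        synchronized (checkpointLock) {
            return new ArrayList<>(snapshots);
        }
    }

    // Runs the source and checkpoint threads concurrently and returns all snapshots.
    static List<long[]> simulate(int records, int checkpoints) {
        CheckpointLockDemo demo = new CheckpointLockDemo();
        Thread source = new Thread(() -> demo.runSource(records));
        Thread checkpointer = new Thread(() -> {
            for (int i = 0; i < checkpoints; i++) {
                demo.snapshotState();
            }
        });
        source.start();
        checkpointer.start();
        try {
            source.join();
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return demo.getSnapshots();
    }

    public static void main(String[] args) {
        List<long[]> snapshots = simulate(100_000, 1_000);
        // A snapshot is consistent when the saved offset equals the records emitted.
        boolean consistent = snapshots.stream().allMatch(s -> s[0] == s[1]);
        System.out.println("snapshots taken: " + snapshots.size());
        System.out.println("all snapshots consistent: " + consistent);
    }
}
```

Running `main` prints `snapshots taken: 1000` followed by `all snapshots consistent: true`. If `offset++` were moved outside the `synchronized` block, a snapshot could record an offset that disagrees with what was already emitted. That is the lost-record race described above. Also, the Java memory model allows a write to a non-volatile `long` to be performed as two separate 32-bit writes (JLS §17.7). An unsynchronized snapshot could therefore, in principle, read a torn value, which matches Piotr's point about half-updated offsets.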