Re: Why do we need to use synchronized(checkpointLock) in SourceFunction.run ?

2021-01-19 Piotr Nowojski
No problem :) Piotrek
On Wed, 20 Jan 2021 at 02:12, Kazunori Shinhira wrote:
> Hi,
> Thank you for your explanation.
> I now understand the need for the checkpoint lock :)
> Best,
> On Tue, 19 Jan 2021 at 18:00, Piotr Nowojski wrote:
>> Hi,
>> yes exactly :)
>> > As a result, Source may sav

Re: Why do we need to use synchronized(checkpointLock) in SourceFunction.run ?

2021-01-19 Kazunori Shinhira
Hi,
Thank you for your explanation. I now understand the need for the checkpoint lock :)
Best,
On Tue, 19 Jan 2021 at 18:00, Piotr Nowojski wrote:
> Hi,
> yes exactly :)
> > As a result, Source may save a wrong offset and lose records if job recreation occurs at that moment.
> This is just one of the po

Re: Why do we need to use synchronized(checkpointLock) in SourceFunction.run ?

2021-01-19 Piotr Nowojski
Hi,
yes exactly :)
> As a result, Source may save a wrong offset and lose records if job recreation occurs at that moment.
This is just one of the possible race conditions that could happen. As offsets are probably 64-bit integers, I'm pretty sure corrupted writes/reads can also happen when only h
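To make the lost-record race concrete, here is a single-threaded Java replay of one unlucky interleaving. It is a simplified model, not Flink code: `SplitUpdateDemo` and `effectiveOutput` are hypothetical names, records are identified by their offset, and it assumes that on restore both the source and everything downstream roll back to the checkpoint, so the effective output is what was emitted before the barrier plus what is re-read from the checkpointed offset. If the offset is advanced before the emit, a record is lost; with the opposite order it is duplicated. It does not attempt to model the torn 64-bit reads/writes Piotr also mentions.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-threaded replay: a checkpoint fires between the two steps of
 * processing record k (emit it, advance the offset), then the job fails and restores.
 * Records are identified by their offset 0..total-1.
 */
public class SplitUpdateDemo {

    static List<Long> effectiveOutput(boolean advanceBeforeEmit, long k, long total) {
        List<Long> beforeBarrier = new ArrayList<>(); // emitted before the checkpoint barrier
        long offset = 0;                              // source state: next offset to read

        // Records 0..k-1 were fully processed before the checkpoint.
        for (long r = 0; r < k; r++) {
            beforeBarrier.add(r);
            offset++;
        }

        // Record k is only half processed when the unsynchronized snapshot is taken.
        if (advanceBeforeEmit) {
            offset++;             // offset already claims record k, but it was never emitted
        } else {
            beforeBarrier.add(k); // record k was emitted, but the offset still points at it
        }
        long checkpointedOffset = offset;

        // After failure and restore, reading resumes from the checkpointed offset.
        List<Long> out = new ArrayList<>(beforeBarrier);
        for (long r = checkpointedOffset; r < total; r++) {
            out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(effectiveOutput(true, 2, 5));  // [0, 1, 3, 4]: record 2 lost
        System.out.println(effectiveOutput(false, 2, 5)); // [0, 1, 2, 2, 3, 4]: record 2 duplicated
    }
}
```

Holding the checkpoint lock across both steps removes the half-processed state entirely, so neither outcome can occur.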

Re: Why do we need to use synchronized(checkpointLock) in SourceFunction.run ?

2021-01-18 Kazunori Shinhira
Hi Piotrek,
Thank you for your reply. I understand that synchronizing on the checkpoint lock is needed to make state modification and checkpointing mutually exclusive. In my understanding, for example, in the implementation of SourceFunction for Kafka, it is necessary to enclose the process of acquiring r
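A sketch of the Kafka-style case, under stated assumptions: each fetched record carries a partition and an offset, and the source keeps a map of the next offset to read per partition. `PartitionOffsetsModel`, `Fetched`, `emitBatch` and `snapshotState` are hypothetical plain-Java stand-ins, not the Kafka or Flink APIs. Emitting a record and recording its offset share one critical section, and the snapshot copies the map while holding the same lock. Keeping the fetch itself outside the lock is a design choice of this sketch (so a slow fetch does not hold up a checkpoint), not something stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a partitioned source; not the Kafka or Flink API. */
public class PartitionOffsetsModel {

    /** A fetched record; field names are illustrative. */
    static final class Fetched {
        final int partition;
        final long offset;
        final String value;

        Fetched(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>();        // stands in for ctx.collect(...)
    private final Map<Integer, Long> nextOffsets = new HashMap<>(); // partition -> next offset to read

    /** Source thread: the batch was fetched outside the lock; emit + offset update happen inside it. */
    void emitBatch(List<Fetched> batch) {
        for (Fetched r : batch) {
            synchronized (checkpointLock) {
                emitted.add(r.value);
                nextOffsets.put(r.partition, r.offset + 1);
            }
        }
    }

    /** Checkpoint side: copy under the lock so later updates cannot leak into the snapshot. */
    Map<Integer, Long> snapshotState() {
        synchronized (checkpointLock) {
            return new HashMap<>(nextOffsets);
        }
    }

    public static void main(String[] args) {
        PartitionOffsetsModel source = new PartitionOffsetsModel();
        List<Fetched> batch = new ArrayList<>();
        batch.add(new Fetched(0, 41, "a"));
        batch.add(new Fetched(1, 7, "b"));
        source.emitBatch(batch);

        Map<Integer, Long> snapshot = source.snapshotState();

        batch.clear();
        batch.add(new Fetched(0, 42, "c"));
        source.emitBatch(batch);

        System.out.println(snapshot); // {0=42, 1=8}
    }
}
```

The copy matters: returning `nextOffsets` itself would let the source thread keep mutating the "snapshot" after the lock is released.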

Re: Why do we need to use synchronized(checkpointLock) in SourceFunction.run ?

2021-01-18 Piotr Nowojski
Hi Kazunori,
The checkpoint lock is acquired preemptively inside the `SourceContext#collect` call for the case where the source is stateless. However, this is not enough if you are implementing a stateful `SourceFunction`, since if you are modifying state in your source function outside of the checkpo
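If `collect` already takes the lock, does wrapping it in your own `synchronized` block on the same lock deadlock? It does not: Java's intrinsic locks are reentrant, so a thread already holding a monitor can enter `synchronized` on it again. The sketch below uses a hypothetical `ModelSourceContext` whose `collect` locks internally, in the spirit of Piotr's description; it is not Flink's actual `SourceContext` implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model; not Flink's SourceContext implementation. */
public class NestedLockDemo {

    static final class ModelSourceContext {
        final Object checkpointLock = new Object();
        final List<Long> out = new ArrayList<>();

        void collect(long record) {
            // Per-record locking: enough for a stateless source.
            synchronized (checkpointLock) {
                out.add(record);
            }
        }
    }

    long offset = 0; // source state that must stay in step with emitted records

    void emitNext(ModelSourceContext ctx) {
        // The outer block is what a stateful source needs. collect() re-acquires the
        // same monitor, which is allowed because intrinsic locks are reentrant.
        synchronized (ctx.checkpointLock) {
            ctx.collect(offset);
            offset++;
        }
    }

    public static void main(String[] args) {
        ModelSourceContext ctx = new ModelSourceContext();
        NestedLockDemo source = new NestedLockDemo();
        for (int i = 0; i < 3; i++) {
            source.emitNext(ctx);
        }
        System.out.println(ctx.out + " offset=" + source.offset); // [0, 1, 2] offset=3
    }
}
```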

Why do we need to use synchronized(checkpointLock) in SourceFunction.run ?

2021-01-18 Kazunori Shinhira
Hi,
I have a question about implementing the methods of SourceFunction. I'm trying to implement my own SourceFunction using Flink 1.11.3. The following javadoc says that a SourceFunction which implements CheckpointedFunction should use a synchronized block to perform checkpointing and emission of elemen
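A minimal plain-Java model of the pattern the javadoc describes: emission and the state update happen in one `synchronized` block on the checkpoint lock, and the snapshot takes the same lock. All names are hypothetical; in Flink the lock object comes from `SourceContext#getCheckpointLock()` and the state would live in Flink-managed state rather than a plain field. A second thread stands in for whatever triggers checkpoints and checks that every snapshot sees the offset equal to the number of records already emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a stateful source; no Flink dependency. */
public class LockedSourceModel {
    private final Object checkpointLock = new Object();
    private final List<Long> emitted = new ArrayList<>(); // stands in for downstream output
    private long offset = 0;                               // source state: next offset to read

    /** Source thread: emit one record and advance the offset atomically. */
    void emitNext() {
        synchronized (checkpointLock) {
            emitted.add(offset); // stands in for ctx.collect(record)
            offset++;            // state update in the same critical section
        }
    }

    /** Checkpoint side: returns {checkpointed offset, records emitted before the barrier}. */
    long[] snapshotState() {
        synchronized (checkpointLock) {
            return new long[] {offset, emitted.size()};
        }
    }

    /** Runs source and checkpoint threads concurrently; returns the number of inconsistent snapshots. */
    static int countInconsistentSnapshots(int records) {
        LockedSourceModel source = new LockedSourceModel();
        Thread sourceThread = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                source.emitNext();
            }
        });
        int[] inconsistent = {0}; // written only by the checkpoint thread
        Thread checkpointThread = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                long[] snap = source.snapshotState();
                if (snap[0] != snap[1]) {
                    inconsistent[0]++;
                }
            }
        });
        sourceThread.start();
        checkpointThread.start();
        try {
            sourceThread.join();
            checkpointThread.join(); // join() makes inconsistent[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return inconsistent[0];
    }

    public static void main(String[] args) {
        System.out.println(countInconsistentSnapshots(100_000)); // 0
    }
}
```

Removing either `synchronized` block lets the checkpoint side observe a record that has been emitted but not yet reflected in `offset`, which is exactly the window in which a wrong offset gets saved.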