Hi, Oscar. Yes, your are right.
If starting from a checkpoint or savepoint, kafka connector will always use the offset stored in the states. If starting without a checkpoint or savepoint, kafka connector will use the specific startup mode. Best, Hang Oscar Perez via user <user@flink.apache.org> 于2023年7月4日周二 20:54写道: > Hei, > Ok, thanks. so if I understand this correctly the difference between > OffsetInitializer.earliest and commitedOffset(OffsetResetStrategy.EARLIEST) > will be in the case that there is no flink state. In this case, earliest > will not check kafka committed offset and start from earliest while in the > latter will use the committed offset from kafka if there is any, is that > right? In either case if the committed offset is in flink state that will > take precedence and will be used in either case right? > > Thanks, > Oscar > > On Tue, 4 Jul 2023 at 02:56, Mason Chen <mas.chen6...@gmail.com> wrote: > >> Hi Oscar, >> >> You are correct about the OffsetInitializer being only effective when >> there is no Flink state--in addition, if you have partition discovery on, >> this initializer will be reused for the new partitions (i.e. splits) >> discovered. Assuming the job is continuing from the offset in Flink state, >> there is no difference between the two strategies. This is because the >> `auto.offset.reset` maps to the `OffsetResetStrategy` and >> OffsetInitializer.earliest uses `earliest` too. >> >> Best, >> Mason >> >> On Mon, Jul 3, 2023 at 6:56 AM Oscar Perez via user < >> user@flink.apache.org> wrote: >> >>> Hei, >>> >>> Looking at the flink documentation for kafkasource I see the following >>> values for starting offset: >>> >>> OffsetInitializer.earliest >>> OffsetInitializer.latest >>> OffsetInitializer.commitedOffset(OffsetResetStrategy.EARLIEST) >>> >>> From what I understand OffsetInitializer.earliest uses earliest offset >>> the first time but later deployments will use the committed offset in the >>> flink state to resume from there. If that is the case what is the >>> difference between OffsetInitializer.earliest and >>> commitedOffset(OffsetResetStrategy.EARLIEST) if both continue from the >>> committed offset after redeployment? >>> >>> Thanks! >>> Oscar >>> >>