Hi, Oscar.

Yes, you are right.

If the job starts from a checkpoint or savepoint, the Kafka connector always
uses the offsets stored in the state.
If the job starts without a checkpoint or savepoint, the Kafka connector uses
the configured startup mode (the starting OffsetsInitializer).
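To make the precedence concrete, here is a simplified model in plain Java. It is not the connector's code. The names StartingOffsetModel, StartupMode and startingOffset are hypothetical, and the model ignores details such as committed offsets that have fallen out of range. It only encodes the rule above: offsets restored from a checkpoint or savepoint win, and otherwise the startup mode decides.

```java
import java.util.OptionalLong;

public class StartingOffsetModel {

    // Hypothetical stand-ins for the three startup modes discussed in this thread.
    public enum StartupMode { EARLIEST, LATEST, COMMITTED_OR_EARLIEST }

    /**
     * Simplified model of how one partition (split) picks its starting offset.
     *
     * @param restoredOffset  offset restored from Flink checkpoint/savepoint state, if any
     * @param committedOffset offset committed to Kafka for the consumer group, if any
     * @param mode            configured startup mode
     * @param earliest        earliest available offset of the partition
     * @param latest          latest offset of the partition
     */
    public static long startingOffset(OptionalLong restoredOffset,
                                      OptionalLong committedOffset,
                                      StartupMode mode,
                                      long earliest,
                                      long latest) {
        // 1. Offsets restored from Flink state always take precedence.
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong();
        }
        // 2. Without Flink state, the startup mode decides.
        switch (mode) {
            case EARLIEST:
                return earliest; // ignores any offset committed to Kafka
            case LATEST:
                return latest;
            case COMMITTED_OR_EARLIEST:
                // uses the Kafka committed offset, falling back to earliest if none exists
                return committedOffset.orElse(earliest);
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        OptionalLong none = OptionalLong.empty();
        OptionalLong committed = OptionalLong.of(500L);
        OptionalLong restored = OptionalLong.of(800L);

        // No Flink state: the two "earliest" variants differ.
        System.out.println(startingOffset(none, committed, StartupMode.EARLIEST, 0L, 1000L));
        System.out.println(startingOffset(none, committed, StartupMode.COMMITTED_OR_EARLIEST, 0L, 1000L));

        // With Flink state: both resume from the restored offset.
        System.out.println(startingOffset(restored, committed, StartupMode.EARLIEST, 0L, 1000L));
        System.out.println(startingOffset(restored, committed, StartupMode.COMMITTED_OR_EARLIEST, 0L, 1000L));
    }
}
```

Running main prints 0, 500, 800 and 800. In the real KafkaSource builder, the modes correspond to OffsetsInitializer.earliest(), OffsetsInitializer.latest() and OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST), passed to setStartingOffsets(...). A partition found later by partition discovery has no restored offset, so in this model it would fall through to the startup mode.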

Best,
Hang

On Tue, 4 Jul 2023 at 20:54, Oscar Perez via user <user@flink.apache.org> wrote:

> Hei,
> OK, thanks. So if I understand this correctly, the difference between
> OffsetsInitializer.earliest() and
> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) only
> matters when there is no Flink state. In that case, earliest() will not
> check the Kafka committed offset and will start from the earliest offset,
> while the latter will use the committed offset from Kafka if there is one.
> Is that right? And in either case, if an offset is stored in Flink state,
> that will take precedence and be used, right?
>
> Thanks,
> Oscar
>
> On Tue, 4 Jul 2023 at 02:56, Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi Oscar,
>>
>> You are correct that the OffsetsInitializer only takes effect when there
>> is no Flink state. In addition, if you have partition discovery on, this
>> initializer will be reused for newly discovered partitions (i.e. splits).
>> Assuming the job is continuing from the offsets in Flink state, there is
>> no difference between the two strategies. This is because
>> `auto.offset.reset` maps to the `OffsetResetStrategy`, and
>> OffsetsInitializer.earliest() uses `earliest` too.
>>
>> Best,
>> Mason
>>
>> On Mon, Jul 3, 2023 at 6:56 AM Oscar Perez via user <
>> user@flink.apache.org> wrote:
>>
>>> Hei,
>>>
>>> Looking at the Flink documentation for KafkaSource, I see the following
>>> values for the starting offset:
>>>
>>> OffsetsInitializer.earliest()
>>> OffsetsInitializer.latest()
>>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>>
>>> From what I understand, OffsetsInitializer.earliest() uses the earliest
>>> offset the first time, but later deployments will use the offset stored
>>> in Flink state to resume from there. If that is the case, what is the
>>> difference between OffsetsInitializer.earliest() and
>>> committedOffsets(OffsetResetStrategy.EARLIEST) if both continue from the
>>> stored offset after redeployment?
>>>
>>> Thanks!
>>> Oscar
>>>
>>
