Hi Hongshun

Thanks for your explanation; I get your point. I reviewed the FLIP again
and have only one minor comment, which won't block this FLIP: do we still
need the `OffsetsInitializer newDiscoveryOffsetsInitializer` parameter in the
constructor of `KafkaSourceEnumerator`? I think we can remove it if we always
use EARLIEST for newly discovered partitions.
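
To make the suggestion concrete, here is a minimal, self-contained sketch of
what I mean. It does not use the real Flink classes: `EnumeratorSketch` and
`StartOffsets` are hypothetical stand-ins for `KafkaSourceEnumerator` and
`OffsetsInitializer`, and partitions are modelled as plain strings with an
{earliest, latest} offset pair. The constructor takes only the startup
initializer, and EARLIEST is fixed for everything discovered later.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for KafkaSourceEnumerator; not the real Flink class.
class EnumeratorSketch {

    /** Hypothetical stand-in for OffsetsInitializer. */
    interface StartOffsets {
        long startOffset(String partition, long earliest, long latest);
    }

    static final StartOffsets EARLIEST = (partition, earliest, latest) -> earliest;
    static final StartOffsets LATEST = (partition, earliest, latest) -> latest;

    private final StartOffsets startingOffsets; // used only for the first discovery
    private final Set<String> assigned = new LinkedHashSet<>();
    private boolean firstDiscoveryDone = false;

    // Only the startup initializer is passed in; there is no separate
    // "new discovery" initializer parameter.
    EnumeratorSketch(StartOffsets startingOffsets) {
        this.startingOffsets = startingOffsets;
    }

    /**
     * @param ranges partition name -> {earliest offset, latest offset}
     * @return start offsets for partitions not seen in an earlier call
     */
    Map<String, Long> discover(Map<String, long[]> ranges) {
        // After the first discovery, EARLIEST is always used.
        StartOffsets initializer = firstDiscoveryDone ? EARLIEST : startingOffsets;
        Map<String, Long> starts = new TreeMap<>();
        for (Map.Entry<String, long[]> entry : ranges.entrySet()) {
            if (assigned.add(entry.getKey())) {
                long[] range = entry.getValue();
                starts.put(entry.getKey(),
                        initializer.startOffset(entry.getKey(), range[0], range[1]));
            }
        }
        firstDiscoveryDone = true;
        return starts;
    }

    public static void main(String[] args) {
        EnumeratorSketch enumerator = new EnumeratorSketch(LATEST);
        Map<String, long[]> ranges = new TreeMap<>();
        ranges.put("topic-0", new long[] {0L, 100L});
        System.out.println(enumerator.discover(ranges)); // {topic-0=100}
        ranges.put("topic-1", new long[] {0L, 7L});
        System.out.println(enumerator.discover(ranges)); // {topic-1=0}
    }
}
```

In this sketch the job starts from LATEST on the partition that exists at
startup, while the partition that shows up later is read from its beginning,
so no second initializer has to be threaded through the constructor.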

Best,
Shammon FY

On Tue, Apr 18, 2023 at 4:59 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Shammon,
>
> Thank you for your advice. I have carefully considered whether to expose
> this in SQL DDL, and recently studied whether it is feasible.
>
> However, after reading the corresponding code more thoroughly, it appears
> that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> work as we initially thought. As a result, I have decided to use only
> "EARLIEST" rather than letting the user choose freely.
>
> Now, let me show my new understanding.
>
> The actual behavior of SpecifiedOffsetsInitializer and
> TimestampOffsetsInitializer:
>
>    - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for the
>    specified partitions and *EARLIEST* for unspecified partitions. A
>    specified offset must be less than the latest offset; otherwise the
>    partition starts from *EARLIEST*.
>    - *TimestampOffsetsInitializer*: Initializes the offsets based on a
>    timestamp. If no message meeting the timestamp requirement has been
>    produced to Kafka yet, it uses the *LATEST* offset.
>
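
The two fallback rules above can be modelled in a few lines. This is only a
sketch of the behavior as described in this message, not the connector code:
`OffsetFallbackSketch`, `specifiedStart` and `timestampStart` are hypothetical
names, a partition is reduced to an array of record timestamps whose index is
the offset, and the "less than the latest offset" boundary is taken literally
from the description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the fallback rules described above; not Flink code.
class OffsetFallbackSketch {

    /**
     * Specified-offset rule: use the specified offset when the partition is
     * listed and the offset is below the latest offset; otherwise EARLIEST.
     */
    static long specifiedStart(
            Map<String, Long> specified, String partition, long earliest, long latest) {
        Long wanted = specified.get(partition);
        if (wanted == null || wanted >= latest) {
            return earliest;
        }
        return wanted;
    }

    /**
     * Timestamp rule: start at the first record whose timestamp is at or
     * after startTimestamp; if there is none yet, use the LATEST (end) offset.
     * Assumes offsets start at 0 and timestamps are non-decreasing.
     */
    static long timestampStart(long[] recordTimestamps, long startTimestamp) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= startTimestamp) {
                return offset;
            }
        }
        return recordTimestamps.length;
    }

    public static void main(String[] args) {
        Map<String, Long> specified = new HashMap<>();
        specified.put("topic-0", 5L);
        // topic-1 is not listed, so it falls back to EARLIEST.
        System.out.println(specifiedStart(specified, "topic-1", 0L, 10L));
        // A timestamp later than every record falls back to LATEST.
        System.out.println(timestampStart(new long[] {100L, 200L, 300L}, 999L));
    }
}
```

With a timestamp that lies before every record of a new partition, the
timestamp rule returns offset 0, which is why it looks like EARLIEST in
practice; with a timestamp beyond the last record it returns the end offset,
and the records already written to the new partition are skipped.
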
> So some problems occur when a new partition uses
> SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> more information in the "Rejected Alternatives" section of FLIP-288, which
> includes details of the code and the reasoning behind this conclusion.
> All of these problems are reproducible in the current version. They
> probably haven't surfaced because users usually specify an offset or
> timestamp that already exists, so in production the behavior looks like
> EARLIEST.
>
> WDYT?
> CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
>
> Yours
>
> Hongshun
>
>
>
>
> On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi Hongshun
> >
> > Thanks for updating the FLIP, it totally sounds good to me.
> >
> > I just have one comment: how does a SQL job set the new discovery offsets
> > initializer?
> > I found that `DataStream` jobs can set a different offsets initializer for
> > newly discovered partitions via `KafkaSourceBuilder.setNewDiscoveryOffsets`.
> > Do SQL jobs need to support this feature?
> >
> > Best,
> > Shammon FY
> >
> > On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I have already modified FLIP-288 to provide a
> > > newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> > > KafkaSourceEnumerator. Users can use
> > > KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> > > partitions.
> > >
> > > Enabling partition discovery by default and changing the offset
> > > strategy for new partitions should certainly be brought to the user's
> > > attention, so it will be explained in the 1.18 release notes.
> > >
> > > WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> > >
> > >
> > > Best,
> > >
> > > Hongshun
> > >
> > > On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > > Thanks for your participation.
> > > >
> > > > @Gordon, I looked at the several questions you raised:
> > > >
> > > >    1. Should we use the firstDiscovery flag or two separate
> > > >    OffsetsInitializers? Actually, I have considered the latter. If we
> > > >    follow my initial idea, we can provide a default EARLIEST
> > > >    OffsetsInitializer for new partitions. However, according to
> > > >    @Shammon's suggestion, different startup OffsetsInitializers
> > > >    correspond to different post-startup OffsetsInitializers for
> > > >    Flink's built-in offset strategies.
> > > >    2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> > > >    again, and it seems that neither @Shammon nor I had figured it out.
> > > >    TimestampOffsetsInitializer#getPartitionOffsets has a comment:
> > > >    "First get the current end offsets of the partitions. This is going
> > > >    to be used in case we cannot find a suitable offset based on the
> > > >    timestamp, i.e., the message meeting the requirement of the
> > > >    timestamp has not been produced to Kafka yet. *In this case, we just
> > > >    use the latest offset*." Therefore, the TimestampOffsetsInitializer
> > > >    will always yield an offset at startup.
> > > >    3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > >    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already
> > > >    uses the "auto.offset.reset" position for partitions that are not hit.
> > > > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> > > > whether the offsets specified at startup may include non-existent
> > > > partitions. The previous design may have allowed a SPECIFIC-OFFSET
> > > > startup with future partitions. However, since different strategies
> > > > are now used for partitions found in the first discovery and those
> > > > discovered later, I think the offsets specified at startup should only
> > > > cover partitions that have been confirmed to exist; otherwise an error
> > > > will be thrown. Partitions that do not exist yet should be handled by
> > > > the post-startup OffsetsInitializer (default EARLIEST).
> > > >
> > > > Best
> > > > Hongshun
> > > >
> > > >
> > > > On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > >> Thanks Gordon and Leonard
> > > >>
> > > > >> I'm sorry that I have no specific case on my side, but I see the
> > > > >> issue as follows:
> > > > >>
> > > > >> 1. Users may set an offset later than the current time, because Flink
> > > > >> does not restrict it.
> > > > >> 2. Using EARLIEST for newly discovered partitions under different
> > > > >> OFFSET modes may differ from the previous strategy. I think it's best
> > > > >> to keep the same strategy as before if it does not cause data loss.
> > > > >> 3. I think supporting different OFFSETs in the FLIP will not make the
> > > > >> implementation more complex.
> > > > >>
> > > > >> Of course, if it is confirmed that this is an illegal timestamp OFFSET
> > > > >> and Flink validates it, then we can apply the same strategy to newly
> > > > >> discovered partitions. I think that would be nice too.
> > > >>
> > > >> Best,
> > > >> Shammon FY
> > > >>
> > > >>
> > > > >> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > >>
> > > >> > Thanks Hongshun and Shammon for driving the FLIP!
> > > >> >
> > > >> >
> > > > >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > > >> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > > >> > > SPECIFIC-OFFSET post-startup*
> > > > >> >
> > > > >> > Gordon raised a good point about the future TIMESTAMP and
> > > > >> > SPECIFIC-OFFSET: the timestamp/offset of a newly added partition is
> > > > >> > undetermined when the job starts (the partition has not been created
> > > > >> > yet), so it is a timestamp/offset in the future.
> > > > >> >
> > > > >> > I have used many message queue systems such as Kafka, Pulsar, and
> > > > >> > xxMQ. In my experience, TIMESTAMP and SPECIFIC-OFFSET startup modes
> > > > >> > are usually used to specify existing timestamps/offsets, for business
> > > > >> > scenarios such as backfilling and re-refreshing data. At present, it's
> > > > >> > hard to imagine a user scenario that specifies a future timestamp to
> > > > >> > filter data in the current topic of a message queue system. Is it
> > > > >> > overthinking to consider future TIMESTAMP and SPECIFIC-OFFSET?
> > > >> >
> > > >> >
> > > >> > Best,
> > > >> > Leonard
> > > >>
> > > >
> > >
> >
>
