Hi Shammon,

Thanks for your advice! I learned a lot about TIMESTAMP/SPECIFIC_OFFSET.
That's interesting.

However, I have a different opinion.

If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery,
they will be able to find new partitions beyond the specified offsets.
Otherwise, enabling auto-discovery makes no sense.

When it comes to the TIMESTAMP strategy, the issue seems to be minor. I
understand your concern, but filtering records by time is the role of a
time window rather than of partition discovery. The TIMESTAMP strategy
means that the consumer starts from the first record whose timestamp is
greater than or equal to a given timestamp; it does not mean that the
consumer only consumes records whose timestamp is greater than or equal to
the given timestamp. *Thus, even if auto-discovery is disabled, or new
partitions are discovered with the TIMESTAMP strategy, the same problem
still occurs.*
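
To illustrate the point, here is a minimal sketch. It does not use the
Kafka client; it models a partition as an array of record timestamps, and
the class and method names are made up for this example. It also assumes
that when no record matches, the start position falls back to the end of
the partition.

```java
import java.util.Arrays;

// Hypothetical model: a partition is an array of record timestamps,
// indexed by offset. Not the Flink or Kafka implementation.
final class TimestampStartSketch {

    // Offset of the first record whose timestamp is >= target.
    // Assumption: fall back to the end of the partition if none matches.
    static int startOffset(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return offset;
            }
        }
        return timestamps.length;
    }

    // Everything from the start offset onward is consumed, with no
    // further filtering on the timestamp.
    static long[] consumedFrom(long[] timestamps, long target) {
        int start = startOffset(timestamps, target);
        return Arrays.copyOfRange(timestamps, start, timestamps.length);
    }

    public static void main(String[] args) {
        // The record at offset 2 has a timestamp lower than the target.
        long[] partition = {10L, 20L, 15L, 30L};
        System.out.println(startOffset(partition, 20L));
        System.out.println(Arrays.toString(consumedFrom(partition, 20L)));
    }
}
```

With a target of 20, consumption starts at offset 1 and still includes the
record with timestamp 15, so the strategy only fixes a start position.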

Above all, why use the EARLIEST strategy? I believe that the strategy
specified at startup should apply only at the moment of startup. *So
there is no difference between new partitions and new messages in old
partitions.* Therefore, all the new-partition issues that you care about
will still appear even if you disable partition discovery, in the form of
new messages in old partitions. If all new messages in old partitions
should be consumed, all messages in new partitions should also be
consumed.
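
As a rough sketch of the rule I am arguing for (the class and method names
are hypothetical, and this is not the actual Flink code): the configured
startup mode applies to the first discovery, and EARLIEST applies to
partitions found afterwards.

```java
// Hypothetical sketch of the proposed rule, not the Flink implementation.
final class NewPartitionOffsetSketch {

    enum StartupMode { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS, GROUP_OFFSET }

    // Before the first discovery completes, use the mode the user
    // configured; afterwards, read newly discovered partitions from the
    // beginning so that no message is skipped.
    static StartupMode modeFor(boolean firstDiscoveryDone, StartupMode configured) {
        return firstDiscoveryDone ? StartupMode.EARLIEST : configured;
    }

    public static void main(String[] args) {
        System.out.println(modeFor(false, StartupMode.TIMESTAMP));
        System.out.println(modeFor(true, StartupMode.TIMESTAMP));
    }
}
```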


Best,
Hongshun

On Thu, Mar 23, 2023 at 8:34 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Hongshun
>
> Thanks for driving this discussion. Automatically discovering partitions
> without losing data sounds great!
>
> Currently Flink supports the Kafka source with different startup modes,
> such as EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
>
> If I understand correctly, you will set the offset of new partitions with
> EARLIEST? Please correct me if I'm wrong, I think the EARLIEST startup mode
> for new partitions is not suitable if users set TIMESTAMP/SPECIFIC_OFFSET
> for Kafka in their jobs.
>
> For an extreme example, the current time is 2023-03-23 15:00:00 and users
> set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition
> is added during this period, jobs will generate “surprising” data. What do
> you think of it?
>
>
> Best,
> Shammon FY
>
>
> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi, Hang,
> >
> > Thanks for your advice.
> >
> > When will the second case occur? Currently, there are three ways to
> > specify partitions in Kafka: by topic, by partition, and by matching the
> > topic with a regular expression. Currently, if the initial partition
> > number is 0, an error will occur for the first two methods. However, when
> > using a regular expression to match topics, it is allowed to have 0
> > matched topics.
> >
> > > I don't know when the second case will occur
> >
> >
> > Why prefer the field `firstDiscoveryDone`? When a regular expression
> > initially matches 0 topics, it should consume all messages of the new
> > topic. If unassignedInitialPartitons and unassignedTopLevelPartitions are
> > used instead of firstDiscoveryDone, any new topics created during (5
> > minutes discovery + job restart time) will be treated as the first
> > discovery, causing data loss.
> >
> > > Then when will we get the empty partition list? I think it should be
> > > treated as the first initial discovery if both
> > > `unassignedInitialPartitons` and `assignedPartitons` are empty without
> > > `firstDiscoveryDone`.
> >
> > Best
> >
> > Hongshun
> >
> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com>
> > wrote:
> >
> > > Hi, Hongshun,
> > >
> > > Thank you for starting this discussion. I have some questions about the
> > > field `firstDiscoveryDone`.
> > >
> > > In the FLIP, the reason we need firstDiscoveryDone is given as follows.
> > > > Why do we need firstDiscoveryDone? Only relying on the
> > > > unAssignedInitialPartitons attribute cannot distinguish between the
> > > > following two cases (which often occur in pattern mode):
> > > > The first partition discovery is so slow that a checkpoint is
> > > > executed before it finishes, and then the job is restarted. At this
> > > > time, the restored unAssignedInitialPartitons is an empty set, which
> > > > means non-discovery. The next discovery will be treated as the first
> > > > discovery.
> > > > The first partition discovery returns an empty result, and new
> > > > partitions can only be found after multiple partition discoveries. If
> > > > a restart occurs during this period, the restored
> > > > unAssignedInitialPartitons is also an empty set, which means
> > > > empty-discovery. The next discovery will be treated as a new
> > > > discovery.
> > >
> > > I don't know when the second case will occur. The number of partitions
> > > must be greater than 0 when creating topics. And I have read this note
> > > in the FLIP.
> > > > Note: The current design only applies to cases where all existing
> > > > partitions can be discovered at once. If all old partitions cannot be
> > > > discovered at once, the subsequent old partitions discovered will be
> > > > treated as new partitions, leading to message duplication. Therefore,
> > > > this point needs to be particularly noted.
> > >
> > > Then when will we get the empty partition list? I think it should be
> > > treated as the first initial discovery if both
> > > `unassignedInitialPartitons` and `assignedPartitons` are empty without
> > > `firstDiscoveryDone`.
> > >
> > > Besides that, I think `unAssignedInitialPartitons` would be better
> > > named `unassignedInitialPartitons`.
> > >
> > > Best,
> > > Hang
> > >
> > > On Fri, Mar 17, 2023 at 6:42 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I would like to start a discussion on FLIP-288: Enable Dynamic
> > > > Partition Discovery by Default in Kafka Source[1].
> > > >
> > > > As described in mail thread[2], dynamic partition discovery is
> > > > disabled by default and users have to explicitly specify the
> > > > discovery interval in order to turn it on. Besides, if the initial
> > > > offset strategy is LATEST, the same strategy is used for new
> > > > partitions, leading to the loss of some data (consider a new
> > > > partition that is created and only discovered by the Kafka source
> > > > several minutes later: the messages produced into the partition
> > > > within that gap might be dropped if we use, for example, "latest" as
> > > > the initial offset strategy).
> > > >
> > > > The goals of this FLIP are as follows:
> > > >
> > > >    1. Enable partition discovery by default.
> > > >    2. Use earliest as the offset strategy for new partitions after
> > > >    the first discovery.
> > > >
> > > > Looking forward to hearing from you.
> > > >
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> > > >
> > > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > > >
> > > >
> > > > Best,
> > > >
> > > > Hongshun
> > > >
> > >
> >
>
