Hi Hongshun,

Thanks for driving this discussion. Automatically discovering partitions
without losing data sounds great!

Currently, Flink's Kafka source supports several startup modes, such as
EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSETS.

If I understand correctly, you will set the starting offset of new partitions
to EARLIEST? Please correct me if I'm wrong, but I think the EARLIEST startup
mode is not suitable for new partitions if users set TIMESTAMP or
SPECIFIC_OFFSETS for Kafka in their jobs.

As an extreme example, suppose the current time is 2023-03-23 15:00:00 and
users set TIMESTAMP to 2023-03-23 16:00:00 for their jobs. If a partition is
added during this period and is read from EARLIEST, the jobs will emit records
older than the configured timestamp, which is “surprising” data for those
users. What do you think?
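
To make the example concrete, here is a minimal sketch in plain Java. It does
not use any Flink or Kafka classes; `NewPartitionOffsetDemo`,
`readFromEarliest` and `readFromTimestamp` are hypothetical names that I made
up for illustration, and a partition is modelled simply as a list of record
timestamps. It only shows which records a reader would see under each choice,
assuming that TIMESTAMP means "start at the first record whose timestamp is
not before the configured one".

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, no Flink or Kafka classes involved.
class NewPartitionOffsetDemo {

    // Records seen if the newly discovered partition is read from EARLIEST.
    static List<LocalDateTime> readFromEarliest(List<LocalDateTime> partition) {
        return new ArrayList<>(partition);
    }

    // Records seen if the job's TIMESTAMP setting is also applied to the new partition.
    static List<LocalDateTime> readFromTimestamp(
            List<LocalDateTime> partition, LocalDateTime startup) {
        List<LocalDateTime> result = new ArrayList<>();
        for (LocalDateTime ts : partition) {
            if (!ts.isBefore(startup)) {
                result.add(ts);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The user asked to start from 16:00:00.
        LocalDateTime startup = LocalDateTime.of(2023, 3, 23, 16, 0, 0);

        // A partition created after 15:00:00 with three records in it.
        List<LocalDateTime> newPartition = Arrays.asList(
                LocalDateTime.of(2023, 3, 23, 15, 10, 0),
                LocalDateTime.of(2023, 3, 23, 15, 30, 0),
                LocalDateTime.of(2023, 3, 23, 16, 5, 0));

        System.out.println("EARLIEST:  " + readFromEarliest(newPartition));
        System.out.println("TIMESTAMP: " + readFromTimestamp(newPartition, startup));
    }
}
```

With EARLIEST the reader gets all three records, including the two from before
16:00:00, while with TIMESTAMP it gets only the 16:05:00 record.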


Best,
Shammon FY


On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi, Hang,
>
> Thanks for your advice.
>
> When will the second case occur? Currently, there are three ways to specify
> partitions in Kafka: by topic, by partition, and by matching topics with
> a regular expression. If the initial number of partitions is 0, an
> error occurs for the first two methods. However, when using a regular
> expression to match topics, it is allowed to match 0 topics.
>
> > I don't know when the second case will occur
>
>
> Why prefer the field `firstDiscoveryDone`? When a regular expression
> initially matches 0 topics, the source should consume all messages of any
> new topic. If unassignedInitialPartitons and unassignedTopLevelPartitions are
> used instead of firstDiscoveryDone, any new topic created during the window
> of (5-minute discovery interval + job restart time) will be treated as part
> of the first discovery, causing data loss.
>
> > Then when will we get the empty partition list? I think it should be
> > treated as the first initial discovery if both `unassignedInitialPartitons`
> > and `assignedPartitons` are empty without `firstDiscoveryDone`.
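
The difference between the two approaches discussed above can be sketched in
plain Java. This is a simplified, hypothetical model and not the connector's
actual enumerator: `DiscoveryStateSketch` and its methods are names invented
for illustration, partitions are plain strings, and the field names follow the
ones used in this thread with the spelling normalized.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustration only: a simplified, hypothetical model of the checkpointed state.
class DiscoveryStateSketch {

    final Set<String> assignedPartitions = new HashSet<>();
    final Set<String> unassignedInitialPartitions = new HashSet<>();
    boolean firstDiscoveryDone = false;

    // Variant A: rely on the explicit flag kept in the state.
    boolean isFirstDiscoveryWithFlag() {
        return !firstDiscoveryDone;
    }

    // Variant B: infer "first discovery" from both partition sets being empty.
    boolean isFirstDiscoveryInferred() {
        return assignedPartitions.isEmpty() && unassignedInitialPartitions.isEmpty();
    }

    // Record the result of one discovery round (reader assignment is omitted).
    void onDiscovery(Set<String> discovered) {
        if (!firstDiscoveryDone) {
            unassignedInitialPartitions.addAll(discovered);
            firstDiscoveryDone = true;
        }
    }

    public static void main(String[] args) {
        DiscoveryStateSketch state = new DiscoveryStateSketch();

        // First discovery: the topic pattern matches nothing yet.
        state.onDiscovery(Collections.<String>emptySet());

        // Imagine a checkpoint and a restart here, after which a new topic appears.
        System.out.println("flag says first discovery:   " + state.isFirstDiscoveryWithFlag());
        System.out.println("inferred as first discovery: " + state.isFirstDiscoveryInferred());
    }
}
```

After an empty first discovery, the flag-based check reports that the first
discovery already happened, while the inferred check still treats the next
round as the first one. That is the case where a topic created in the
meantime would get the initial offset strategy instead of earliest.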
>
> Best
>
> Hongshun
>
> On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> > Hi, Hongshun,
> >
> > Thank you for starting this discussion. I have some questions about the
> > field `firstDiscoveryDone`.
> >
> > The FLIP explains why firstDiscoveryDone is needed as follows.
> > > Why do we need firstDiscoveryDone? Only relying on the
> > > unAssignedInitialPartitons attribute cannot distinguish between the
> > > following two cases (which often occur in pattern mode):
> > > 1. The first partition discovery is so slow, before which the checkpoint
> > > is executed and then job is restarted. At this time, the restored
> > > unAssignedInitialPartitons is an empty set, which means non-discovery.
> > > The next discovery will be treated as first discovery.
> > > 2. The first time the partition is discovered is empty, and new
> > > partitions can only be found after multiple partition discoveries. If a
> > > restart occurs between this period, the restored
> > > unAssignedInitialPartitons is also an empty set, which means
> > > empty-discovery. The next discovery will be treated as new discovery.
> >
> > I don't know when the second case will occur. The number of partitions
> > must be greater than 0 when creating topics. I have also read this note in
> > the FLIP.
> > > Note: The current design only applies to cases where all existing
> > > partitions can be discovered at once. If all old partitions cannot be
> > > discovered at once, the subsequent old partitions discovered will be
> > > treated as new partitions, leading to message duplication. Therefore,
> > > this point needs to be particularly noted.
> >
> > Then when will we get the empty partition list? I think it should be
> > treated as the first initial discovery if both `unassignedInitialPartitons`
> > and `assignedPartitons` are empty without `firstDiscoveryDone`.
> >
> > Besides that, I think `unAssignedInitialPartitons` would be better
> > named `unassignedInitialPartitons`.
> >
> > Best,
> > Hang
> >
> > On Fri, Mar 17, 2023 at 6:42 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion on FLIP-288: Enable Dynamic Partition
> > > Discovery by Default in Kafka Source [1].
> > >
> > > As described in the mail thread [2], dynamic partition discovery is
> > > disabled by default, and users have to explicitly specify the discovery
> > > interval in order to turn it on. Besides, if the initial offset strategy
> > > is LATEST, the same strategy is used for new partitions, leading to the
> > > loss of some data. (Consider a new partition that is created and only
> > > discovered by the Kafka source several minutes later: the messages
> > > produced into the partition within that gap might be dropped if we use,
> > > for example, "latest" as the initial offset strategy.)
> > >
> > > The goals of this FLIP are as follows:
> > >
> > >    1. Enable partition discovery by default.
> > >    2. Use earliest as the offset strategy for new partitions after the
> > >    first discovery.
> > >
> > > Looking forward to hearing from you.
> > >
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> > >
> > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > >
> > >
> > > Best,
> > >
> > > Hongshun
> > >
> >
>
