Thanks for the proposal, Qingsheng.

+1 to enable auto partition discovery by default. Just a reminder, we need
a FLIP for this.

A bit more background on this.

Most Kafka users simply subscribe to a topic and let the consumer
automatically adapt to partition changes, so enabling auto partition
discovery would align with that experience. The counterargument the last
time I proposed enabling auto partition discovery came mainly from concerns
raised by Flink users. Some argued that users don't always want partition
changes to be picked up automatically, and would rather pick them up by
restarting the job manually so that they can avoid unnoticed changes in
their jobs.

Given that in the old Flink source, by default the auto partition discovery
was disabled, and there are use cases from both sides, we simply kept the
behavior unchanged. From the discussion we have here, it looks like
enabling auto partition discovery is much preferred. So I think we should
do it.

I am not worried about the performance. The new Kafka source will only have
the SplitEnumerator sending metadata requests when the feature is enabled.
It is actually much cheaper than the old Kafka source where every
subtask does that.
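
For reference, here is a minimal sketch of how a user would opt in today,
assuming the "partition.discovery.interval.ms" property of the new
KafkaSource and the 5-minute interval suggested in this thread. The class
and method names are hypothetical and only the JDK is used; the resulting
Properties would be handed to the source builder (for example via
KafkaSource.builder().setProperties(props)).

```java
import java.time.Duration;
import java.util.Properties;

public class PartitionDiscoveryConfigSketch {
    // Property key read by the new KafkaSource. A non-positive value
    // disables partition discovery.
    static final String DISCOVERY_INTERVAL_KEY = "partition.discovery.interval.ms";

    // Hypothetical helper: builds source properties with discovery enabled
    // at the given interval.
    static Properties withDiscoveryInterval(Duration interval) {
        Properties props = new Properties();
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(interval.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // 5 minutes, matching the default of Kafka's metadata.max.age.ms.
        Properties props = withDiscoveryInterval(Duration.ofMinutes(5));
        System.out.println(DISCOVERY_INTERVAL_KEY + "=" + props.getProperty(DISCOVERY_INTERVAL_KEY));
    }
}
```

With the proposed change, this explicit setting would only be needed to
pick a different interval or to turn discovery off.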

Thanks,

Jiangjie (Becket) Qin



On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:

> +1 for this proposal and thanks Qingsheng for driving this.
>
> As for the interval, we also set it to 5 minutes, matching the
> default value of metadata.max.age.ms.
>
>
> Best
> Yun Tang
> ________________________________
> From: Benchao Li <libenc...@apache.org>
> Sent: Friday, January 13, 2023 23:06
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in
> Kafka source
>
> +1, we've enabled this by default (10mins) in our production for years.
>
> On Fri, Jan 13, 2023 at 22:22, Jing Ge <j...@ververica.com.invalid> wrote:
>
> > +1 for the proposal that makes users' daily work easier and therefore
> makes
> > Flink more attractive.
> >
> > Best regards,
> > Jing
> >
> >
> > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <re...@apache.org> wrote:
> >
> > > Thanks everyone for joining the discussion!
> > >
> > > @Martijn:
> > >
> > > > All newly discovered partitions will be consumed from the earliest
> > offset
> > > possible.
> > >
> > > Thanks for the reminder! I checked the logic of KafkaSource and found
> > that
> > > new partitions will start from the offset initializer specified by the
> > user
> > > instead of the earliest. We need to correct this behavior to avoid
> > dropping
> > > messages from new partitions.
> > >
> > > > Job restarts from checkpoint
> > >
> > > > I think the current logic guarantees exactly-once semantics. New
> > > > partitions created after the checkpoint will be rediscovered and
> > > > picked up by the source.
> > >
> > > @John:
> > >
> > > > If you want to be a little conservative with the default, 5 minutes
> > might
> > > be better than 30 seconds.
> > >
> > > Thanks for the suggestion! I tried to find the equivalent config in
> Kafka
> > > but missed it. It would be neat to align with the default value of "
> > > metadata.max.age.ms".
> > >
> > > @Gabor:
> > >
> > > > removed partition handling is not yet added
> > >
> > > There was a detailed discussion about removing partitions [1] but it
> > looks
> > > like this is not an easy task considering the potential data loss and
> > state
> > > inconsistency. I'm afraid there's no clear plan on this one and maybe
> we
> > > could trigger a new discussion thread about how to correctly handle
> > removed
> > > partitions.
> > >
> > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> > >
> > > Best regards,
> > > Qingsheng
> > >
> > >
> > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com
> > >
> > > wrote:
> > >
> > > > +1 on the overall direction, it's an important feature.
> > > >
> > > > > I've had a look at the latest master and it looks like removed
> > > > > partition handling is not yet added, but I think this is essential.
> > > >
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> > > >
> > > > If a partition all of a sudden disappears then it could lead to data
> > > loss.
> > > > Are you planning to add it?
> > > > If yes then when?
> > > >
> > > > G
> > > >
> > > >
> > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <vvcep...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks for this proposal, Qingsheng!
> > > > >
> > > > > If you want to be a little conservative with the default, 5 minutes
> > > might
> > > > > be better than 30 seconds.
> > > > >
> > > > > The equivalent config in Kafka seems to be metadata.max.age.ms (
> > > > >
> > > >
> > >
> >
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> > > > ),
> > > > > which has a default value of 5 minutes.
> > > > >
> > > > > Other than that, I’m in favor. I agree, this should be on by
> default.
> > > > >
> > > > > Thanks again,
> > > > > John
> > > > >
> > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > > > > Thanks Qingsheng for driving this. Enabling dynamic partition
> > > > > > > discovery would be very useful for scenarios where Kafka topics
> > > > > > > scale out their partitions.
> > > > > >
> > > > > > +1 for the change.
> > > > > >
> > > > > > CC: Becket
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Leonard
> > > > > >
> > > > > >
> > > > > >
> > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com> wrote:
> > > > > >>
> > > > > >> +1 for the change. I think this is beneficial for users and is
> > > > > compatible.
> > > > > >>
> > > > > >> Best,
> > > > > >> Jark
> > > > > >>
> > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com>
> wrote:
> > > > > >>
> > > > > >>>>
> > > > > >>>> +1 for this idea, we have enabled kafka dynamic partition
> > > discovery
> > > > in
> > > > > >>> all
> > > > > >>>> jobs.
> > > > > >>>>
> > > > > >>>>
> > > > > >>>
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
