re: @Mason about a pluggable error handling strategy

+1 to that, we've had users ask for that in the past for other connectors as well, e.g. Elasticsearch. As long as the exposure doesn't lend itself to potentially breaking processing semantics, I don't see why not!
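For readers skimming the thread: the strategy Mason describes in the quoted message further down is an error handler keyed on the number of consecutive partition discovery failures. Below is a minimal sketch of what such a pluggable hook might look like. The names `DiscoveryFailureHandler`, `ConsecutiveFailureThreshold`, and `maxConsecutiveFailures` are hypothetical and are not part of Flink or the Kafka connector; the sketch only illustrates tolerating transient AdminClient errors while still failing after repeated ones.

```java
/** Hypothetical hook (not a Flink API): decides whether a failed discovery attempt should fail the job. */
interface DiscoveryFailureHandler {
    /** Called after a failed discovery attempt; returns true if the failure should be escalated. */
    boolean onFailure(Throwable cause);

    /** Called after a successful discovery attempt. */
    void onSuccess();
}

/** Tolerates up to maxConsecutiveFailures failed attempts in a row, then escalates. */
final class ConsecutiveFailureThreshold implements DiscoveryFailureHandler {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    ConsecutiveFailureThreshold(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 0) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 0");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    @Override
    public boolean onFailure(Throwable cause) {
        // Count this failure and escalate only once the tolerated budget is exceeded.
        consecutiveFailures++;
        return consecutiveFailures > maxConsecutiveFailures;
    }

    @Override
    public void onSuccess() {
        // Any successful discovery round resets the streak.
        consecutiveFailures = 0;
    }
}
```

With a threshold of 2, two failed rounds in a row are swallowed and the third escalates; a success in between resets the count. Whether skipping a round is safe depends on the semantics concern raised above: it presumably only delays pick-up of new partitions, provided those partitions are later read from the earliest offset.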
On Fri, Feb 10, 2023 at 1:54 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> re: @Mason about TimestampOffsetInitializer default offset reset strategy
>
> Should the OffsetsInitializer even be respected for partitions from new discoveries? By "new discoveries", I mean partitions that are discovered outside of the initial metadata pull on job startup.
>
> I had the impression that we should be differentiating between 1) initial discovery after job startup without restore, 2) discovery of restored partitions written in Flink state at job restore time, and 3) dynamic discoveries after the restore / startup phases.
>
> For each case, the suitable strategy for initializing the offset, and for resetting it when it is out of range, would be different:
> For case 1), we should respect the user-configured OffsetsInitializer strategy for initializing. And, it makes sense that the default reset strategy is LATEST to be aligned with Kafka's default.
> For case 2), obviously we always initialize partition offsets based on the checkpointed offset. It's probably debatable whether the reset strategy should be EARLIEST or LATEST, but by intuition it seems to be EARLIEST to minimize data loss.
> For case 3), the initialization strategy should always be to read from EARLIEST, given the nature of delays in a poll-based periodic discovery. I don't think offset resets should ever occur here, as we wouldn't ever bump into an out-of-range situation.
>
> TLDR, the user-configured OffsetsInitializer should only be respected when starting a fresh job without checkpoints, and for the first set of partitions that already exist at startup time.
> In all other scenarios, we require dedicated ways to handle them, which we probably shouldn't expose to the user for configuring.
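The three cases in the message above can be read as a small decision table. The sketch below encodes that table and nothing more; `DiscoveryPhase`, `StartFrom`, `ResetStrategy`, and `OffsetPolicy` are hypothetical names chosen for illustration and do not exist in the Kafka connector. The EARLIEST reset for restored partitions reflects the "by intuition" choice in the message, which the author flags as debatable.

```java
import java.util.Optional;

/** Hypothetical: when a partition became known to the source. */
enum DiscoveryPhase { FRESH_STARTUP, RESTORED_FROM_STATE, DYNAMIC_DISCOVERY }

/** Hypothetical: where reading starts for a partition. */
enum StartFrom { USER_CONFIGURED, CHECKPOINTED_OFFSET, EARLIEST }

/** Hypothetical: fallback when the starting offset is out of range. */
enum ResetStrategy { EARLIEST, LATEST }

final class OffsetPolicy {
    final StartFrom startFrom;
    /** Empty when no out-of-range reset is expected to occur. */
    final Optional<ResetStrategy> resetStrategy;

    private OffsetPolicy(StartFrom startFrom, Optional<ResetStrategy> resetStrategy) {
        this.startFrom = startFrom;
        this.resetStrategy = resetStrategy;
    }

    /** Maps each discovery phase to the initialization and reset behaviour proposed in the thread. */
    static OffsetPolicy forPhase(DiscoveryPhase phase) {
        switch (phase) {
            case FRESH_STARTUP:
                // Case 1: respect the user's initializer; default reset aligned with Kafka (LATEST).
                return new OffsetPolicy(StartFrom.USER_CONFIGURED, Optional.of(ResetStrategy.LATEST));
            case RESTORED_FROM_STATE:
                // Case 2: always start from the checkpointed offset; reset to EARLIEST (debatable).
                return new OffsetPolicy(StartFrom.CHECKPOINTED_OFFSET, Optional.of(ResetStrategy.EARLIEST));
            case DYNAMIC_DISCOVERY:
                // Case 3: always read from EARLIEST; no reset expected.
                return new OffsetPolicy(StartFrom.EARLIEST, Optional.empty());
            default:
                throw new IllegalArgumentException("Unknown phase: " + phase);
        }
    }
}
```

Only the first row involves user configuration, which matches the TLDR: the user-configured initializer applies to a fresh start and the partitions that exist at that moment.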
>
> On Fri, Feb 10, 2023 at 1:34 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Joined the discussion a bit late, but just want to add in my +1 as well :-)
>>
>> Historically, when dynamic partition discovery was implemented in the earlier versions of the FlinkKafkaConsumer, it was implemented such that multiple source subtasks would in parallel query Kafka brokers for topic/partition metadata at the configured discovery interval. There were concerns about this at a larger scale, hence the feature was disabled by default.
>>
>> I don't see any reason not to enable this by default for the latest implementations of the KafkaSourceEnumerator.
>>
>> That being said - this is essentially a breaking user-facing change in that it has functional side effects, but I don't see any way of introducing this without a breaking change either.
>>
>> I imagine that the group of users that are most likely to be caught by surprise are users who use regex topic pattern subscription, but did not enable partition discovery. We need to be diligent in documenting this change (including publishing blog posts about it).
>>
>> > removed partition handling is not yet added
>>
>> I agree with @Qingsheng that this can be an orthogonal topic outside the scope of the planned changes here as it isn't straightforward.
>>
>> On Fri, Feb 10, 2023 at 12:56 PM Martijn Visser <martijnvis...@apache.org> wrote:
>>
>>> Oh and Mason, definitely interesting! :)
>>>
>>> On Fri, Feb 10, 2023 at 9:51 PM Martijn Visser <martijnvis...@apache.org> wrote:
>>>
>>> > @Qingsheng what are your next steps for this proposal?
>>> >
>>> > On Thu, Jan 19, 2023 at 9:14 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> Sorry to come into the discussion late--I saw the thread earlier.
>>> >>
>>> >> I'm also +1 for the change in general. I think most users have this turned on by default since the overhead is quite low.
>>> >> A default in the two-digit seconds range works well for us. However, I do have two main concerns that are related, but don't necessarily block this FLIP:
>>> >>
>>> >> 1. Timestamp Offset Initializer
>>> >>
>>> >> Currently, the timestamp offset initializer defaults the offset reset strategy to LATEST. This can present some problems if the discovery interval is set too large since records from new partitions could be skipped (the set timestamp is not found in Kafka, thus resetting to the latest). Here is a ticket to allow customizations: https://issues.apache.org/jira/browse/FLINK-30200 (Qingsheng, you might remember this from a PR review). Thanks for mentioning this in your FLIP!
>>> >>
>>> >> 2. AdminClient Fault Tolerance
>>> >>
>>> >> AdminClient, which is used for partition discovery, seems not to handle Kafka timeouts as robustly as the KafkaConsumer API, and we have noticed that transient network hiccups cause full job restarts (since the jobmanager fails) in numerous incidents. Internally, we have introduced an error handling strategy based on the number of consecutive partition discovery failures. I'm interested in opening a JIRA ticket to contribute this feature back to Flink and am open to making the error handling more pluggable. What do you think?
>>> >>
>>> >> Best,
>>> >> Mason
>>> >>
>>> >> On Sun, Jan 15, 2023 at 11:39 PM Qingsheng Ren <re...@apache.org> wrote:
>>> >>
>>> >> > Thanks for the input Becket!
>>> >> >
>>> >> > I reorganized this proposal into FLIP-288 [1].
>>> >> >
>>> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>>> >> >
>>> >> > Best,
>>> >> > Qingsheng
>>> >> >
>>> >> > On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com> wrote:
>>> >> >
>>> >> > > Thanks for the proposal, Qingsheng.
>>> >> > >
>>> >> > > +1 to enable auto partition discovery by default. Just a reminder, we need a FLIP for this.
>>> >> > >
>>> >> > > A bit more background on this.
>>> >> > >
>>> >> > > Most of the Kafka users simply subscribe to a topic and let the consumer automatically adapt to partition changes. So enabling auto partition discovery would align with that experience. The counter argument last time when I proposed to enable auto partition discovery was mainly due to the concern from the Flink users. There were arguments that sometimes users don't want the partition changes to get automatically picked up, but want to do this by restarting the job manually so they can avoid unnoticed changes in the jobs.
>>> >> > >
>>> >> > > Given that in the old Flink source, by default the auto partition discovery was disabled, and there are use cases from both sides, we simply kept the behavior unchanged. From the discussion we have here, it looks like enabling auto partition discovery is much preferred. So I think we should do it.
>>> >> > >
>>> >> > > I am not worried about the performance. The new Kafka source will only have the SplitEnumerator sending metadata requests when the feature is enabled. It is actually much cheaper than the old Kafka source where every subtask does that.
>>> >> > >
>>> >> > > Thanks,
>>> >> > >
>>> >> > > Jiangjie (Becket) Qin
>>> >> > >
>>> >> > > On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:
>>> >> > >
>>> >> > > > +1 for this proposal and thanks Qingsheng for driving this.
>>> >> > > >
>>> >> > > > Regarding the interval, we also set the value to 5 minutes, equivalent to the default value of metadata.max.age.ms.
>>> >> > > >
>>> >> > > > Best
>>> >> > > > Yun Tang
>>> >> > > > ________________________________
>>> >> > > > From: Benchao Li <libenc...@apache.org>
>>> >> > > > Sent: Friday, January 13, 2023 23:06
>>> >> > > > To: dev@flink.apache.org <dev@flink.apache.org>
>>> >> > > > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
>>> >> > > >
>>> >> > > > +1, we've enabled this by default (10 minutes) in our production for years.
>>> >> > > >
>>> >> > > > Jing Ge <j...@ververica.com.invalid> wrote on Fri, Jan 13, 2023 at 22:22:
>>> >> > > >
>>> >> > > > > +1 for the proposal that makes users' daily work easier and therefore makes Flink more attractive.
>>> >> > > > >
>>> >> > > > > Best regards,
>>> >> > > > > Jing
>>> >> > > > >
>>> >> > > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <re...@apache.org> wrote:
>>> >> > > > >
>>> >> > > > > > Thanks everyone for joining the discussion!
>>> >> > > > > >
>>> >> > > > > > @Martijn:
>>> >> > > > > >
>>> >> > > > > > > All newly discovered partitions will be consumed from the earliest offset possible.
>>> >> > > > > >
>>> >> > > > > > Thanks for the reminder! I checked the logic of KafkaSource and found that new partitions will start from the offset initializer specified by the user instead of the earliest.
>>> >> > > > > > We need to correct this behavior to avoid dropping messages from new partitions.
>>> >> > > > > >
>>> >> > > > > > > Job restarts from checkpoint
>>> >> > > > > >
>>> >> > > > > > I think the current logic guarantees exactly-once semantics. New partitions created after the checkpoint will be re-discovered and picked up by the source.
>>> >> > > > > >
>>> >> > > > > > @John:
>>> >> > > > > >
>>> >> > > > > > > If you want to be a little conservative with the default, 5 minutes might be better than 30 seconds.
>>> >> > > > > >
>>> >> > > > > > Thanks for the suggestion! I tried to find the equivalent config in Kafka but missed it. It would be neat to align with the default value of "metadata.max.age.ms".
>>> >> > > > > >
>>> >> > > > > > @Gabor:
>>> >> > > > > >
>>> >> > > > > > > removed partition handling is not yet added
>>> >> > > > > >
>>> >> > > > > > There was a detailed discussion about removing partitions [1] but it looks like this is not an easy task considering the potential data loss and state inconsistency. I'm afraid there's no clear plan on this one and maybe we could trigger a new discussion thread about how to correctly handle removed partitions.
>>> >> > > > > >
>>> >> > > > > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>>> >> > > > > >
>>> >> > > > > > Best regards,
>>> >> > > > > > Qingsheng
>>> >> > > > > >
>>> >> > > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>> >> > > > > >
>>> >> > > > > > > +1 on the overall direction, it's an important feature.
>>> >> > > > > > >
>>> >> > > > > > > I've had a look at the latest master and it looks like removed partition handling is not yet added but I think this is essential.
>>> >> > > > > > >
>>> >> > > > > > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
>>> >> > > > > > >
>>> >> > > > > > > If a partition all of a sudden disappears then it could lead to data loss.
>>> >> > > > > > > Are you planning to add it?
>>> >> > > > > > > If yes then when?
>>> >> > > > > > >
>>> >> > > > > > > G
>>> >> > > > > > >
>>> >> > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <vvcep...@apache.org> wrote:
>>> >> > > > > > >
>>> >> > > > > > > > Thanks for this proposal, Qingsheng!
>>> >> > > > > > > >
>>> >> > > > > > > > If you want to be a little conservative with the default, 5 minutes might be better than 30 seconds.
>>> >> > > > > > > >
>>> >> > > > > > > > The equivalent config in Kafka seems to be metadata.max.age.ms (https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms), which has a default value of 5 minutes.
>>> >> > > > > > > >
>>> >> > > > > > > > Other than that, I’m in favor. I agree, this should be on by default.
>>> >> > > > > > > >
>>> >> > > > > > > > Thanks again,
>>> >> > > > > > > > John
>>> >> > > > > > > >
>>> >> > > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
>>> >> > > > > > > > > Thanks Qingsheng for driving this. Enabling dynamic partition discovery would be very useful for scenarios where Kafka topics scale up their partitions.
>>> >> > > > > > > > >
>>> >> > > > > > > > > +1 for the change.
>>> >> > > > > > > > >
>>> >> > > > > > > > > CC: Becket
>>> >> > > > > > > > >
>>> >> > > > > > > > > Best,
>>> >> > > > > > > > > Leonard
>>> >> > > > > > > > >
>>> >> > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com> wrote:
>>> >> > > > > > > > >>
>>> >> > > > > > > > >> +1 for the change. I think this is beneficial for users and is compatible.
>>> >> > > > > > > > >>
>>> >> > > > > > > > >> Best,
>>> >> > > > > > > > >> Jark
>>> >> > > > > > > > >>
>>> >> > > > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com> wrote:
>>> >> > > > > > > > >>
>>> >> > > > > > > > >>>> +1 for this idea, we have enabled kafka dynamic partition discovery in all jobs.
>>> >> > > >
>>> >> > > > --
>>> >> > > >
>>> >> > > > Best,
>>> >> > > > Benchao Li
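
For reference, the interval discussed throughout this thread is set on the Kafka source through the property `partition.discovery.interval.ms`. The sketch below only builds that property with plain `java.util.Properties`, using the 5-minute value several participants suggested to match Kafka's `metadata.max.age.ms` default. The class and method names are hypothetical, and rejecting non-positive values is a choice made for this sketch, not connector behaviour.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper (not part of Flink): builds the discovery-interval property. */
final class DiscoveryIntervalConfig {
    /** Property key read by the Kafka source for periodic partition discovery. */
    static final String PARTITION_DISCOVERY_INTERVAL_MS = "partition.discovery.interval.ms";

    /** Returns properties carrying the given discovery interval in milliseconds. */
    static Properties withDiscoveryInterval(Duration interval) {
        // Assumption of this sketch: only strictly positive intervals are accepted here.
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("interval must be positive");
        }
        Properties props = new Properties();
        props.setProperty(PARTITION_DISCOVERY_INTERVAL_MS, Long.toString(interval.toMillis()));
        return props;
    }
}
```

Calling `DiscoveryIntervalConfig.withDiscoveryInterval(Duration.ofMinutes(5))` yields a single entry whose value is the interval in milliseconds; the same key and value can be passed to the source builder's `setProperty`.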