re: @Mason about a pluggable error handling strategy

+1 to that, we've had users ask for that in the past for other connectors
as well, e.g. Elasticsearch.
As long as the exposure doesn't lend itself to potentially breaking
processing semantics, I don't see why not!
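
To make the idea concrete, here is a minimal sketch of what a pluggable strategy based on consecutive partition discovery failures (as Mason describes below) could look like. The interface name DiscoveryErrorHandler, its methods, and the threshold parameter are all hypothetical and only for illustration; nothing here is an existing Flink or Kafka API.

```java
/** Sketch only: a hypothetical plug-in point for partition discovery errors. */
final class DiscoveryErrorHandlingSketch {

    /** Hypothetical interface; not part of Flink. */
    public interface DiscoveryErrorHandler {
        /** Called after a discovery round succeeds. */
        void onSuccess();

        /** Called after a discovery round fails; returns true if the job should fail. */
        boolean onFailure(Throwable cause);
    }

    /** Tolerates up to maxConsecutiveFailures failed rounds in a row. */
    public static final class ConsecutiveFailureHandler implements DiscoveryErrorHandler {
        private final int maxConsecutiveFailures;
        private int consecutiveFailures;

        public ConsecutiveFailureHandler(int maxConsecutiveFailures) {
            if (maxConsecutiveFailures < 0) {
                throw new IllegalArgumentException("maxConsecutiveFailures must be >= 0");
            }
            this.maxConsecutiveFailures = maxConsecutiveFailures;
        }

        @Override
        public void onSuccess() {
            // Any successful round resets the streak.
            consecutiveFailures = 0;
        }

        @Override
        public boolean onFailure(Throwable cause) {
            consecutiveFailures++;
            // Fail the job only once the streak exceeds the tolerated count.
            return consecutiveFailures > maxConsecutiveFailures;
        }
    }

    public static void main(String[] args) {
        DiscoveryErrorHandler handler = new ConsecutiveFailureHandler(2);
        // Simulated discovery rounds: true = success, false = transient failure.
        boolean[] rounds = {false, false, true, false, false, false};
        for (int i = 0; i < rounds.length; i++) {
            if (rounds[i]) {
                handler.onSuccess();
                System.out.println("round " + i + ": ok");
            } else {
                boolean fail = handler.onFailure(new RuntimeException("simulated timeout"));
                System.out.println("round " + i + ": failure, failJob=" + fail);
            }
        }
    }
}
```

Since the handler only decides whether a discovery failure should be tolerated or fail the job, it would not touch offsets or splits, which is the kind of limited exposure that should keep processing semantics intact.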

On Fri, Feb 10, 2023 at 1:54 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> re: @Mason about TimestampOffsetInitializer default offset reset strategy
>
> Should the OffsetsInitializer even be respected for partitions from new
> discoveries? By "new discoveries", I mean partitions that are discovered
> outside of the initial metadata pull on job startup.
>
> I had the impression that we should be differentiating between 1) initial
> discovery after job startup without restore, and 2) discovery of restored
> partitions written in Flink state at job restore time, and 3) dynamic
> discoveries post restore / startup phases.
>
> For each case, the suitable strategy for initializing offset and resetting
> offset in case of out-of-range, would be different:
> For case 1), we should respect the user-configured OffsetInitializer
> strategy for initializing. And, it makes sense that the default reset
> strategy is LATEST to be aligned with Kafka's default.
> For case 2), obviously we always initialize partition offsets based on the
> checkpointed offset. It's probably debatable whether the reset strategy
> should be EARLIEST or LATEST, but by intuition it seems to be EARLIEST to
> minimize data loss.
> For case 3), the initialize strategy should always be read from EARLIEST,
> given the nature of delays in a poll-based periodic discovery. I don't
> think offset resets should ever occur here, as we wouldn't ever bump into
> an out-of-range situation here.
>
> TLDR, the user-configured OffsetInitializer should only be respected when
> starting a fresh job without checkpoints, and for the first set of
> partitions that already exist at startup time.
> In all other scenarios, we require dedicated ways to handle them which we
> probably shouldn't expose to the user for configuring.
>
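
The three cases above can be sketched as a small decision table. This is only an illustration of the proposal in the quoted message, not the current KafkaSource behavior; the enum and class names (Phase, StartFrom, ResetTo, Policy) are made up for this sketch and do not exist in Flink.

```java
/** Sketch only: maps each discovery case to the proposed offset handling. */
final class OffsetPolicySketch {

    /** When a partition became known to the source. */
    public enum Phase { FRESH_STARTUP, RESTORED_FROM_STATE, DYNAMIC_DISCOVERY }

    /** Where reading starts for the partition. */
    public enum StartFrom { USER_INITIALIZER, CHECKPOINTED_OFFSET, EARLIEST }

    /** What to do if the starting offset turns out to be out of range. */
    public enum ResetTo { LATEST, EARLIEST, NOT_EXPECTED }

    public static final class Policy {
        public final StartFrom startFrom;
        public final ResetTo resetTo;

        Policy(StartFrom startFrom, ResetTo resetTo) {
            this.startFrom = startFrom;
            this.resetTo = resetTo;
        }

        @Override
        public String toString() {
            return "start=" + startFrom + ", reset=" + resetTo;
        }
    }

    public static Policy policyFor(Phase phase) {
        switch (phase) {
            case FRESH_STARTUP:
                // Case 1: respect the user's initializer; reset aligned with Kafka's default.
                return new Policy(StartFrom.USER_INITIALIZER, ResetTo.LATEST);
            case RESTORED_FROM_STATE:
                // Case 2: always start from the checkpointed offset; EARLIEST limits data loss.
                return new Policy(StartFrom.CHECKPOINTED_OFFSET, ResetTo.EARLIEST);
            case DYNAMIC_DISCOVERY:
                // Case 3: read new partitions from the beginning; no reset is expected.
                return new Policy(StartFrom.EARLIEST, ResetTo.NOT_EXPECTED);
            default:
                throw new IllegalArgumentException("Unknown phase: " + phase);
        }
    }

    public static void main(String[] args) {
        for (Phase phase : Phase.values()) {
            System.out.println(phase + " -> " + policyFor(phase));
        }
    }
}
```

Only the FRESH_STARTUP row depends on user configuration, which matches the TLDR: the other two rows would be fixed by the connector.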
> On Fri, Feb 10, 2023 at 1:34 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Joined the discussion a bit late, but just want to add in my +1 as well
>> :-)
>>
>> Historically, when dynamic partition discovery was implemented in the
>> earlier versions of the FlinkKafkaConsumer, it was implemented such that
>> multiple source subtasks would in parallel query Kafka brokers for
>> topic/partition metadata at the configured discovery interval. There were
>> concerns about this at a larger scale, hence the feature was disabled by
>> default.
>>
>> I don't see any reason not to enable this by default for the latest
>> implementations of the KafkaSourceEnumerator.
>>
>> That being said - this is essentially a breaking user-facing change in
>> that it has functional side effects, but I don't see any way of introducing
>> this without a breaking change either.
>>
>> I imagine that the group of users that are most likely to be caught by
>> surprise are users who use regex topic pattern subscription, but did not
>> enable partition discovery. We need to be diligent in documenting this
>> change (including publishing blog posts about it).
>>
>> > removed partition handling is not yet added
>>
>>
>> I agree with @Qingsheng that this can be an orthogonal topic outside the
>> scope of the planned changes here as it isn't straightforward.
>>
>> On Fri, Feb 10, 2023 at 12:56 PM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Oh and Mason, definitely interesting! :)
>>>
>>> On Fri, Feb 10, 2023 at 9:51 PM Martijn Visser <martijnvis...@apache.org
>>> >
>>> wrote:
>>>
>>> > @Qingsheng what are your next steps for this proposal?
>>> >
>>> > On Thu, Jan 19, 2023 at 9:14 AM Mason Chen <mas.chen6...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> Sorry to come into the discussion late--I saw the thread earlier.
>>> >>
>>> >> I'm also +1 for the change in general. I think most users have this
>>> turned
>>> >> on by default since the overhead is quite low. A default in the two
>>> digit
>>> >> seconds range works well for us. However, I do have two main concerns
>>> that
>>> >> are related, but don't necessarily block this FLIP:
>>> >>
>>> >> 1. Timestamp Offset Initializer
>>> >>
>>> >> Currently, the timestamp offset initializer defaults the offset reset
>>> >> strategy to LATEST. This can present some problems if the discovery
>>> >> interval is set too large since records from new partitions could be
>>> >> skipped (the set timestamp is not found in Kafka, thus resetting to
>>> the
>>> >> latest). Here is a ticket to allow customizations:
>>> >> https://issues.apache.org/jira/browse/FLINK-30200 (Qingsheng, you
>>> might
>>> >> remember this from a PR review). Thanks for mentioning this in your
>>> FLIP!
>>> >>
>>> >> 2. AdminClient Fault Tolerance
>>> >>
>>> >> AdminClient, which is used for partition discovery, seems not to
>>> handle
>>> >> Kafka timeouts as robustly as the KafkaConsumer API, and we have
>>> noticed
>>> >> that transient network hiccups cause full job restarts (since the
>>> >> jobmanager fails) in numerous incidents. Internally, we have
>>> introduced an
>>> >> error handling strategy based on the number of consecutive partition
>>> >> discovery failures. I'm interested in opening a JIRA ticket to
>>> contribute
>>> >> this feature back to Flink and making the error handling more
>>> >> pluggable. What do you think?
>>> >>
>>> >> Best,
>>> >> Mason
>>> >>
>>> >> On Sun, Jan 15, 2023 at 11:39 PM Qingsheng Ren <re...@apache.org>
>>> wrote:
>>> >>
>>> >> > Thanks for the input Becket!
>>> >> >
>>> >> > I reorganized this proposal into FLIP-288 [1].
>>> >> >
>>> >> > [1]
>>> >> >
>>> >> >
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>>> >> >
>>> >> > Best,
>>> >> > Qingsheng
>>> >> >
>>> >> > On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com>
>>> >> wrote:
>>> >> >
>>> >> > > Thanks for the proposal, Qingsheng.
>>> >> > >
>>> >> > > +1 to enable auto partition discovery by default. Just a
>>> reminder, we
>>> >> > need
>>> >> > > a FLIP for this.
>>> >> > >
>>> >> > > A bit more background on this.
>>> >> > >
>>> >> > > Most of the Kafka users simply subscribe to a topic and let the
>>> >> > > consumer automatically adapt to partition changes. So enabling auto
>>> partition
>>> >> > > discovery would align with that experience. The counter argument
>>> last
>>> >> > time
>>> >> > > when I proposed to enable auto partition discovery was mainly due
>>> to
>>> >> the
>>> >> > > concern from the Flink users. There were arguments that sometimes
>>> >> users
>>> >> > > don't want the partition changes to get automatically picked up,
>>> but
>>> >> want
>>> >> > > to do this by restarting the job manually so they can avoid
>>> unnoticed
>>> >> > > changes in the jobs.
>>> >> > >
>>> >> > > Given that in the old Flink source, by default the auto partition
>>> >> > discovery
>>> >> > > was disabled, and there are use cases from both sides, we simply
>>> kept
>>> >> the
>>> >> > > behavior unchanged. From the discussion we have here, it looks
>>> like
>>> >> > > enabling auto partition discovery is much preferred. So I think we
>>> >> should
>>> >> > > do it.
>>> >> > >
>>> >> > > I am not worried about the performance. The new Kafka source will
>>> only
>>> >> > have
>>> >> > > the SplitEnumerator sending metadata requests when the feature is
>>> >> > enabled.
>>> >> > > It is actually much cheaper than the old Kafka source where every
>>> >> > > subtask does that.
>>> >> > >
>>> >> > > Thanks,
>>> >> > >
>>> >> > > Jiangjie (Becket) Qin
>>> >> > >
>>> >> > >
>>> >> > >
>>> >> > > On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com>
>>> wrote:
>>> >> > >
>>> >> > > > +1 for this proposal and thanks Qingsheng for driving this.
>>> >> > > >
>>> >> > > > Considering the interval, we also set the value as 5min,
>>> equivalent
>>> >> to
>>> >> > > the
>>> >> > > > default value of metadata.max.age.ms.
>>> >> > > >
>>> >> > > >
>>> >> > > > Best
>>> >> > > > Yun Tang
>>> >> > > > ________________________________
>>> >> > > > From: Benchao Li <libenc...@apache.org>
>>> >> > > > Sent: Friday, January 13, 2023 23:06
>>> >> > > > To: dev@flink.apache.org <dev@flink.apache.org>
>>> >> > > > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by
>>> >> default
>>> >> > in
>>> >> > > > Kafka source
>>> >> > > >
>>> >> > > > +1, we've enabled this by default (10mins) in our production for
>>> >> years.
>>> >> > > >
>>> >> > > > On Fri, Jan 13, 2023 at 22:22, Jing Ge <j...@ververica.com.invalid> wrote:
>>> >> > > >
>>> >> > > > > +1 for the proposal that makes users' daily work easier and
>>> >> therefore
>>> >> > > > makes
>>> >> > > > > Flink more attractive.
>>> >> > > > >
>>> >> > > > > Best regards,
>>> >> > > > > Jing
>>> >> > > > >
>>> >> > > > >
>>> >> > > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <
>>> re...@apache.org>
>>> >> > > wrote:
>>> >> > > > >
>>> >> > > > > > Thanks everyone for joining the discussion!
>>> >> > > > > >
>>> >> > > > > > @Martijn:
>>> >> > > > > >
>>> >> > > > > > > All newly discovered partitions will be consumed from the
>>> >> > earliest
>>> >> > > > > offset
>>> >> > > > > > possible.
>>> >> > > > > >
>>> >> > > > > > Thanks for the reminder! I checked the logic of KafkaSource
>>> and
>>> >> > found
>>> >> > > > > that
>>> >> > > > > > new partitions will start from the offset initializer
>>> specified
>>> >> by
>>> >> > > the
>>> >> > > > > user
>>> >> > > > > > instead of the earliest. We need to correct this behavior to
>>> >> avoid
>>> >> > > > > dropping
>>> >> > > > > > messages from new partitions.
>>> >> > > > > >
>>> >> > > > > > > Job restarts from checkpoint
>>> >> > > > > >
>>> >> > > > > > I think the current logic guarantees the exactly-once
>>> semantic.
>>> >> New
>>> >> > > > > > partitions created after the checkpoint will be
>>> re-discovered
>>> >> again
>>> >> > > and
>>> >> > > > > > picked up by the source.
>>> >> > > > > >
>>> >> > > > > > @John:
>>> >> > > > > >
>>> >> > > > > > > If you want to be a little conservative with the default,
>>> 5
>>> >> > minutes
>>> >> > > > > might
>>> >> > > > > > be better than 30 seconds.
>>> >> > > > > >
>>> >> > > > > > Thanks for the suggestion! I tried to find the equivalent
>>> >> config in
>>> >> > > > Kafka
>>> >> > > > > > but missed it. It would be neat to align with the default
>>> value
>>> >> of
>>> >> > "
>>> >> > > > > > metadata.max.age.ms".
>>> >> > > > > >
>>> >> > > > > > @Gabor:
>>> >> > > > > >
>>> >> > > > > > > removed partition handling is not yet added
>>> >> > > > > >
>>> >> > > > > > There was a detailed discussion about removing partitions
>>> [1]
>>> >> but
>>> >> > it
>>> >> > > > > looks
>>> >> > > > > > like this is not an easy task considering the potential data
>>> >> loss
>>> >> > and
>>> >> > > > > state
>>> >> > > > > > inconsistency. I'm afraid there's no clear plan on this one
>>> and
>>> >> > maybe
>>> >> > > > we
>>> >> > > > > > could trigger a new discussion thread about how to correctly
>>> >> handle
>>> >> > > > > removed
>>> >> > > > > > partitions.
>>> >> > > > > >
>>> >> > > > > > [1]
>>> >> > https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>>> >> > > > > >
>>> >> > > > > > Best regards,
>>> >> > > > > > Qingsheng
>>> >> > > > > >
>>> >> > > > > >
>>> >> > > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <
>>> >> > > > gabor.g.somo...@gmail.com
>>> >> > > > > >
>>> >> > > > > > wrote:
>>> >> > > > > >
>>> >> > > > > > > +1 on the overall direction, it's an important feature.
>>> >> > > > > > >
>>> >> > > > > > > I've had a look on the latest master and looks like
>>> removed
>>> >> > > partition
>>> >> > > > > > > handling is not yet added but I think this is essential.
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
>>> >> > > > > > >
>>> >> > > > > > > If a partition all of a sudden disappears then it could
>>> lead
>>> >> to
>>> >> > > data
>>> >> > > > > > loss.
>>> >> > > > > > > Are you planning to add it?
>>> >> > > > > > > If yes then when?
>>> >> > > > > > >
>>> >> > > > > > > G
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <
>>> >> > vvcep...@apache.org>
>>> >> > > > > > wrote:
>>> >> > > > > > >
>>> >> > > > > > > > Thanks for this proposal, Qingsheng!
>>> >> > > > > > > >
>>> >> > > > > > > > If you want to be a little conservative with the
>>> default, 5
>>> >> > > minutes
>>> >> > > > > > might
>>> >> > > > > > > > be better than 30 seconds.
>>> >> > > > > > > >
>>> >> > > > > > > > The equivalent config in Kafka seems to be
>>> >> metadata.max.age.ms
>>> >> > (
>>> >> > > > > > > >
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>>> >> > > > > > > ),
>>> >> > > > > > > > which has a default value of 5 minutes.
>>> >> > > > > > > >
>>> >> > > > > > > > Other than that, I’m in favor. I agree, this should be
>>> on by
>>> >> > > > default.
>>> >> > > > > > > >
>>> >> > > > > > > > Thanks again,
>>> >> > > > > > > > John
>>> >> > > > > > > >
>>> >> > > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
>>> >> > > > > > > > > Thanks Qingsheng for driving this. Enabling dynamic
>>> >> > > > > > > > > partition discovery would be very useful for scenarios
>>> >> > > > > > > > > where Kafka topics scale out their partitions.
>>> >> > > > > > > > >
>>> >> > > > > > > > > +1 for the change.
>>> >> > > > > > > > >
>>> >> > > > > > > > > CC: Becket
>>> >> > > > > > > > >
>>> >> > > > > > > > >
>>> >> > > > > > > > > Best,
>>> >> > > > > > > > > Leonard
>>> >> > > > > > > > >
>>> >> > > > > > > > >
>>> >> > > > > > > > >
>>> >> > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <
>>> imj...@gmail.com>
>>> >> > > wrote:
>>> >> > > > > > > > >>
>>> >> > > > > > > > >> +1 for the change. I think this is beneficial for
>>> users
>>> >> and
>>> >> > is
>>> >> > > > > > > > compatible.
>>> >> > > > > > > > >>
>>> >> > > > > > > > >> Best,
>>> >> > > > > > > > >> Jark
>>> >> > > > > > > > >>
>>> >> > > > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <
>>> xuehaijux...@gmail.com
>>> >> >
>>> >> > > > wrote:
>>> >> > > > > > > > >>
>>> >> > > > > > > > >>>>
>>> >> > > > > > > > >>>> +1 for this idea, we have enabled kafka dynamic
>>> >> partition
>>> >> > > > > > discovery
>>> >> > > > > > > in
>>> >> > > > > > > > >>> all
>>> >> > > > > > > > >>>> jobs.
>>> >> > > > > > > > >>>>
>>> >> > > > > > > > >>>>
>>> >> > > > > > > > >>>
>>> >> > > > > > > >
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > > >
>>> >> > > > --
>>> >> > > >
>>> >> > > > Best,
>>> >> > > > Benchao Li
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> >
>>>
>>
