Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Kamal Chandraprakash
One use case I see for setting `segment.bytes` to 1 is to delete all
the records from a topic.
We can mention in the doc that the `kafka-delete-records` API should be used
instead.
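
For reference, the same truncation can be done through the delete-records
API; below is a minimal sketch using the Java Admin client (the topic name
and bootstrap address are illustrative, and error handling is omitted):

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Map;
    import java.util.Properties;

    public class TruncateTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            TopicPartition tp = new TopicPartition("my-topic", 0);
            try (Admin admin = Admin.create(props)) {
                // Find the current end offset of the partition...
                long end = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                        .partitionResult(tp).get().offset();
                // ...and delete every record before it.
                admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(end)))
                        .all().get();
            }
        }
    }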




On Wed, Mar 13, 2024 at 6:59 PM Divij Vaidya 
wrote:

> + users@kafka
>
> Hi users of Apache Kafka
>
> With the upcoming 4.0 release, we have an opportunity to improve the
> constraints and default values for various Kafka configurations.
>
> We are soliciting your feedback and suggestions on configurations where the
> default values and/or constraints should be adjusted. Please reply in this
> thread directly.
>
> --
> Divij Vaidya
> Apache Kafka PMC
>
>
>
> On Wed, Mar 13, 2024 at 12:56 PM Divij Vaidya 
> wrote:
>
> > Thanks for the discussion folks. I have started a KIP
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations
> > to keep track of the changes that we are discussing. Please consider this
> > as a collaborative work-in-progress KIP and once it is ready to be
> > published, we can start a discussion thread on it.
> >
> > I am also going to start a thread to solicit feedback from the users@
> > mailing list as well.
> >
> > --
> > Divij Vaidya
> >
> >
> >
> > On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon <
> > christopher.l.shan...@gmail.com> wrote:
> >
> >> I think it's a great idea to raise a KIP to look at adjusting defaults
> >> and minimum/maximum config values for version 4.0.
> >>
> >> As pointed out, the minimum values for segment.ms and segment.bytes
> >> don't make sense and would probably bring down a cluster pretty quickly
> >> if set that low, so version 4.0 is a good time to fix them and to also
> >> look at the other configs for adjustments.
> >>
> >> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano
> >>  wrote:
> >>
> >> > hey guys,
> >> >
> >> > Regarding num.recovery.threads.per.data.dir: I agree, in our company
> >> > we use the number of vCPUs to do so, as this is not competing with
> >> > ready cluster traffic.
> >> >
> >> >
> >> > On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:
> >> >
> >> > > Hi Divij,
> >> > >
> >> > > Thanks for raising this.
> >> > > The valid minimum value of 1 for `segment.ms` is completely
> >> > > unreasonable.
> >> > > Similarly for `segment.bytes`, `metadata.log.segment.ms`,
> >> > > `metadata.log.segment.bytes`.
> >> > >
> >> > > In addition to that, there are also some config default values we'd
> >> > > like to propose changing in v4.0.
> >> > > We can collect more comments from the community, and come out with a
> >> > > KIP for them.
> >> > >
> >> > > 1. num.recovery.threads.per.data.dir:
> >> > > The current default value is 1. But the log recovery happens before
> >> > > brokers are in the ready state, which means we should use all the
> >> > > available resources to speed up log recovery and bring the broker to
> >> > > the ready state sooner. The default value should be... maybe 4 (to be
> >> > > decided)?
> >> > >
> >> > > 2. Other configs we might consider changing the default for, but
> >> > > open for comments:
> >> > >    2.1. num.replica.fetchers: default is 1, but that's not enough
> >> > > when there are multiple partitions in the cluster
> >> > >    2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
> >> > > Currently, we set 100 KB as the default value, but that's not enough
> >> > > for high-speed networks.
> >> > >
> >> > > Thank you.
> >> > > Luke
> >> > >
> >> > >
> >> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya <divijvaidy...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hey folks
> >> > > >
> >> > > > Before I file a KIP to change this in 4.0, I wanted to understand
> >> the
> >> > > > historical context for the value of the following setting.
> >> > > >
> >> > > > Currently, segment.ms minimum threshold is set to 1ms [1].
> >> > > >
> >> > > > Segments are expensive. Every segment uses multiple file descriptors
> >> > > > and it's easy to run out of OS limits when creating a large number
> >> > > > of segments. A large number of segments also delays log loading on
> >> > > > startup because of expensive operations such as iterating through
> >> > > > all directories & conditionally loading all producer state.
> >> > > >
> >> > > > I am currently not aware of a reason why someone might want to work
> >> > > > with a segment.ms of less than ~10s (a number chosen arbitrarily
> >> > > > that looks sane).
> >> > > >
> >> > > > What was the historical context for setting the minimum threshold
> >> > > > to 1 ms for this setting?
> >> > > >
> >> > > > [1]
> >> > https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
> >> > > >
> >> > > > --
> >> > > > Divij Vaidya
> >> > > >
> >> > >
> >> >
> >>
> >
>
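
For context on how such bounds are enforced: Kafka declares config
constraints through ConfigDef validators, so tightening the floor is
mechanically a small change. A minimal sketch of what a saner minimum for
segment.ms could look like, assuming the ~10s value floated above (the
definition and values here are illustrative, not the actual broker code):

    import org.apache.kafka.common.config.ConfigDef;

    public class SegmentMsConstraintSketch {
        public static void main(String[] args) {
            // Illustrative only: declaring a ~10s floor instead of today's 1 ms.
            ConfigDef def = new ConfigDef()
                .define("segment.ms",
                        ConfigDef.Type.LONG,
                        7 * 24 * 60 * 60 * 1000L,          // default: 7 days
                        ConfigDef.Range.atLeast(10_000L),  // proposed minimum
                        ConfigDef.Importance.MEDIUM,
                        "Time after which the log is forced to roll.");
            System.out.println(def.names());
        }
    }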


Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics

2024-03-13 Thread Sophie Blee-Goldman
About your last two points: I completely agree that we should try to
make this independent of RocksDB, and should probably adopt a
general philosophy of being store-implementation agnostic unless
there is good reason to focus on a particular store type: eg if it was
only possible to implement for certain stores, or only made sense in
the context of a certain store type but not necessarily stores in general.

While leaking memory due to unclosed iterators on RocksDB stores is
certainly the most common issue, I think Matthias sufficiently
demonstrated that the problem of leaking iterators is not actually
unique to RocksDB, and we should consider including in-memory
stores at the very least. I also think that at this point, we may as well
just implement the metrics for *all* store types, whether rocksdb or
in-memory or custom. Not just because it probably applies to all
store types (leaking iterators are rarely a good thing!) but because
I imagine the best way to implement this would be to do so at the
high-level iterator rather than implementing it separately for each
specific iterator implementation for every store type.

That said, I haven't thought all that carefully about the implementation
yet -- it just strikes me as easiest to do at the top level of the store
hierarchy rather than at the bottom. My gut instinct may very well be
wrong, but that's what it's saying.
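
To make the "top of the store hierarchy" idea concrete, here is a minimal
sketch of a wrapping iterator that tracks its own lifetime; the class and
registry names are made up for illustration and are not existing Kafka
Streams internals:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import java.time.Duration;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class MeteredIterator<K, V> implements KeyValueIterator<K, V> {
        // Count of currently open iterators = OPEN.size(); the entry with the
        // earliest open timestamp would drive an "oldest open iterator" metric.
        private static final Set<MeteredIterator<?, ?>> OPEN =
                ConcurrentHashMap.newKeySet();

        private final KeyValueIterator<K, V> inner;
        private final long openedAtNanos = System.nanoTime();

        MeteredIterator(KeyValueIterator<K, V> inner) {
            this.inner = inner;
            OPEN.add(this);
        }

        @Override public boolean hasNext() { return inner.hasNext(); }
        @Override public KeyValue<K, V> next() { return inner.next(); }
        @Override public K peekNextKey() { return inner.peekNextKey(); }

        @Override
        public void close() {
            OPEN.remove(this);
            Duration lifetime = Duration.ofNanos(System.nanoTime() - openedAtNanos);
            // Here one would record `lifetime` into avg/max sensors.
            inner.close();
        }
    }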

On Thu, Mar 7, 2024 at 10:43 AM Matthias J. Sax  wrote:

> Seems I am late to this party. Can we pick this up again aiming for 3.8
> release? I think it would be a great addition. Few comments:
>
>
> - I think it does make sense to report `iterator-duration-avg` and
> `iterator-duration-max` for all *closed* iterators -- it just seems to
> be a useful metric (wondering if this would be _overall_ or bounded to
> some time window?)
>
> - About the duration iterators are currently open, I believe the only
> useful way is to report the "oldest iterator", ie, the smallest iterator
> open-timestamp, of all currently open iterators? We all agree that in
> general, a leaking iterator would bump the count metric, and if there are
> a few that are not closed and stay open for a long time, it seems
> sufficient to detect the single oldest one for alerting purposes?
>
> - What I don't like about the KIP is its focus on RocksDB. I don't think
> we should build on the internal RocksDB counters as proposed (I guess,
> we could still expose them, similar to other RocksDB metrics which we
> expose already). However, for this new metric, we should track it
> ourselves and thus make it independent of RocksDB -- in the end, an
> in-memory store could also leak memory (and kill a JVM with an
> out-of-memory error) and we should be able to track it.
>
> - Not sure if we would like to add support for custom stores, to allow
> them to register their iterators with this metric? Or would this not be
> necessary, because custom stores could just register a custom metric
> about it to begin with?
>
>
>
> -Matthias
>
> On 10/25/23 4:41 PM, Sophie Blee-Goldman wrote:
> >>
> >>   If we used "iterator-duration-max", for
> >> example, would it not be confusing that it includes Iterators that are
> >> still open, and therefore the duration is not yet known?
> >
> >
> > 1. Ah, I think I understand your concern better now -- I totally agree
> > that an "iterator-duration-max" metric would be confusing/misleading. I
> > was thinking about it a bit differently, something more akin to the
> > "last-rebalance-seconds-ago" consumer metric. As the name suggests,
> > that basically just tracks how long the consumer has gone without
> > rebalancing -- it doesn't purport to represent the actual duration
> > between rebalances, just the current time since the last one. The hard
> > part is really in choosing a name that reflects this -- maybe you have
> > some better ideas but off the top of my head, perhaps something like
> > "iterator-lifetime-max"?
> >
> > 2. I'm not quite sure how to interpret the "iterator-duration-total"
> > metric -- what exactly does it mean to add up all the iterator
> > durations? For
> > some context, while this is not a hard-and-fast rule, in general you'll
> > find that Kafka/Streams metrics tend to come in pairs of avg/max or
> > rate/total. Something that you might measure the avg for usually is
> > also useful to measure the max, whereas a total metric is probably
> > also useful as a rate but not so much as an avg. I actually think this
> > is part of why it feels like it makes so much sense to include a "max"
> > version of this metric, as Lucas suggested, even if the name of
> > "iterator-duration-max" feels misleading. Ultimately the metric names
> > are up to you, but for this reason, I would personally advocate for
> > just going with an "iterator-duration-avg" and "iterator-duration-max"
> >
> > I did see your example in which you mention one could monitor the
> > rate of change of the "-total" metric. While this does make sense to
> > me, if the 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Sophie Blee-Goldman
>
> Well, the KIP mentions the ability to either re-try the record (eg,
> after applying some external fix that would allow Kafka Streams to
> deserialize the record now) or to skip it by advancing the offset.


That's fair -- you're definitely right that what's described in the KIP
document right now would not be practical. I just wanted to clarify that
this doesn't mean the feature as a whole is impractical, but certainly we'd
want to update the proposal to remove the line about resetting offsets via
an external tool and come up with a more concrete approach, and perhaps
describe it in more detail.

That's probably not worth getting into until/unless we decide whether to
go forward with this feature in the first place. I'll let Nick reflect on
the motivation and your other comments and then decide whether he still
wants to pursue it.

To Nick: if you want to go through with this KIP and can expand on the
motivation so that we understand it better, I'd be happy to help work
out the details. For now I'll just wait for your decision

On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax  wrote:

> Yes, about the "drop records" case. It's a very common scenario to have
> a repartition step before a windowed aggregation or a join with
> grace-period.
>
>
> About "add feature vs guard users": it's always a tricky question and
> tradeoff. For this particular KIP, I personally think we should opt to
> not add the feature but guard the users, as I don't see too much value
> compared to the complexity and "traps" it adds. -- It's of course just
> my personal opinion, and if there is an ask from many users to add this
> feature, I would not push back further. As mentioned in my previous
> reply, I don't fully understand the motivation yet; maybe Nick can
> provide more context on it.
>
>
> > In other words, opting for the PAUSE option would simply stall the
> > task, and upon #resume it would just be discarding that record and then
> > continuing on with processing
>
> Well, the KIP mentions the ability to either re-try the record (eg,
> after applying some external fix that would allow Kafka Streams to
> deserialize the record now) or to skip it by advancing the offset. But
> to do this, we need to extend the `resume()` callback to pass in this
> information, making the whole setup and usage of this feature more
> complex, as one needs to do more upfront instrumentation of their custom
> code. -- It's just a technical thing we need to consider if we want to
> move forward, and the KIP should not say "advancing the consumer
> offsets, either via an external tool" because this cannot work. Just
> pointing out incorrect technical assumption, not disregarding that it
> can be done.
>
>
> About committing: yes, I agree with all you say, and again it was not
> meant as a concern, but just as honest questions about some technical
> details. I think it would be good to consider these trade-offs and
> explain in the KIP what we want to do and why. That's all.
>
>
>
> -Matthias
>
> On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote:
> >>
> >>   I see way too many food-guns and complications that can be introduced.
> >
> >
> > What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
> > don't think that's what you meant hahaha
> >
> > I don't feel super strongly one way or another, but I have a few
> questions
> > & corrections about some of these complaints/concerns:
> >
> > If one task
> >> pauses but other keep running, we keep advancing stream-time downstream,
> >> and thus when the task would resume later, there is a very high
> >> probability that records are dropped as window got already closed.
> >
> > Just to make sure I/everyone understand what you're getting at here, you
> > would be
> > referring to the case of a stateful operation downstream of a
> > key-changing operation which is in turn downstream of the "paused" task
> > -- ie with a repartition separating the paused task and the task with a
> > windowed aggregation? Each task has its own view of stream-time
> > (technically each processor within a task) so the only way that delaying
> > one task and not another would affect which records get dropped is if
> > those two tasks are rekeyed and the repartitioning results in their
> > outputs being mixed -- yes?
> >
> > Anyways I think you make a good case for why pausing a single task -- or
> > even an entire instance if others are allowed to continue running --
> > might make it too easy for users to shoot themselves in the foot without
> > understanding the full ramifications. Of course, there are already a
> > million ways for users to screw up their app if configured or operated
> > incorrectly, and we shouldn't necessarily kill a feature just because
> > some people might use it when they shouldn't. Why can't we just document
> > that this feature should not be used with applications that include
> > time-sensitive operators?
> >
> > I also 

Re: [DISCUSS] KIP-1020: Deprecate `window.size.ms` in `StreamsConfig`

2024-03-13 Thread Sophie Blee-Goldman
By "don't add them" do you just mean we would not have any actual
variables defined anywhere for these configs (eg WINDOW_SIZE_MS)
and simply document -- somewhere -- that one can use the string
"window.size.ms" when configuring a command-line client with a
windowed serde? Or something else? I assume you aren't proposing
to remove the ability to use and understand this config from the
implementations themselves, but correct me if that's wrong.

Are there any other configs in similar situations that we could look
to for precedent? I personally am not aware of any but by definition
I suppose these would be hard to discover unless you were actively
looking for them, so I'm wondering if there might be other "shadow
configs" elsewhere in the code base.

If these are truly the first/only of their kind, I would vote to just stick
them in the appropriate class. As for which class to put them in, I
think I'm convinced that "window.size.ms" should only go in the
TimeWindowedDeserializer rather than sticking them both in the
TimeWindowedSerde as I originally suggested. However, I would
even go a step further and not place the "windowed.inner.class.serde"
in the TimeWindowedSerde class either. To me, it actually makes
the most sense to define it in both the TimeWindowedSerializer
and the TimeWindowedDeserializer.

The reason being that, as discussed above, the only use case for
these configs would be in the console consumer/producer which
only uses the Serializer or Deserializer, and would never actually
be used by/in Streams where we use the Serde version. And while
defining the  "inner.window.class.serde" in two places might seem
redundant, this would mean that all the configs needed to properly
configure the specific class being used by the particular kind of
consumer client -- that is, Deserializer for a console consumer and
Serializer for a console producer -- would be located in that exact
class. I assume this would make them much easier to discover
and be used than having to search for configs defined in classes
you don't even need for the console client, like the Serde form

Just my two cents -- happy to hear other opinions on this
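
For concreteness, a sketch of the two configuration paths under discussion,
assuming the `window.size.ms` and `windowed.inner.class.serde` keys named in
this thread (the values are illustrative and the exact configure() behavior
may vary by version):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
    import java.util.Map;

    public class WindowedDeserializerPaths {
        public static void main(String[] args) {
            // Path 1: direct construction in Java code -- no configs involved.
            TimeWindowedDeserializer<String> direct =
                new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);

            // Path 2: default constructor plus configure(), which is what a
            // command-line tool does when instantiating the class reflectively.
            TimeWindowedDeserializer<String> reflective =
                new TimeWindowedDeserializer<>();
            reflective.configure(Map.of(
                    "window.size.ms", "500",
                    "windowed.inner.class.serde", Serdes.StringSerde.class.getName()),
                false);
        }
    }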

On Mon, Mar 11, 2024 at 6:58 PM Matthias J. Sax  wrote:

> Yes, it's used inside `TimeWindowedSerializer` and actually also inside
> `TimeWindowedDeserializer`.
>
> However, it does IMHO not change that we should remove it from
> `StreamsConfig` because both configs are not intended to be used in Java
> code... If one writes Java code, they should use
>
>new TimeWindowedSerializer(Serializer)
>new TimeWindowedDeserializer(Deserializer, Long)
>new WindowedSerdes.TimeWindowedSerde(Serde, Long)
>
> and thus they don't need either config.
>
> The configs are only needed for command line tool, that create the
> (de)serializer via reflection using the default constructor.
>
> Does this make sense?
>
>
>
> The only open question is really, if and where to add them... Strictly
> speaking, we don't need either config as a public variable, as nobody
> should use them in Java code. To me, it just feels right/better to make
> them public to document that these configs exist?
>
> `windowed.inner.class.serde` has "serde" in its name, so we could add it
> to `WindowedSerdes`? For `window.size.ms`, it's only used by the
> deserializer, so maybe add it there? Just some ideas. -- Or we sidestep
> this question and just don't add them; also fine with me.
>
>
> -Matthias
>
> On 3/11/24 10:53 AM, Lucia Cerchie wrote:
> > PS-- I was re-reading the PR that originated this discussion and realized
> > that `windowed.inner.class.serde` is used here
> > <
> https://github.com/a0x8o/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java#L44
> >
> > in KStreams. This goes against removing it, yes?
> >
> > On Mon, Mar 11, 2024 at 10:40 AM Lucia Cerchie 
> > wrote:
> >
> >> Sophie, I'll add a paragraph about removing `windowed.inner.class.serde`
> >> to the KIP. I'll also add putting it in the `TimeWindowedSerde` class
> with
> >> some add'tl guidance on the docs addition.
> >>
> >> Also, I double-checked setting window.size.ms on the client and it
> >> doesn't throw an error at all, in response to Matthias's question.
> Changing
> >> the KIP in response to that.
> >>
> >> On Sun, Mar 10, 2024 at 6:04 PM Sophie Blee-Goldman <
> sop...@responsive.dev>
> >> wrote:
> >>
> >>> Thanks for responding Matthias -- you got there first, but I was going
> >>> to say exactly the same thing as in your most recent reply. In other
> >>> words, I see the `windowed.inner.class.serde` config as being in the
> >>> same boat as the `window.size.ms` config, so whatever we do with one we
> >>> should do for the other.
> >>>
> >>> I do agree with removing these from StreamsConfig, but defining them in
> >>> ConsumerConfig feels weird as well. There's really no great answer
> here.
> >>>
> >>> My only concern about adding it to the corresponding
> >>> serde/serializer/deserializer class is 

Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Satish Duggana
+1, Thanks Mani for volunteering.

On Thu, 14 Mar 2024 at 06:01, Luke Chen  wrote:
>
> +1, Thanks Manikumar!
>
> On Thu, Mar 14, 2024 at 3:40 AM Bruno Cadonna  wrote:
>
> > Thanks Manikumar!
> >
> > +1
> >
> > Best,
> > Bruno
> >
> > On 3/13/24 5:56 PM, Josep Prat wrote:
> > > +1 thanks for volunteering!
> > >
> > > Best
> > > ---
> > >
> > > Josep Prat
> > > Open Source Engineering Director, Aiven
> > > josep.p...@aiven.io | +491715557497 | aiven.io
> > > Aiven Deutschland GmbH
> > > Alexanderufer 3-7, 10117 Berlin
> > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > Amtsgericht Charlottenburg, HRB 209739 B
> > >
> > > On Wed, Mar 13, 2024, 17:17 Divij Vaidya 
> > wrote:
> > >
> > >> +1
> > >>
> > >> Thank you for volunteering.
> > >>
> > >> --
> > >> Divij Vaidya
> > >>
> > >>
> > >>
> > >> On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan
> > >> 
> > >> wrote:
> > >>
> > >>> Thanks Manikumar!
> > >>> +1 from me
> > >>>
> > >>> Justine
> > >>>
> > >>> On Wed, Mar 13, 2024 at 8:52 AM Manikumar 
> > >>> wrote:
> > >>>
> >  Hi,
> > 
> >  I'd like to volunteer to be the release manager for a bug fix release
> > >> of
> >  the 3.6 line.
> >  If there are no objections, I'll send out the release plan soon.
> > 
> >  Thanks,
> >  Manikumar
> > 
> > >>>
> > >>
> > >
> >


Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Luke Chen
+1, Thanks Manikumar!

On Thu, Mar 14, 2024 at 3:40 AM Bruno Cadonna  wrote:

> Thanks Manikumar!
>
> +1
>
> Best,
> Bruno
>
> On 3/13/24 5:56 PM, Josep Prat wrote:
> > +1 thanks for volunteering!
> >
> > Best
> > ---
> >
> > Josep Prat
> > Open Source Engineering Director, Aiven
> > josep.p...@aiven.io | +491715557497 | aiven.io
> > Aiven Deutschland GmbH
> > Alexanderufer 3-7, 10117 Berlin
> > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > Amtsgericht Charlottenburg, HRB 209739 B
> >
> > On Wed, Mar 13, 2024, 17:17 Divij Vaidya 
> wrote:
> >
> >> +1
> >>
> >> Thank you for volunteering.
> >>
> >> --
> >> Divij Vaidya
> >>
> >>
> >>
> >> On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan
> >> 
> >> wrote:
> >>
> >>> Thanks Manikumar!
> >>> +1 from me
> >>>
> >>> Justine
> >>>
> >>> On Wed, Mar 13, 2024 at 8:52 AM Manikumar 
> >>> wrote:
> >>>
>  Hi,
> 
>  I'd like to volunteer to be the release manager for a bug fix release
> >> of
>  the 3.6 line.
>  If there are no objections, I'll send out the release plan soon.
> 
>  Thanks,
>  Manikumar
> 
> >>>
> >>
> >
>


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-13 Thread Walker Carlson
Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.

I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not, that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.
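
To make the two ideas above concrete, a purely hypothetical sketch; neither
type exists in Kafka Streams today:

    // Hypothetical shapes only -- not part of any existing or proposed API.
    interface GlobalStoreProcessor<KIn, VIn> {
        void process(KIn key, VIn value);
        default boolean reprocessOnRestore() { return true; }
    }

    class GlobalStoreParameters {
        final boolean reprocessOnRestore;
        GlobalStoreParameters(boolean reprocessOnRestore) {
            this.reprocessOnRestore = reprocessOnRestore;
        }
    }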

thanks,
Walker


Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Bruno Cadonna

Thanks Manikumar!

+1

Best,
Bruno

On 3/13/24 5:56 PM, Josep Prat wrote:

+1 thanks for volunteering!

Best
---

Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Wed, Mar 13, 2024, 17:17 Divij Vaidya  wrote:


+1

Thank you for volunteering.

--
Divij Vaidya



On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan

wrote:


Thanks Manikumar!
+1 from me

Justine

On Wed, Mar 13, 2024 at 8:52 AM Manikumar 
wrote:


Hi,

I'd like to volunteer to be the release manager for a bug fix release

of

the 3.6 line.
If there are no objections, I'll send out the release plan soon.

Thanks,
Manikumar









Dynamic Metrics Reporter Producer reload

2024-03-13 Thread Kenneth Eversole
Hello,


We currently use short-lived TLS certs for our CruiseControlMetricsReporter,
which is creating issues: when the cert reloads, the producer client fails
to restart as well. A rolling restart resolves this, but I was hoping to
find out if there is a dynamic way to reload these producers, or really any
client, in a pattern similar to what we do with brokers.

Github issue on the cruise control repo for another reference:
https://github.com/linkedin/cruise-control/issues/1148


Kenneth Eversole
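
As far as I know there is no built-in client-side equivalent of the brokers'
dynamic keystore reconfiguration today, so one application-level pattern is
to rebuild the producer when the certificate changes. A rough, illustrative
sketch (the config contents and the change trigger are assumptions):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicReference;

    public class ReloadingProducer {
        private final AtomicReference<KafkaProducer<byte[], byte[]>> current =
                new AtomicReference<>();
        // Assumed to include serializers and the SSL keystore settings.
        private final Properties config;

        ReloadingProducer(Properties config) {
            this.config = config;
            current.set(new KafkaProducer<>(config));
        }

        // Called by whatever watches the keystore file (e.g., a WatchService).
        void onKeystoreChanged() {
            KafkaProducer<byte[], byte[]> old =
                    current.getAndSet(new KafkaProducer<>(config));
            old.close();  // lets in-flight sends complete before closing
        }
    }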


Re: [DISCUSS] Personal branches under apache/kafka

2024-03-13 Thread Matthias J. Sax

+1

Should be fine to just delete all of them (as long as nobody raises 
objections).


Not sure if we could enable some protection on the GitHub side that 
disallows pushing to non-existing branches and thus avoids accidental 
branch creation?



-Matthias

On 3/13/24 11:39 AM, Josep Prat wrote:

Hi Michael,

I think it's a good idea. Only "official" branches should exist in the
upstream repo.
I guess the only exception would be if a massive feature would be done by
different individuals collaborating and they would need a "neutral" place
for the branch to be. But this hasn't happened yet and I doubt it will in the
near future.

Best,

---
Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Wed, Mar 13, 2024, 19:27 José Armando García Sancio
 wrote:


On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison
 wrote:

What do you think?


I agree. I wouldn't be surprised if these branches (not trunk or
release branches) were created by mistake by the committer.

Thanks,
--
-José





Re: [DISCUSS] Personal branches under apache/kafka

2024-03-13 Thread Josep Prat
Hi Michael,

I think it's a good idea. Only "official" branches should exist in the
upstream repo.
I guess the only exception would be if a massive feature would be done by
different individuals collaborating and they would need a "neutral" place
for the branch to be. But this hasn't happened yet and I doubt it will in the
near future.

Best,

---
Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Wed, Mar 13, 2024, 19:27 José Armando García Sancio
 wrote:

> On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison
>  wrote:
> > What do you think?
>
> I agree. I wouldn't be surprised if these branches (not trunk or
> release branches) were created by mistake by the committer.
>
> Thanks,
> --
> -José
>


Re: Incremental build for scala tests

2024-03-13 Thread Pavel Pozdeev

There seems to be an open issue in Gradle for it:
https://github.com/gradle/gradle/issues/20854

>Monday, March 4, 2024 9:44 PM UTC from Pavel Pozdeev
> 
>
>Hi team,
>I'm a new member, just joined the community. Trying to add KRaft support to 
>some existing unit-tests.
>I noticed that when I make any change to some unit-test, e.g. 
>"kafka.admin.AclCommandTest.scala", and then run:
> 
>"./gradlew core:compileTestScala"
> 
>the entire folder "core/build/classes/scala/test" is cleared, and gradle 
>re-compiles ALL tests. It takes quite a long time.
>Looks like this issue affects only tests. If I change main scala code, e.g. 
>"kafka.admin.AclCommand.scala", and then run:
> 
>"./gradlew core:compileScala"
> 
>only a single file is re-compiled, and it takes only a few seconds.
>Has anybody noticed the same issue before?
> 
>Best,
>Pavel Pozdeev
 

Re: [DISCUSS] Personal branches under apache/kafka

2024-03-13 Thread José Armando García Sancio
On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison
 wrote:
> What do you think?

I agree. I wouldn't be surprised if these branches (not trunk or
release branches) were created by mistake by the committer.

Thanks,
-- 
-José


[DISCUSS] Personal branches under apache/kafka

2024-03-13 Thread Mickael Maison
Hi,

We have accumulated a number of personal branches in the github
repository: https://github.com/apache/kafka/branches/all

All these branches have been created by committers for various
reasons, bugfix, tests.

I wonder if we should avoid creating branches in the apache repository
(always use your own fork like regular contributors) and in the rare
cases this is necessary ensure we delete them once done? This way we
would only have branches for the various releases (3.7, 3.6, etc).

What do you think?

Thanks,
Mickael


Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-03-13 Thread Justine Olshan
Hey folks -- let me summarize my understanding

Artem requested we change the name of transaction version (tv) to
transaction protocol version (tpv). If I do that, I will also probably
change group coordinator version to group coordinator protocol version
(gcpv).


Folks were concerned about upgrades/downgrades with different feature
versions when using the feature tool and --release-version. I think we can
address some concerns here:

   - don't allow a command that moves a version in the wrong direction
   (when upgrade actually downgrades a feature for example)
   - allow a command to describe all the feature versions for a given
   metadata version

Note that Colin expressed some concern with having the --release-version
flag at all. I wonder if we can reach a compromise by having the upgrade
command support only a "latest" option.


Jun proposed this "latest" functionality can be the default when no version
or feature is specified.

Jose requested that we make --release-version and --feature mutually
exclusive for both the storage tool and the features tool. We should also
specify each feature in the storage tool as a separate flag.

Finally, Jun and Jose requested deprecation of the --metadata flag, but
Colin felt we should keep it. I mentioned that the --feature flag does the
same thing, but there has been no reply yet.

* So here are the updates I propose:*
1. Update the feature names as Artem described -- transaction protocol
version and group coordinator protocol version

2. Rename --features in the storage tool to --feature. In the case where we
use this flag, we must repeat the flag once per feature to set all of
them.

3. The no-argument behavior of the storage and upgrade tools is to set all
the features to the latest (stable) version

4. Include an API to list the features for a given metadata version

5. I'm going back and forth on whether we should support the
--release-version flag still. If we do, we need to include validation so we
only upgrade on upgrade.

Let me know if folks have any issues with the updates I proposed or think
there is something I missed.

Justine

On Fri, Mar 8, 2024 at 11:59 AM Artem Livshits
 wrote:

> Hi Justine,
>
> >  Are you suggesting it should be called "transaction protocol version" or
> "TPV"? I don't mind that, but just wanted to clarify if we want to include
> protocol or if simply "transaction version" is enough.
>
> My understanding is that "metadata version" is the version of metadata
> records, which is fairly straightforward.  "Transaction version" may be
> ambiguous.
>
> -Artem
>
> On Thu, Feb 29, 2024 at 3:39 PM Justine Olshan
> 
> wrote:
>
> > Hey folks,
> >
> > Thanks for the discussion. Let me try to cover everyone's comments.
> >
> > Artem --
> > I can add the examples you mentioned. As for naming, right now the
> > feature is called "transaction version" or "TV". Are you suggesting it
> > should be called "transaction protocol version" or "TPV"? I don't mind
> > that, but just wanted to clarify if we want to include protocol or if
> > simply "transaction version" is enough.
> >
> > Jun --
> >
> > 10. With more features, would each of those be controlled by a separate
> > feature or multiple features? For example, is the new transaction record
> > format controlled only by MV with TV having a dependency on MV, or is it
> > controlled by both MV and TV?
> >
> >
> > I think this will need to be decided on a case-by-case basis. There
> > should be a mechanism to set dependencies among features.
> > For transaction version specifically, I have no metadata version
> > dependencies besides requiring 3.3 to write the feature records and use
> > the feature tools. I would suspect all new features would have this
> > requirement.
> >
> >
> > 11. Basically, if --release-version is not used, the command will just
> > use the latest production version of every feature. Should we apply
> > that logic to both tools?
> >
> >
> > How would this work with the upgrade tool? I think we want a way to set a
> > new feature version for one feature and not touch any of the others.
> >
> >
> > 12. Should we remove --metadata METADATA from kafka-features? It does
> > the same thing as --release-version.
> >
> >
> > When I previously discussed this tool offline with Colin McCabe, he
> > was strongly against deprecating or changing flags. I personally think
> > it could be good to unify and not support a ton of flags, but I would
> > want to make sure he is aligned.
> >
> >
> > 13. KIP-853 also extends the tools to support a new feature,
> > kraft.version. It would be useful to have alignment between that KIP
> > and this one.
> >
> >
> > Sounds good. Looks like Jose is in on the discussion so we can continue
> > here. :)
> >
> >
> >
> > Jose --
> >
> >
> > 1. KIP-853 uses --feature for kafka-storage instead of --features.
> > This is consistent with the use of --feature in the "kafka-feature.sh
> > upgrade 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-13 Thread Matthias J. Sax
If the custom store is a key-value store, yes, we could do this. But the 
interface does not enforce a key-value store, it's just a most generic 
`StateStore` that we pass in, and thus it could be something totally 
unknown to us, and we cannot apply a cast...


The underlying idea is really about 100% flexibility in the PAPI layer.

That's also the reason why all stores need to provide a callback for the 
restore path. Kafka Streams runtime can only read the record from the 
changelog, but it cannot put it into the store, as the runtime only sees 
the `StateStore` interface -- thus, we invoke a store specific callback 
(`StateRestoreCallback` interface) that needs to actually put the data 
into the store for us. For our built-in store, we of course provide 
these callbacks, but the point is, that the runtime does not know 
anything about the nature of the store but is fully agnostic to it, to 
allow the plugin of any custom store with any custom interface (which 
just needs to implement `StateStore`).
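
A minimal sketch of that callback contract; `MyCustomStore` and `putRaw`
are made-up names for illustration, not Kafka Streams classes:

    import org.apache.kafka.streams.processor.StateRestoreCallback;

    // Made-up custom store API, for illustration only.
    interface MyCustomStore {
        void putRaw(byte[] key, byte[] value);
    }

    class MyCustomStoreRestoreCallback implements StateRestoreCallback {
        private final MyCustomStore store;

        MyCustomStoreRestoreCallback(MyCustomStore store) {
            this.store = store;
        }

        @Override
        public void restore(byte[] key, byte[] value) {
            // Only the store knows how to turn raw changelog bytes into its
            // internal representation; the Streams runtime never does.
            store.putRaw(key, value);
        }
    }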



Not sure if I understand what you mean by this transformation step?



-Matthias


On 3/12/24 3:04 AM, Lucas Brutschy wrote:

@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than me.

One other alternative is to allow the user to provide a state
transformer `Function<ConsumerRecord, ConsumerRecord>` to adapt the
state before loading it, defaulting to identity. This would provide the
ability to do efficient, non-deserializing transformations on the raw
records.

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax  wrote:


@Bruno:

(1), I think you are spot on for the ts-extractor: on the restore code
path, we only support record-ts, but there is no need for a custom-ts
because for regular changelog topics KS sets the ts, and thus, the
optimization this KIP proposes requires that the global topic follow the
changelog format, ie, the ts must be in the record-ts.

However, for the regular processing path, I am not sure if we can omit
deserializers. The way the PAPI is wired up, seems to require that we
give proper types to _other_ Processor that read from the global state
store. For this reason, the store (which takes `Serdes` with proper
types) is wrapped with a `MeteredStore` (like all others) to do the
Serde work, and this MeteredStore is also exposed to the
global-Processor? Might be good for Walker to dig into this to find out
the details?

It would of course be nice if we could avoid the unnecessary
deserialization on topic read, and re-serialization on global-store put
for this case, but it seems not to be straightforward to do...


(2). Is this about the PAPI/Topology? For this case, we don't have any
config object across the board. We only do this in the DSL. Hence, I
would propose to just follow the existing pattern in this KIP to keep
the API consistent. For the DSL, it could make sense of course. -- Of
course, if we think the PAPI could be improved with config objects, we
could do this in a dedicate KIP.


@Lucas:

The PAPI is unfortunately (by design) much more open and less
restrictive. If a users has a custom state store, we need some
`Processor` code from them, because we cannot provide a built-in
processor for an unknown store. The overload which won't take a
processor would only work for the built-in key-value store, what I
assume would cover most use-cases, however, we should keep the door open
for other use cases. Otherwise, we disallow this optimization for custom
stores. PAPI is really about flexibility, and yes, with great power
comes great responsibility for the users :)

But this actually highlights a different aspect: the overload not
accepting a custom `Processor` but using a built-in processor, should
not accept a generic `StoreBuilder` but should restrict the type to
`StoreBuilder<KeyValueStore<?, ?>>`?


-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:

Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at a first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
   1) a copy-restore variant without custom processing, as you propose.
   2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the 

[jira] [Created] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-13 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-16369:
-

 Summary: Broker may not shut down when SocketServer fails to bind 
as Address already in use
 Key: KAFKA-16369
 URL: https://issues.apache.org/jira/browse/KAFKA-16369
 Project: Kafka
  Issue Type: Bug
Reporter: Edoardo Comar


When in ZooKeeper mode, if a port the broker should listen to is already bound,

the KafkaException: Socket server failed to bind to localhost:9092: Address 
already in use.

is thrown but the broker continues to start up.

It correctly shuts down when in KRaft mode.

Easy to reproduce when in ZooKeeper mode with the server config set to listen
to localhost only:

listeners=PLAINTEXT://localhost:9092



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Matthias J. Sax
Yes, about the "drop records" case. It's a very common scenario to have 
a repartition step before a windowed aggregation or a join with 
grace-period.



About "add feature vs guard users": it's always a tricky question and 
tradeoff. For this particular KIP, I personally think we should opt to 
not add the feature but guard the users, as I don't see too much value 
compared to the complexity and "traps" it adds. -- It's of course just 
my personal opinion, and if there is an ask from many users to add this 
feature, I would not push back further. As mentioned in my previous 
reply, I don't fully understand the motivation yet; maybe Nick can 
provide more context on it.




In other words, opting for the PAUSE option would simply stall the
task, and upon #resume it would just be discarding that record and then
continuing on with processing


Well, the KIP mentions the ability to either re-try the record (eg, 
after applying some external fix that would allow Kafka Streams to 
deserialize the record now) or to skip it by advancing the offset. But 
to do this, we need to extend the `resume()` callback to pass in this 
information, making the whole setup and usage of this feature more 
complex, as one needs to do more upfront instrumentation of their custom 
code. -- It's just a technical thing we need to consider if we want to 
move forward, and the KIP should not say "advancing the consumer 
offsets, either via an external tool" because this cannot work. Just 
pointing out incorrect technical assumption, not disregarding that it 
can be done.
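
A purely hypothetical sketch of what such an extended callback could look
like; none of these names exist in Kafka Streams today:

    // Hypothetical names only -- illustrating the "pass the decision into
    // resume()" idea, not an actual or proposed Kafka Streams API.
    enum SuspendedTaskAction {
        RETRY_RECORD,  // an external fix was applied; try deserializing again
        SKIP_RECORD    // advance past the bad record
    }

    interface SuspendedTaskHandle {
        void resume(SuspendedTaskAction action);
    }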



About committing: yes, I agree with all you say, and again it was not 
meant as a concern, but just as honest questions about some technical 
details. I think it would be good to consider these trade-offs and 
explain in the KIP what we want to do and why. That's all.




-Matthias

On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote:


  I see way too many food-guns and complications that can be introduced.



What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
don't think that's what you meant hahaha

I don't feel super strongly one way or another, but I have a few questions
& corrections about some of these complaints/concerns:

If one task

pauses but other keep running, we keep advancing stream-time downstream,
and thus when the task would resume later, there is a very high
probability that records are dropped as window got already closed.


Just to make sure I/everyone understand what you're getting at here, you
would be
referring to the case of a stateful operation downstream of a key-changing
operation
which is in turn downstream of the  "paused" task -- ie with a repartition
separating
the paused task and the task with a windowed aggregation? Each task has its
own
view of stream-time (technically each processor within a task) so the only
way that
delaying one task and not another would affect which records get dropped is
if those
two tasks are rekeyed and the repartitioning results in their outputs being
mixed -- yes?

Anyways I think you make a good case for why pausing a single task -- or
even an entire
instance if others are allowed to continue running -- might make it too
easy for users to
shoot themselves in the foot without understanding the full ramifications.
Of course, there
are already a million ways for users to screw up their app if configured or
operated incorrectly,
and we shouldn't necessarily kill a feature just because some people might
use it when they
shouldn't. Why can't we just document that this feature should not be used
with applications
that include time-sensitive operators?

I also feel like you dismissed the "skip record case" somewhat too easily:


For the "skip record case", it's also not possible to skip over an
offset from outside while the application is running



True, you can't advance the offset from outside the app, but I don't see why
you would want to, much less why you should need to for this to work.
Surely the best way to implement this case would just be for the #resume
API to behave, and work, exactly the same as the handler's CONTINUE
option? In other words, opting for the PAUSE option would simply stall the
task, and upon #resume it would just be discarding that record and then
continuing on with processing (or even committing the offset immediately
after
it, perhaps even asynchronously since it presumably doesn't matter if it
doesn't succeed and the record is picked up again by accident -- as long as
  that doesn't happen repeatedly in an infinite loop, which I don't see why
it would.)

On the subject of committing...

Other questions: if a task would be paused, would we commit the current

offset? What happens if we re-balance? Would we just lose the "pause"
state, and hit the same error again and just pause again?



I was imagining that we would either just wait without committing, or
perhaps
even commit everything up to -- but not including -- the "bad" record when
PAUSE is triggered. 

Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Josep Prat
+1 thanks for volunteering!

Best
---

Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Wed, Mar 13, 2024, 17:17 Divij Vaidya  wrote:

> +1
>
> Thank you for volunteering.
>
> --
> Divij Vaidya
>
>
>
> On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan
> 
> wrote:
>
> > Thanks Manikumar!
> > +1 from me
> >
> > Justine
> >
> > On Wed, Mar 13, 2024 at 8:52 AM Manikumar 
> > wrote:
> >
> > > Hi,
> > >
> > > I'd like to volunteer to be the release manager for a bug fix release
> of
> > > the 3.6 line.
> > > If there are no objections, I'll send out the release plan soon.
> > >
> > > Thanks,
> > > Manikumar
> > >
> >
>


Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Divij Vaidya
+1

Thank you for volunteering.

--
Divij Vaidya



On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan 
wrote:

> Thanks Manikumar!
> +1 from me
>
> Justine
>
> On Wed, Mar 13, 2024 at 8:52 AM Manikumar 
> wrote:
>
> > Hi,
> >
> > I'd like to volunteer to be the release manager for a bug fix release of
> > the 3.6 line.
> > If there are no objections, I'll send out the release plan soon.
> >
> > Thanks,
> > Manikumar
> >
>


Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Justine Olshan
Thanks Manikumar!
+1 from me

Justine

On Wed, Mar 13, 2024 at 8:52 AM Manikumar  wrote:

> Hi,
>
> I'd like to volunteer to be the release manager for a bug fix release of
> the 3.6 line.
> If there are no objections, I'll send out the release plan soon.
>
> Thanks,
> Manikumar
>


Re: [DISCUSS] KIP-1026: Handling producer snapshot when upgrading from < v2.8.0 for Tiered Storage

2024-03-13 Thread Greg Harris
Hi Arpit,

Thanks for the KIP!

I am not familiar with the necessity of producer snapshots, but your
explanation sounds like this should be made optional.

Can you expand the KIP to include the changes that need to be made to
the constructor and getter, and explain more about backwards
compatibility? From the description I can't tell if this change is
backwards-compatible or not.

Thanks,
Greg

On Wed, Mar 13, 2024 at 6:48 AM Arpit Goyal  wrote:
>
> Hi all,
>
> I just wanted to bump up this thread.
>
> The KIP introduces a really small change and it would not take much time
> to review. This change would enable Kafka users to use tiered storage
> features seamlessly for topics migrated from a < 2.8 version, which
> currently fail with a NullPointerException.
>
> I am waiting for this KIP to get approved and then start working on it.
>
> On Mon, Mar 11, 2024, 14:26 Arpit Goyal  wrote:
>
> > Hi All,
> > Just a Reminder, KIP-1026  is open for discussion.
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> >
> > On Sat, Mar 9, 2024 at 9:27 AM Arpit Goyal 
> > wrote:
> >
> >> Hi All,
> >>
> >> I have created KIP-1026 for handling producerSnapshot empty scenarios
> >> when the topic is upgraded from the kafka  < 2.8 version.
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1026%3A+Handling+producer+snapshot+when+upgrading+from+%3C+v2.8.0+for+Tiered+Storage
> >>
> >> Feedback and suggestions are welcome.
> >>
> >> Thanks and Regards
> >> Arpit Goyal
> >> 8861094754
> >>
> >


[DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Manikumar
Hi,

I'd like to volunteer to be the release manager for a bug fix release of
the 3.6 line.
If there are no objections, I'll send out the release plan soon.

Thanks,
Manikumar


[jira] [Resolved] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers

2024-03-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-16171.
--
Resolution: Fixed

> Controller failover during ZK migration can prevent metadata updates to ZK 
> brokers
> --
>
> Key: KAFKA-16171
> URL: https://issues.apache.org/jira/browse/KAFKA-16171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft, migration
>Affects Versions: 3.6.0, 3.7.0, 3.6.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.2, 3.7.0
>
>
> h2. Description
> During the ZK migration, after KRaft becomes the active controller we enter a 
> state called hybrid mode. This means we have a mixture of ZK and KRaft 
> brokers. The KRaft controller updates the ZK brokers using the deprecated 
> controller RPCs (LeaderAndIsr, UpdateMetadata, etc). 
>  
> A race condition exists where the KRaft controller will get stuck in a retry 
> loop while initializing itself after a failover which prevents it from 
> sending these RPCs to ZK brokers.
> h2. Impact
> Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK 
> brokers will not receive any metadata updates. The ZK brokers will be able to 
> send requests to the controller (such as AlterPartitions), but the metadata 
> updates which come as a result of those requests will never be seen. This 
> essentially looks like the controller is unavailable from the ZK brokers 
> perspective.
> h2. Detection and Mitigation
> This bug can be seen by observing failed ZK writes from a recently elected 
> controller.
> The tell-tale error message is:
> {code:java}
> Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This 
> indicates that another KRaft controller is making writes to ZooKeeper. {code}
> with a stacktrace like:
> {noformat}
> java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. 
> Expected zkVersion = 507823. This indicates that another KRaft controller is 
> making writes to ZooKeeper.
>   at 
> kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
>   at 
> kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
>   at 
> kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
>   at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>   at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>   at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
>   at 
> kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
>   at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:1583)
>   at 
> org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat}
> To mitigate this problem, a new KRaft controller should be elected. This can 
> be done by restarting the problematic active controller. To verify that the 
> new 

Re: [DISCUSS] KIP-1026: Handling producer snapshot when upgrading from < v2.8.0 for Tiered Storage

2024-03-13 Thread Arpit Goyal
Hi all,

I just wanted to bump up this thread.

The KIP introduces a really small change and it would not take much time
to review. This change would enable Kafka users to use tiered storage
features seamlessly for topics migrated from a < 2.8 version, which
currently fail with a NullPointerException.

I am waiting for this KIP to get approved and then start working on it.

On Mon, Mar 11, 2024, 14:26 Arpit Goyal  wrote:

> Hi All,
> Just a Reminder, KIP-1026  is open for discussion.
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Sat, Mar 9, 2024 at 9:27 AM Arpit Goyal 
> wrote:
>
>> Hi All,
>>
>> I have created KIP-1026 for handling producerSnapshot empty scenarios
>> when the topic is upgraded from the kafka  < 2.8 version.
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1026%3A+Handling+producer+snapshot+when+upgrading+from+%3C+v2.8.0+for+Tiered+Storage
>>
>> Feedback and suggestions are welcome.
>>
>> Thanks and Regards
>> Arpit Goyal
>> 8861094754
>>
>


Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Divij Vaidya
+ users@kafka

Hi users of Apache Kafka

With the upcoming 4.0 release, we have an opportunity to improve the
constraints and default values for various Kafka configurations.

We are soliciting your feedback and suggestions on configurations where the
default values and/or constraints should be adjusted. Please reply in this
thread directly.

--
Divij Vaidya
Apache Kafka PMC



On Wed, Mar 13, 2024 at 12:56 PM Divij Vaidya 
wrote:

> Thanks for the discussion folks. I have started a KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations
> to keep track of the changes that we are discussing. Please consider this
> as a collaborative work-in-progress KIP and once it is ready to be
> published, we can start a discussion thread on it.
>
> I am also going to start a thread to solicit feedback from users@ mailing
> list as well.
>
> --
> Divij Vaidya
>
>
>
> On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon <
> christopher.l.shan...@gmail.com> wrote:
>
>> I think it's a great idea to raise a KIP to look at adjusting defaults and
>> minimum/maximum config values for version 4.0.
>>
>> As pointed out, the minimum values for segment.ms and segment.bytes don't
>> make sense and would probably bring down a cluster pretty quickly if set
>> that low, so version 4.0 is a good time to fix them and to look at the
>> other configs for adjustments.
>>
>> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano
>>  wrote:
>>
>> > hey guys,
>> >
>> > Regarding num.recovery.threads.per.data.dir: I agree; in our company we
>> > use the number of vCPUs, as this is not competing with traffic from a
>> > ready cluster.
>> >
>> >
>> > On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:
>> >
>> > > Hi Divij,
>> > >
>> > > Thanks for raising this.
>> > > The valid minimum value 1 for `segment.ms` is completely unreasonable.
>> > > Similarly for `segment.bytes`, `metadata.log.segment.ms`,
>> > > `metadata.log.segment.bytes`.
>> > >
>> > > In addition to that, there are also some config default values we'd
>> > > like to propose to change in v4.0.
>> > > We can collect more comments from the community, and come out with a
>> > > KIP for them.
>> > >
>> > > 1. num.recovery.threads.per.data.dir:
>> > > The current default value is 1. But log recovery happens before
>> > > brokers are in the ready state, which means we should use all the
>> > > available resources to speed up log recovery and bring the broker to
>> > > the ready state sooner. Default value should be... maybe 4 (to be
>> > > decided)?
>> > >
>> > > 2. Other configs whose defaults we might consider changing, but open
>> > > for comments:
>> > >    2.1. num.replica.fetchers: the default is 1, but that's not enough
>> > > when there are multiple partitions in the cluster.
>> > >    2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
>> > > currently we set 100 KB as the default value, but that's not enough
>> > > for high-speed networks.
>> > >
>> > > Thank you.
>> > > Luke
>> > >
>> > >
>> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya > >
>> > > wrote:
>> > >
>> > > > Hey folks
>> > > >
>> > > > Before I file a KIP to change this in 4.0, I wanted to understand
>> > > > the historical context for the value of the following setting.
>> > > >
>> > > > Currently, the segment.ms minimum threshold is set to 1ms [1].
>> > > >
>> > > > Segments are expensive. Every segment uses multiple file descriptors,
>> > > > and it's easy to run out of OS limits when creating a large number of
>> > > > segments. A large number of segments also delays log loading on
>> > > > startup because of expensive operations such as iterating through all
>> > > > directories & conditionally loading all producer state.
>> > > >
>> > > > I am currently not aware of a reason why someone might want to work
>> > > > with a segment.ms of less than ~10s (an arbitrarily chosen number
>> > > > that looks sane).
>> > > >
>> > > > What was the historical context of setting the minimum threshold to
>> > > > 1ms for this setting?
>> > > >
>> > > > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
>> > > >
>> > > > --
>> > > > Divij Vaidya
>> > > >
>> > >
>> >
>>
>
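To make the proposed change concrete: topic-level configs are declared through ConfigDef, and the minimum for segment.ms is just a Range validator on that definition. A minimal sketch of what a raised floor could look like, assuming an illustrative 10-minute minimum that the KIP has not actually settled on:

import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

public class SegmentMsConstraintSketch {

    // Today the validator is atLeast(1), i.e. a 1 ms floor. Raising the floor
    // is a one-line change; 600000 ms (10 min) below is illustrative only.
    static final ConfigDef DEF = new ConfigDef()
            .define("segment.ms",
                    ConfigDef.Type.LONG,
                    7 * 24 * 60 * 60 * 1000L,   // current default: 7 days
                    atLeast(600_000L),          // hypothetical raised minimum
                    ConfigDef.Importance.MEDIUM,
                    "Segment roll time in milliseconds.");
}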


Re: [DISCUSS] KIP-1025: Optionally URL-encode clientID and clientSecret in authorization header

2024-03-13 Thread Nelson B.
Hi all,

I just wanted to bump up this thread.

The KIP introduces a really small change, and the PR is already prepared; it
is only waiting for this KIP to be approved before it can be merged.

Thanks,

On Wed, Mar 6, 2024 at 12:26 PM Nelson B.  wrote:

> Hi all,
>
> I would like to start a discussion on KIP-1025, which would optionally
> URL-encode clientID and clientSecret in the authorization header
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header
>
> Best,
> Nelson B.
>
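For context on what the KIP is encoding: RFC 6749 section 2.3.1 requires the client id and secret to be form-urlencoded before they are Base64-encoded into the HTTP Basic Authorization header, and some providers reject credentials that skip that step. A minimal sketch of the encoding in question (not the actual patch):

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class UrlEncodedBasicAuth {

    // Form-urlencode the id and secret (RFC 6749 s2.3.1), then Base64-encode
    // the "id:secret" pair for the Authorization header.
    static String authorizationHeader(String clientId, String clientSecret) {
        String id = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
        String secret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        String credentials = id + ":" + secret;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }
}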


Re: [VOTE] KIP-939: Support Participation in 2PC

2024-03-13 Thread Omnia Ibrahim
I had a look at the discussion thread and the KIP looks exciting.
+1 non-binding

Best
Omnia

On 1 Dec 2023, at 19:06, Artem Livshits 
wrote:

Hello,

This is a voting thread for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
.

The KIP proposes extending Kafka transaction support (which already uses 2PC
under the hood) to enable atomic dual writes to Kafka and an external
database, and it helps fix a long-standing Flink issue.

An example of code that uses the dual write recipe with JDBC and should
work for most SQL databases is here
https://github.com/apache/kafka/pull/14231.
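For readers new to the recipe: the producer prepares (but does not commit) a Kafka transaction, the commit decision is recorded atomically in the external database, and the Kafka side is then completed. A compressed sketch using the API names proposed in the KIP; prepareTransaction() is a KIP-939 proposal, not a released client method, and error/recovery handling is omitted:

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualWriteSketch {

    static void dualWrite(KafkaProducer<String, String> producer,
                          String jdbcUrl) throws Exception {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("events", "key", "value"));

        // Phase 1 (proposed API): flush and prepare the Kafka transaction
        // without committing it yet.
        var prepared = producer.prepareTransaction();  // proposed in KIP-939

        // Decision point: persist the business row (and, for crash recovery,
        // the prepared transaction state) in a single DB transaction.
        try (Connection db = DriverManager.getConnection(jdbcUrl)) {
            db.setAutoCommit(false);
            db.createStatement().executeUpdate(
                    "INSERT INTO outcomes VALUES ('order-1', 'accepted')");
            db.commit();
        }

        // Phase 2: the DB committed, so complete the Kafka side. On a crash
        // before this point, recovery reads the state from the DB and either
        // completes or aborts the prepared transaction.
        producer.commitTransaction();
    }
}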

The FLIP for the sister fix in Flink is here
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

-Artem


Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Divij Vaidya
Thanks for the discussion, folks. I have started a KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations
to keep track of the changes that we are discussing. Please consider this
as a collaborative work-in-progress KIP and once it is ready to be
published, we can start a discussion thread on it.

I am also going to start a thread to solicit feedback from the users@
mailing list as well.

--
Divij Vaidya



On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon <
christopher.l.shan...@gmail.com> wrote:

> I think it's a great idea to raise a KIP to look at adjusting defaults and
> minimum/maximum config values for version 4.0.
>
> As pointed out, the minimum values for segment.ms and segment.bytes don't
> make sense and would probably bring down a cluster pretty quickly if set
> that low, so version 4.0 is a good time to fix them and to look at the
> other configs for adjustments.
>
> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano
>  wrote:
>
> > hey guys,
> >
> > Regarding num.recovery.threads.per.data.dir: I agree; in our company we
> > use the number of vCPUs, as this is not competing with traffic from a
> > ready cluster.
> >
> >
> > On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:
> >
> > > Hi Divij,
> > >
> > > Thanks for raising this.
> > > The valid minimum value 1 for `segment.ms` is completely unreasonable.
> > > Similarly for `segment.bytes`, `metadata.log.segment.ms`,
> > > `metadata.log.segment.bytes`.
> > >
> > > In addition to that, there are also some config default values we'd
> > > like to propose to change in v4.0.
> > > We can collect more comments from the community, and come out with a
> > > KIP for them.
> > >
> > > 1. num.recovery.threads.per.data.dir:
> > > The current default value is 1. But log recovery happens before
> > > brokers are in the ready state, which means we should use all the
> > > available resources to speed up log recovery and bring the broker to
> > > the ready state sooner. Default value should be... maybe 4 (to be
> > > decided)?
> > >
> > > 2. Other configs whose defaults we might consider changing, but open
> > > for comments:
> > >    2.1. num.replica.fetchers: the default is 1, but that's not enough
> > > when there are multiple partitions in the cluster.
> > >    2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
> > > currently we set 100 KB as the default value, but that's not enough
> > > for high-speed networks.
> > >
> > > Thank you.
> > > Luke
> > >
> > >
> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya 
> > > wrote:
> > >
> > > > Hey folks
> > > >
> > > > Before I file a KIP to change this in 4.0, I wanted to understand the
> > > > historical context for the value of the following setting.
> > > >
> > > > Currently, the segment.ms minimum threshold is set to 1ms [1].
> > > >
> > > > Segments are expensive. Every segment uses multiple file descriptors,
> > > > and it's easy to run out of OS limits when creating a large number of
> > > > segments. A large number of segments also delays log loading on
> > > > startup because of expensive operations such as iterating through all
> > > > directories & conditionally loading all producer state.
> > > >
> > > > I am currently not aware of a reason why someone might want to work
> > > > with a segment.ms of less than ~10s (an arbitrarily chosen number
> > > > that looks sane).
> > > >
> > > > What was the historical context of setting the minimum threshold to
> > > > 1ms for this setting?
> > > >
> > > > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
> > > >
> > > > --
> > > > Divij Vaidya
> > > >
> > >
> >
>


Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Christopher Shannon
I think it's a great idea to raise a KIP to look at adjusting defaults and
minimum/maximum config values for version 4.0.

As pointed out, the minimum values for segment.ms and segment.bytes don't
make sense and would probably bring down a cluster pretty quickly if set
that low, so version 4.0 is a good time to fix them and to look at the
other configs for adjustments.

On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano
 wrote:

> hey guys,
>
> Regarding num.recovery.threads.per.data.dir: I agree; in our company we
> use the number of vCPUs, as this is not competing with traffic from a
> ready cluster.
>
>
> On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:
>
> > Hi Divij,
> >
> > Thanks for raising this.
> > The valid minimum value 1 for `segment.ms` is completely unreasonable.
> > Similarly for `segment.bytes`, `metadata.log.segment.ms`,
> > `metadata.log.segment.bytes`.
> >
> > In addition to that, there are also some config default values we'd like
> > to propose to change in v4.0.
> > We can collect more comments from the community, and come out with a KIP
> > for them.
> >
> > 1. num.recovery.threads.per.data.dir:
> > The current default value is 1. But log recovery happens before brokers
> > are in the ready state, which means we should use all the available
> > resources to speed up log recovery and bring the broker to the ready
> > state sooner. Default value should be... maybe 4 (to be decided)?
> >
> > 2. Other configs whose defaults we might consider changing, but open
> > for comments:
> >    2.1. num.replica.fetchers: the default is 1, but that's not enough
> > when there are multiple partitions in the cluster.
> >    2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
> > currently we set 100 KB as the default value, but that's not enough
> > for high-speed networks.
> >
> > Thank you.
> > Luke
> >
> >
> > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya 
> > wrote:
> >
> > > Hey folks
> > >
> > > Before I file a KIP to change this in 4.0, I wanted to understand the
> > > historical context for the value of the following setting.
> > >
> > > Currently, the segment.ms minimum threshold is set to 1ms [1].
> > >
> > > Segments are expensive. Every segment uses multiple file descriptors,
> > > and it's easy to run out of OS limits when creating a large number of
> > > segments. A large number of segments also delays log loading on startup
> > > because of expensive operations such as iterating through all
> > > directories & conditionally loading all producer state.
> > >
> > > I am currently not aware of a reason why someone might want to work
> > > with a segment.ms of less than ~10s (an arbitrarily chosen number that
> > > looks sane).
> > >
> > > What was the historical context of setting the minimum threshold to
> > > 1ms for this setting?
> > >
> > > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
> > >
> > > --
> > > Divij Vaidya
> > >
> >
>


[jira] [Created] (KAFKA-16368) Change constraints and default values for various configurations

2024-03-13 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-16368:


 Summary: Change constraints and default values for various 
configurations
 Key: KAFKA-16368
 URL: https://issues.apache.org/jira/browse/KAFKA-16368
 Project: Kafka
  Issue Type: Improvement
Reporter: Divij Vaidya
 Fix For: 4.0.0


This Jira is a parent item to track all the defaults and/or constraints that we 
would like to change with Kafka 4.0. This Jira will be associated with a KIP.

Currently, we are gathering feedback from the community on the configurations 
that don't have sane defaults.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Sergio Daniel Troiano
hey guys,

Regarding num.recovery.threads.per.data.dir: I agree; in our company we
use the number of vCPUs, as this is not competing with traffic from a
ready cluster.


On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:

> Hi Divij,
>
> Thanks for raising this.
> The valid minimum value 1 for `segment.ms` is completely unreasonable.
> Similarly for `segment.bytes`, `metadata.log.segment.ms`,
> `metadata.log.segment.bytes`.
>
> In addition to that, there are also some config default values we'd like to
> propose to change in v4.0.
> We can collect more comments from the community, and come out with a KIP
> for them.
>
> 1. num.recovery.threads.per.data.dir:
> The current default value is 1. But log recovery happens before brokers
> are in the ready state, which means we should use all the available
> resources to speed up log recovery and bring the broker to the ready
> state sooner. Default value should be... maybe 4 (to be decided)?
>
> 2. Other configs whose defaults we might consider changing, but open
> for comments:
>    2.1. num.replica.fetchers: the default is 1, but that's not enough
> when there are multiple partitions in the cluster.
>    2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
> currently we set 100 KB as the default value, but that's not enough
> for high-speed networks.
>
> Thank you.
> Luke
>
>
> On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya 
> wrote:
>
> > Hey folks
> >
> > Before I file a KIP to change this in 4.0, I wanted to understand the
> > historical context for the value of the following setting.
> >
> > Currently, the segment.ms minimum threshold is set to 1ms [1].
> >
> > Segments are expensive. Every segment uses multiple file descriptors,
> > and it's easy to run out of OS limits when creating a large number of
> > segments. A large number of segments also delays log loading on startup
> > because of expensive operations such as iterating through all directories
> > & conditionally loading all producer state.
> >
> > I am currently not aware of a reason why someone might want to work
> > with a segment.ms of less than ~10s (an arbitrarily chosen number that
> > looks sane).
> >
> > What was the historical context of setting the minimum threshold to
> > 1ms for this setting?
> >
> > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
> >
> > --
> > Divij Vaidya
> >
>


Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Luke Chen
Hi Divij,

Thanks for raising this.
The valid minimum value 1 for `segment.ms` is completely unreasonable.
Similarly for `segment.bytes`, `metadata.log.segment.ms`,
`metadata.log.segment.bytes`.

In addition to that, there are also some config default values we'd like to
propose to change in v4.0.
We can collect more comments from the community, and come out with a KIP
for them.

1. num.recovery.threads.per.data.dir:
The current default value is 1. But log recovery happens before brokers are
in the ready state, which means we should use all the available resources to
speed up log recovery and bring the broker to the ready state sooner. Default
value should be... maybe 4 (to be decided)?

2. Other configs whose defaults we might consider changing, but open for
comments (a sample server.properties override is sketched after this
message):
   2.1. num.replica.fetchers: the default is 1, but that's not enough when
there are multiple partitions in the cluster.
   2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
currently we set 100 KB as the default value, but that's not enough for
high-speed networks.

Thank you.
Luke


On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya 
wrote:

> Hey folks
>
> Before I file a KIP to change this in 4.0, I wanted to understand the
> historical context for the value of the following setting.
>
> Currently, the segment.ms minimum threshold is set to 1ms [1].
>
> Segments are expensive. Every segment uses multiple file descriptors and
> it's easy to run out of OS limits when creating a large number of segments.
> A large number of segments also delays log loading on startup because of
> expensive operations such as iterating through all directories &
> conditionally loading all producer state.
>
> I am currently not aware of a reason why someone might want to work with a
> segment.ms of less than ~10s (an arbitrarily chosen number that looks sane).
>
> What was the historical context of setting the minimum threshold to 1ms for
> this setting?
>
> [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
>
> --
> Divij Vaidya
>
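As a concrete reference for the defaults discussed above, this is the kind of server.properties override operators apply today. The values are illustrative assumptions for an 8-vCPU broker on a fast network, not recommendations from this thread:

# Illustrative overrides only -- tune for your own hardware.
# Recovery runs before the broker serves traffic, so match the vCPU count.
num.recovery.threads.per.data.dir=8
# More fetcher threads help clusters with many partitions.
num.replica.fetchers=4
# The defaults are 102400 (100 KB), which can starve high-bandwidth links.
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576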


Re: [VOTE] KIP-974: Docker Image for GraalVM based Native Kafka Broker

2024-03-13 Thread Krishna Agarwal
Hi all,

Thanks for participating in the discussion and voting! KIP-974 has been
accepted with the following +1 votes:

- Justine Olshan (binding)
- Ismael Juma (binding)
- Manikumar (binding)
- Federico Valeri (non-binding)

Regards,
Krishna



Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Sophie Blee-Goldman
>
>  I see way too many food-guns and complications that can be introduced.


What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
don't think that's what you meant hahaha

I don't feel super strongly one way or another, but I have a few questions
& corrections about some of these complaints/concerns:

If one task
> pauses but others keep running, we keep advancing stream-time downstream,
> and thus when the task would resume later, there is a very high
> probability that records are dropped as the window has already closed.

Just to make sure I/everyone understand what you're getting at here, you
would be referring to the case of a stateful operation downstream of a
key-changing operation which is in turn downstream of the "paused" task --
i.e., with a repartition separating the paused task and the task with a
windowed aggregation? Each task has its own view of stream-time (technically
each processor within a task), so the only way that delaying one task and not
another would affect which records get dropped is if those two tasks are
rekeyed and the repartitioning results in their outputs being mixed -- yes?

Anyways I think you make a good case for why pausing a single task -- or
even an entire instance if others are allowed to continue running -- might
make it too easy for users to shoot themselves in the foot without
understanding the full ramifications. Of course, there are already a million
ways for users to screw up their app if configured or operated incorrectly,
and we shouldn't necessarily kill a feature just because some people might
use it when they shouldn't. Why can't we just document that this feature
should not be used with applications that include time-sensitive operators?

I also feel like you dismissed the "skip record case" somewhat too easily:

> For the "skip record case", it's also not possible to skip over an
> offset from outside while the application is running


True, you can't advance the offset from outside the app, but I don't see why
you would want to, much less why you should need to for this to work.
Surely the best way to implement this case would just be for the #resume
API to behave, and work, exactly the same as the handler's CONTINUE
option? In other words, opting for the PAUSE option would simply stall the
task, and upon #resume it would just be discarding that record and then
continuing on with processing (or even committing the offset immediately
after it, perhaps even asynchronously, since it presumably doesn't matter if
it doesn't succeed and the record is picked up again by accident -- as long
as that doesn't happen repeatedly in an infinite loop, which I don't see why
it would.)
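To ground the comparison: this is roughly what the existing CONTINUE path looks like from the user side; a minimal sketch against the current handler interface (the KIP would add a pause-style response alongside these, and none of this is the KIP's actual code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.Map;

public class SkipAndLogHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Log the poison record and keep the task running; a PAUSE-style
        // response would instead stall the task here until an explicit #resume.
        System.err.printf("Skipping corrupt record at %s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}

It is registered via the default.deserialization.exception.handler config in Streams.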

On the subject of committing...

Other questions: if a task would be paused, would we commit the current
> offset? What happens if we re-balance? Would we just lose the "pause"
> state, and hit the same error again and just pause again?


I was imagining that we would either just wait without committing, or
perhaps even commit everything up to -- but not including -- the "bad"
record when PAUSE is triggered. Again, if we rebalance and "lose the pause"
then we'll just attempt to process it again, fail, and end up back in PAUSE.
This is no different than how successful processing works, no? Who cares if
a rebalance happens to strike and causes it to be PAUSED again?

All in all, I feel like these concerns are all essentially "true", but to
me they just seem like implementation or design decisions, and none of them
strikes me as posing an unsolvable problem for this feature. But maybe I'm
just lacking in imagination...

Thoughts?


On Fri, Mar 8, 2024 at 5:30 PM Matthias J. Sax  wrote:

> Hey Nick,
>
> I am sorry that I have to say that I am not a fan of this KIP. I see way
> too many food-guns and complications that can be introduced.
>
> I am also not sure I understand the motivation. You say CONTINUE and
> FAIL are not good enough, but don't describe in detail why. If we
> understand the actual problem better, it might also become clear how
> task-pausing would help to address the problem.
>
>
> The main problem I see, as already mentioned by Sophie, is about time
> synchronization. However, it's not limited to joins, but affects all
> time-based operations, i.e., also all windowed aggregations. If one task
> pauses but others keep running, we keep advancing stream-time downstream,
> and thus when the task would resume later, there is a very high
> probability that records are dropped as the window has already closed.
>
> For the runtime itself, we also cannot really do a cascading downstream
> pause, because the runtime does not know anything about the semantics of
> operators. We don't know if we execute a DSL operator or a PAPI
> operator. (We could maybe track all downstream tasks independent of
> semantics, but in the end it might just imply we could also just pause
> all tasks...)
>
> For the "skip record case", it's also not possible to skip over an
> offset from outside