Re: [DISCUSS] Minimum constraint for segment.ms
One use case I see for setting the `segment.bytes` to 1 is to delete all the records from the topic. We can mention it in the docs and suggest using the `kafka-delete-records` API instead. On Wed, Mar 13, 2024 at 6:59 PM Divij Vaidya wrote: > + users@kafka > > Hi users of Apache Kafka > > With the upcoming 4.0 release, we have an opportunity to improve the > constraints and default values for various Kafka configurations. > > We are soliciting your feedback and suggestions on configurations where the > default values and/or constraints should be adjusted. Please reply in this > thread directly. > > -- > Divij Vaidya > Apache Kafka PMC > > > > On Wed, Mar 13, 2024 at 12:56 PM Divij Vaidya > wrote: > > > Thanks for the discussion folks. I have started a KIP > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations > > to keep track of the changes that we are discussing. Please consider this > > as a collaborative work-in-progress KIP and once it is ready to be > > published, we can start a discussion thread on it. > > > > I am also going to start a thread to solicit feedback from users@ > mailing > > list as well. > > > > -- > > Divij Vaidya > > > > > > > > On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon < > > christopher.l.shan...@gmail.com> wrote: > > > >> I think it's a great idea to raise a KIP to look at adjusting defaults > and > >> minimum/maximum config values for version 4.0. > >> > >> As pointed out, the minimum values for segment.ms and segment.bytes > don't > >> make sense and would probably bring down a cluster pretty quickly if set > >> that low, so version 4.0 is a good time to fix it and to also look at > the > >> other configs as well for adjustments.
> >> > >> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano > >> wrote: > >> > >> > hey guys, > >> > > >> > Regarding num.recovery.threads.per.data.dir: I agree, in our > company > >> we > >> > use the number of vCPUs to do so as this is not competing with ready > >> > cluster traffic. > >> > > >> > > >> > On Wed, 13 Mar 2024 at 09:29, Luke Chen wrote: > >> > > >> > > Hi Divij, > >> > > > >> > > Thanks for raising this. > >> > > The valid minimum value 1 for `segment.ms` is completely > >> unreasonable. > >> > > Similarly for `segment.bytes`, `metadata.log.segment.ms`, > >> > > `metadata.log.segment.bytes`. > >> > > > >> > > In addition to that, there are also some config default values we'd > >> like > >> > to > >> > > propose to change in v4.0. > >> > > We can collect more comments from the community, and come out with a > >> KIP > >> > > for them. > >> > > > >> > > 1. num.recovery.threads.per.data.dir: > >> > > The current default value is 1. But the log recovery is happening > >> before > >> > > brokers are in ready state, which means, we should use all the > >> available > >> > > resources to speed up the log recovery to bring the broker to ready > >> state > >> > > soon. Default value should be... maybe 4 (to be decided)? > >> > > > >> > > 2. Other configs we might consider changing the default for, > but > >> > open > >> > > for comments: > >> > >2.1. num.replica.fetchers: default is 1, but that's not enough > when > >> > > there are multiple partitions in the cluster > >> > >2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`: > >> > > Currently, we set 100kb as default value, but that's not enough for > >> > > high-speed networks. > >> > > > >> > > Thank you.
> >> > > Luke > >> > > > >> > > > >> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya < > divijvaidy...@gmail.com > >> > > >> > > wrote: > >> > > > >> > > > Hey folks > >> > > > > >> > > > Before I file a KIP to change this in 4.0, I wanted to understand > >> the > >> > > > historical context for the value of the following setting. > >> > > > > >> > > > Currently, segment.ms minimum threshold is set to 1ms [1]. > >> > > > > >> > > > Segments are expensive. Every segment uses multiple file > descriptors > >> > and > >> > > > it's easy to run out of OS limits when creating a large number of > >> > > segments. > >> > > > Large number of segments also delays log loading on startup > because > >> of > >> > > > expensive operations such as iterating through all directories & > >> > > > conditionally loading all producer state. > >> > > > > >> > > > I am currently not aware of a reason as to why someone might want > to > >> > work > >> > > > with a segment.ms of less than ~10s (number chosen arbitrary that > >> > looks > >> > > > sane) > >> > > > > >> > > > What was the historical context of setting the minimum threshold > to > >> 1ms > >> > > for > >> > > > this setting? > >> > > > > >> > > > [1] > >> > https://kafka.apache.org/documentation.html#topicconfigs_segment.ms > >> > > > > >> > > > -- > >> > > > Divij Vaidya > >> > > > > >> > > > >> > > >> > > >
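For the `segment.bytes=1` use case mentioned at the top of this thread, `kafka-delete-records` is the intended alternative. A rough sketch (topic name, partition, and bootstrap server are placeholders; the command itself is only echoed here, since it needs a running cluster):

```shell
# Sketch: purge a topic with the kafka-delete-records tool instead of
# setting segment.bytes=1. An offset of -1 asks the broker to delete up
# to the partition's high watermark, i.e. all current records.
cat > /tmp/delete-records.json <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "my-topic", "partition": 0, "offset": -1}
  ]
}
EOF
# The actual invocation (echoed only; requires a running cluster):
echo bin/kafka-delete-records.sh --bootstrap-server localhost:9092 \
  --offset-json-file /tmp/delete-records.json
```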
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
About your last two points: I completely agree that we should try to make this independent of RocksDB, and should probably adopt a general philosophy of being store-implementation agnostic unless there is good reason to focus on a particular store type: eg if it was only possible to implement for certain stores, or only made sense in the context of a certain store type but not necessarily stores in general. While leaking memory due to unclosed iterators on RocksDB stores is certainly the most common issue, I think Matthias sufficiently demonstrated that the problem of leaking iterators is not actually unique to RocksDB, and we should consider including in-memory stores at the very least. I also think that at this point, we may as well just implement the metrics for *all* store types, whether rocksdb or in-memory or custom. Not just because it probably applies to all store types (leaking iterators are rarely a good thing!) but because I imagine the best way to implement this would be to do so at the high-level iterator rather than implementing it separately for each specific iterator implementation for every store type. That said, I haven't thought all that carefully about the implementation yet -- it just strikes me as easiest to do at the top level of the store hierarchy rather than at the bottom. My gut instinct may very well be wrong, but that's what it's saying On Thu, Mar 7, 2024 at 10:43 AM Matthias J. Sax wrote: > Seems I am late to this party. Can we pick this up again aiming for 3.8 > release? I think it would be a great addition. Few comments: > > > - I think it does make sense to report `iterator-duration-avg` and > `iterator-duration-max` for all *closed* iterators -- it just seems to > be a useful metric (wondering if this would be _overall_ or bounded to > some time window?) 
> > - About the duration iterators are currently open, I believe the only > useful way is to report the "oldest iterator", ie, the smallest iterator > open-time, of all currently open iterators? We all agree that in general, > a leaking iterator would bump the count metric, and if there are a few > which are not closed and open for a long time, it seems sufficient to > detect the single oldest one for alerting purposes? > > - What I don't like about the KIP is its focus on RocksDB. I don't think > we should build on the internal RocksDB counters as proposed (I guess, > we could still expose them, similar to other RocksDB metrics which we > expose already). However, for this new metric, we should track it > ourselves and thus make it independent of RocksDB -- in the end, an > in-memory store could also leak memory (and kill a JVM with an > out-of-memory error) and we should be able to track it. > > - Not sure if we would like to add support for custom stores, to allow > them to register their iterators with this metric? Or would this not be > necessary, because custom stores could just register a custom metric > about it to begin with? > > > > -Matthias > > On 10/25/23 4:41 PM, Sophie Blee-Goldman wrote: > >> > >> If we used "iterator-duration-max", for > >> example, would it not be confusing that it includes Iterators that are > >> still open, and therefore the duration is not yet known? > > > > > > 1. Ah, I think I understand your concern better now -- I totally agree > that > > a > > "iterator-duration-max" metric would be confusing/misleading. I was > > thinking about it a bit differently, something more akin to the > > "last-rebalance-seconds-ago" consumer metric. As the name suggests, > > that basically just tracks how long the consumer has gone without > > rebalancing -- it doesn't purport to represent the actual duration > between > > rebalances, just the current time since the last one.
The hard part is > > really > > in choosing a name that reflects this -- maybe you have some better ideas > > but off the top of my head, perhaps something like > "iterator-lifetime-max"? > > > > 2. I'm not quite sure how to interpret the "iterator-duration-total" > metric > > -- what exactly does it mean to add up all the iterator durations? For > > some context, while this is not a hard-and-fast rule, in general you'll > > find that Kafka/Streams metrics tend to come in pairs of avg/max or > > rate/total. Something that you might measure the avg for usually is > > also useful to measure the max, whereas a total metric is probably > > also useful as a rate but not so much as an avg. I actually think this > > is part of why it feels like it makes so much sense to include a "max" > > version of this metric, as Lucas suggested, even if the name of > > "iterator-duration-max" feels misleading. Ultimately the metric names > > are up to you, but for this reason, I would personally advocate for > > just going with an "iterator-duration-avg" and "iterator-duration-max" > > > > I did see your example in which you mention one could monitor the > > rate of change of the "-total" metric. While this does make sense to > > me, if the
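The metrics being discussed in this thread can be made concrete with a toy sketch (class and method names here are invented for illustration, not the actual Kafka Streams implementation): track all currently open iterators for the open-count and "oldest iterator" gauges, and record durations on close for the avg/max metrics.

```python
import time

class MeteredIterator:
    """Toy wrapper tracking iterator lifetimes (illustrative sketch only)."""
    open_iterators = set()    # gauge: all currently open iterators
    closed_durations = []     # samples for iterator-duration-avg/max

    def __init__(self, inner):
        self.inner = iter(inner)
        self.opened_at = time.monotonic()
        MeteredIterator.open_iterators.add(self)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.inner)

    def close(self):
        MeteredIterator.open_iterators.discard(self)
        MeteredIterator.closed_durations.append(time.monotonic() - self.opened_at)

    @classmethod
    def oldest_open_iterator_age(cls):
        # the "oldest iterator" gauge: age of the longest-open iterator
        if not cls.open_iterators:
            return 0.0
        return time.monotonic() - min(it.opened_at for it in cls.open_iterators)

    @classmethod
    def duration_avg_max(cls):
        d = cls.closed_durations
        return (sum(d) / len(d), max(d)) if d else (0.0, 0.0)

leaked = MeteredIterator([1, 2, 3])   # never closed: a "leak"
it = MeteredIterator([4, 5])
list(it)
it.close()
print(len(MeteredIterator.open_iterators))  # 1 -> the leak shows up
```

Note how the oldest-open-iterator gauge makes a leak visible even when the duration avg/max metrics, which only see closed iterators, look healthy.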
Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
> > Well, the KIP mentions the ability to either re-try the record (eg, > after applying some external fix that would allow Kafka Streams to now > deserialize the record) or to skip it by advancing the offset. That's fair -- you're definitely right that what's described in the KIP document right now would not be practical. I just wanted to clarify that this doesn't mean the feature as a whole is impractical, but certainly we'd want to update the proposal to remove the line about resetting offsets via external tool and come up with a more concrete approach, and perhaps describe it in more detail. That's probably not worth getting into until/unless we decide whether to go forward with this feature in the first place. I'll let Nick reflect on the motivation and your other comments and then decide whether he still wants to pursue it. To Nick: if you want to go through with this KIP and can expand on the motivation so that we understand it better, I'd be happy to help work out the details. For now I'll just wait for your decision. On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax wrote: > Yes, about the "drop records" case. It's a very common scenario to have > a repartition step before a windowed aggregation or a join with > grace-period. > > > About "add feature vs guard users": it's always a tricky question and > tradeoff. For this particular KIP, I personally think we should opt to > not add the feature but guard the users, as I don't see too much value > compared to the complexity and "traps" it adds. -- It's of course just > my personal opinion, and if there is an ask from many users to add this > feature, I would not push back further. As mentioned in my previous > reply, I don't fully understand the motivation yet; maybe Nick can > provide more context on it.
> > > > In other words, opting for the PAUSE option would simply stall the > > task, and upon #resume it would just be discarding that record and then > > continuing on with processing > > Well, the KIP mentions the ability to either re-try the record (eg, > after applying some external fix that would allow Kafka Streams to now > deserialize the record) or to skip it by advancing the offset. But > to do this, we need to extend the `resume()` callback to pass in this > information, making the whole setup and usage of this feature more > complex, as one needs to do more upfront instrumentation of their custom > code. -- It's just a technical thing we need to consider if we want to > move forward, and the KIP should not say "advancing the consumer > offsets, either via an external tool" because this cannot work. Just > pointing out an incorrect technical assumption, not disregarding that it > can be done. > > > About committing: yes, I agree to all what you say, and again it was not > meant as a concern, but just as honest questions about some technical > details. I think it would be good to consider these trade-offs and > explain in the KIP why we want to do what we do. That's all. > > > > -Matthias > > On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote: > >> > >> I see way too many food-guns and complications that can be introduced. > > > > > > What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I > > don't think that's what you meant hahaha > > > > I don't feel super strongly one way or another, but I have a few > questions > > & corrections about some of these complaints/concerns: > > > > If one task > >> pauses but others keep running, we keep advancing stream-time downstream, > >> and thus when the task would resume later, there is a very high > >> probability that records are dropped as window got already closed.
> > > > Just to make sure I/everyone understand what you're getting at here, you > > would be > > referring to the case of a stateful operation downstream of a > key-changing > > operation > > which is in turn downstream of the "paused" task -- ie with a > repartition > > separating > > the paused task and the task with a windowed aggregation? Each task has > its > > own > > view of stream-time (technically each processor within a task) so the > only > > way that > > delaying one task and not another would affect which records get dropped > is > > if those > > two tasks are rekeyed and the repartitioning results in their outputs > being > > mixed -- yes? > > > > Anyways I think you make a good case for why pausing a single task -- or > > even an entire > > instance if others are allowed to continue running -- might make it too > > easy for users to > > shoot themselves in the foot without understanding the full > ramifications. > > Of course, there > > are already a million ways for users to screw up their app if configured > or > > operated incorrectly, > > and we shouldn't necessarily kill a feature just because some people > might > > use it when they > > shouldn't. Why can't we just document that this feature should not be > used > > with applications > > that include time-sensitive operators? > > > > I also
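The window-closing concern discussed above can be made concrete with a toy simulation (the constants, record layout, and drop rule are simplified stand-ins for the real grace-period logic): a windowed aggregator closes windows based on observed stream-time, so a record from a paused task that arrives after stream-time has advanced far ahead is simply discarded.

```python
# Toy model: tumbling windows of size 10 with a grace period of 5.
WINDOW, GRACE = 10, 5

def aggregate(records):
    """Aggregate (timestamp, key, value) records, dropping late ones."""
    stream_time = 0            # highest timestamp observed so far
    windows, dropped = {}, []
    for ts, key, value in records:
        stream_time = max(stream_time, ts)
        window_start = ts - ts % WINDOW
        if window_start + WINDOW + GRACE <= stream_time:
            dropped.append((ts, key))   # window already closed
            continue
        windows.setdefault((window_start, key), []).append(value)
    return windows, dropped

# Key "a" keeps flowing (timestamps advance to 40); key "b" came from a
# "paused" task, so its ts=3 record only arrives after stream-time is 40.
records = [(3, "a", 1), (12, "a", 2), (40, "a", 3), (3, "b", 4)]
_, dropped = aggregate(records)
print(dropped)  # [(3, 'b')]
```

The early record for key "a" at ts=3 is kept because stream-time had not advanced yet, while the identical timestamp from the delayed task is dropped, which is exactly the hazard of pausing only some tasks.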
Re: [DISCUSS] KIP-1020: Deprecate `window.size.ms` in `StreamsConfig`
By "don't add them" do you just mean we would not have any actual variables defined anywhere for these configs (eg WINDOW_SIZE_MS) and simply document -- somewhere -- that one can use the string "window.size.ms" when configuring a command-line client with a windowed serde? Or something else? I assume you aren't proposing to remove the ability to use and understand this config from the implementations themselves, but correct me if that's wrong. Are there any other configs in similar situations that we could look to for precedent? I personally am not aware of any but by definition I suppose these would be hard to discover unless you were actively looking for them, so I'm wondering if there might be other "shadow configs" elsewhere in the code base. If these are truly the first/only of their kind, I would vote to just stick them in the appropriate class. As for which class to put them in, I think I'm convinced that "window.size.ms" should only go in the TimeWindowedDeserializer rather than sticking them both in the TimeWindowedSerde as I originally suggested. However, I would even go a step further and not place the "inner.window.class.serde" in the TimeWindowedSerde class either. To me, it actually makes the most sense to define it in both the TimeWindowedSerializer and the TimeWindowedDeserializer. The reason being that, as discussed above, the only use case for these configs would be in the console consumer/producer which only uses the Serializer or Deserializer, and would never actually be used by/in Streams where we use the Serde version. And while defining the "inner.window.class.serde" in two places might seem redundant, this would mean that all the configs needed to properly configure the specific class being used by the particular kind of consumer client -- that is, Deserializer for a console consumer and Serializer for a console producer -- would be located in that exact class. 
I assume this would make them much easier to discover and be used than having to search for configs defined in classes you don't even need for the console client, like the Serde form. Just my two cents -- happy to hear other opinions on this On Mon, Mar 11, 2024 at 6:58 PM Matthias J. Sax wrote: > Yes, it's used inside `TimeWindowedSerializer` and actually also inside > `TimeWindowDeserializer`. > > However, it does IMHO not change that we should remove it from > `StreamsConfig` because both configs are not intended to be used in Java > code... If one writes Java code, they should use > >new TimeWindowedSerializer(Serializer) >new TimeWindowDeserializer(Deserializer, Long) >new TimeWindowSerdes(Serde, Long) > > and thus they don't need either config. > > The configs are only needed for command line tools, which create the > (de)serializer via reflection using the default constructor. > > Does this make sense? > > > > The only open question is really, if and where to add them... Strictly > speaking, we don't need either config as a public variable as nobody > should use them in Java code. To me, it just feels right/better to make > them public for documentation purposes that these configs exist? > > `inner.window.class.serde` has "serde" in its name, so we could add it > to `TimeWindowSerdes`? For `window.size.ms`, it's only used by the > deserializer, so maybe add it there? Just some ideas. -- Or we sidestep > this question and just don't add them; also fine with me. > > > -Matthias > > On 3/11/24 10:53 AM, Lucia Cerchie wrote: > > PS-- I was re-reading the PR that originated this discussion and realized > > that `window.inner.serde.class` is used here > > < > https://github.com/a0x8o/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java#L44 > > > > in KStreams. This goes against removing it, yes?
> > > > On Mon, Mar 11, 2024 at 10:40 AM Lucia Cerchie > > wrote: > > > >> Sophie, I'll add a paragraph about removing `windowed.inner.serde.class` > >> to the KIP. I'll also add putting it in the `TimeWindowedSerde` class > with > >> some add'tl guidance on the docs addition. > >> > >> Also, I double-checked setting window.size.ms on the client and it > >> doesn't throw an error at all, in response to Matthias's question. > Changing > >> the KIP in response to that. > >> > >> On Sun, Mar 10, 2024 at 6:04 PM Sophie Blee-Goldman < > sop...@responsive.dev> > >> wrote: > >> > >>> Thanks for responding Matthias -- you got there first, but I was going > to > >>> say exactly the same thing as in your most reply. In other words, I see > >>> the > >>> `windowed.inner.serde.class` as being in the same boat as the ` > >>> window.size.ms` config, so whatever we do with one we should do for > the > >>> other. > >>> > >>> I do agree with removing these from StreamsConfig, but defining them in > >>> ConsumerConfig feels weird as well. There's really no great answer > here. > >>> > >>> My only concern about adding it to the corresponding > >>> serde/serializer/deserializer class is
Re: [DISCUSS] Apache Kafka 3.6.2 release
+1, Thanks Mani for volunteering. On Thu, 14 Mar 2024 at 06:01, Luke Chen wrote: > > +1, Thanks Manikumar! > > On Thu, Mar 14, 2024 at 3:40 AM Bruno Cadonna wrote: > > > Thanks Manikumar! > > > > +1 > > > > Best, > > Bruno > > > > On 3/13/24 5:56 PM, Josep Prat wrote: > > > +1 thanks for volunteering! > > > > > > Best > > > --- > > > > > > Josep Prat > > > Open Source Engineering Director, aivenjosep.p...@aiven.io | > > > +491715557497 | aiven.io > > > Aiven Deutschland GmbH > > > Alexanderufer 3-7, 10117 Berlin > > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen > > > Amtsgericht Charlottenburg, HRB 209739 B > > > > > > On Wed, Mar 13, 2024, 17:17 Divij Vaidya > > wrote: > > > > > >> +1 > > >> > > >> Thank you for volunteering. > > >> > > >> -- > > >> Divij Vaidya > > >> > > >> > > >> > > >> On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan > > >> > > >> wrote: > > >> > > >>> Thanks Manikumar! > > >>> +1 from me > > >>> > > >>> Justine > > >>> > > >>> On Wed, Mar 13, 2024 at 8:52 AM Manikumar > > >>> wrote: > > >>> > > Hi, > > > > I'd like to volunteer to be the release manager for a bug fix release > > >> of > > the 3.6 line. > > If there are no objections, I'll send out the release plan soon. > > > > Thanks, > > Manikumar > > > > >>> > > >> > > > > >
Re: [DISCUSS] Apache Kafka 3.6.2 release
+1, Thanks Manikumar! On Thu, Mar 14, 2024 at 3:40 AM Bruno Cadonna wrote: > Thanks Manikumar! > > +1 > > Best, > Bruno > > On 3/13/24 5:56 PM, Josep Prat wrote: > > +1 thanks for volunteering! > > > > Best > > --- > > > > Josep Prat > > Open Source Engineering Director, aivenjosep.p...@aiven.io | > > +491715557497 | aiven.io > > Aiven Deutschland GmbH > > Alexanderufer 3-7, 10117 Berlin > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen > > Amtsgericht Charlottenburg, HRB 209739 B > > > > On Wed, Mar 13, 2024, 17:17 Divij Vaidya > wrote: > > > >> +1 > >> > >> Thank you for volunteering. > >> > >> -- > >> Divij Vaidya > >> > >> > >> > >> On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan > >> > >> wrote: > >> > >>> Thanks Manikumar! > >>> +1 from me > >>> > >>> Justine > >>> > >>> On Wed, Mar 13, 2024 at 8:52 AM Manikumar > >>> wrote: > >>> > Hi, > > I'd like to volunteer to be the release manager for a bug fix release > >> of > the 3.6 line. > If there are no objections, I'll send out the release plan soon. > > Thanks, > Manikumar > > >>> > >> > > >
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
Thanks for the feedback Bruno, Matthias, and Lucas! There is a decent amount but I'm going to try and just hit the major points as I would like to keep this change simple. I've made corrections for the mistakes pointed out. Thanks for the suggestions everyone. The main sticking point seems to be with the method of signalling the restore behavior. It seems we can all agree with how the API should look with the default option we are adding. I think keeping the option to load directly from the topic into the store is a good idea. It is much more performant and could make a simple metric collector processor much simpler. I think something that Matthias said about creating a special class of processors for the global stores helps me think about the issue. I tend to fall into the category that we should keep global stores open to the possibility of having child nodes in the future. I don't really see the downside of having that as an option. It might not be best for a lot of cases, but something simple could be very useful to put in the PAPI. I like the idea of having a `GlobalStoreParameters` but only if we decide to make the processor need to extend an interface like `GlobalStoreProcessor`. If not, that seems excessive. As of right now I don't see a better option than having a boolean flag for the reprocessOnRestore option. I expanded the description in the docs so I hope that helps. I am more than willing to take other ideas on it. thanks, Walker
Re: [DISCUSS] Apache Kafka 3.6.2 release
Thanks Manikumar! +1 Best, Bruno On 3/13/24 5:56 PM, Josep Prat wrote: +1 thanks for volunteering! Best --- Josep Prat Open Source Engineering Director, aivenjosep.p...@aiven.io | +491715557497 | aiven.io Aiven Deutschland GmbH Alexanderufer 3-7, 10117 Berlin Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen Amtsgericht Charlottenburg, HRB 209739 B On Wed, Mar 13, 2024, 17:17 Divij Vaidya wrote: +1 Thank you for volunteering. -- Divij Vaidya On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan wrote: Thanks Manikumar! +1 from me Justine On Wed, Mar 13, 2024 at 8:52 AM Manikumar wrote: Hi, I'd like to volunteer to be the release manager for a bug fix release of the 3.6 line. If there are no objections, I'll send out the release plan soon. Thanks, Manikumar
Dynamic Metrics Reporter Producer reload
Hello, We currently use short-lived TLS certs for our CruiseControlMetricsReporter, which is creating issues: when the cert reloads, the producer client fails to restart as well. A rolling restart resolves this, but I was hoping to find out if there is a dynamic way to reload these producers, or any client really, in a similar pattern to what we do with brokers. Github issue on the cruise control repo for another reference: https://github.com/linkedin/cruise-control/issues/1148 Kenneth Eversole
Re: [DISCUSS] Personal branches under apache/kafka
+1 Should be fine to just delete all of them (as long as nobody raises objections). Not sure if we could enable some protection on the GitHub side that disallows pushing to non-existing branches and thus avoids accidental branch creation? -Matthias On 3/13/24 11:39 AM, Josep Prat wrote: Hi Michael, I think it's a good idea. Only "official" branches should exist in the upstream repo. I guess the only exception would be if a massive feature would be done by different individuals collaborating and they would need a "neutral" place for the branch to be. But this didn't happen yet and I doubt it will in the near future. Best, --- Josep Prat Open Source Engineering Director, aivenjosep.p...@aiven.io | +491715557497 | aiven.io Aiven Deutschland GmbH Alexanderufer 3-7, 10117 Berlin Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen Amtsgericht Charlottenburg, HRB 209739 B On Wed, Mar 13, 2024, 19:27 José Armando García Sancio wrote: On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison wrote: What do you think? I agree. I wouldn't be surprised if these branches (not trunk or release branches) were created by mistake by the committer. Thanks, -- -José
Re: [DISCUSS] Personal branches under apache/kafka
Hi Michael, I think it's a good idea. Only "official" branches should exist in the upstream repo. I guess the only exception would be if a massive feature would be done by different individuals collaborating and they would need a "neutral" place for the branch to be. But this didn't happen yet and I doubt it will in the near future. Best, --- Josep Prat Open Source Engineering Director, aivenjosep.p...@aiven.io | +491715557497 | aiven.io Aiven Deutschland GmbH Alexanderufer 3-7, 10117 Berlin Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen Amtsgericht Charlottenburg, HRB 209739 B On Wed, Mar 13, 2024, 19:27 José Armando García Sancio wrote: > On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison > wrote: > > What do you think? > > I agree. I wouldn't be surprised if these branches (not trunk or > release branches) were created by mistake by the committer. > > Thanks, > -- > -José >
Re: Incremental build for scala tests
There seems to be an open issue in Gradle for it: https://github.com/gradle/gradle/issues/20854 >Monday, March 4, 2024 9:44 PM UTC from Pavel Pozdeev > > >Hi team, >I'm a new member, just joined the community. Trying to add Kraft support to >some existing unit-tests. >I noticed, that when I do any change to some unit-test, e.g. >"kafka.admin.AclCommandTest.scala", and then run: > >"./gradlew core:compileTestScala" > >the entire folder "core/build/classes/scala/test" is cleared, and gradle >re-compiles ALL tests. It takes quite a long time. >Looks like this issue affects only tests. If I change main scala code, e.g. >"kafka.admin.AclCommand.scala", and then run: > >"./gradlew core:compileScala" > >only a single file is re-compiled, and it takes only a few seconds. >Has anybody noticed the same issue before? > >Best, >Pavel Pozdeev
Re: [DISCUSS] Personal branches under apache/kafka
On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison wrote: > What do you think? I agree. I wouldn't be surprised if these branches (not trunk or release branches) were created by mistake by the committer. Thanks, -- -José
[DISCUSS] Personal branches under apache/kafka
Hi, We have accumulated a number of personal branches in the github repository: https://github.com/apache/kafka/branches/all All these branches have been created by committers for various reasons, bugfix, tests. I wonder if we should avoid creating branches in the apache repository (always use your own fork like regular contributors) and in the rare cases this is necessary ensure we delete them once done? This way we would only have branches for the various releases (3.7, 3.6, etc). What do you think? Thanks, Mickael
Re: [DISCUSS] KIP-1022 Formatting and Updating Features
Hey folks -- let me summarize my understanding. Artem requested we change the name of transaction version (tv) to transaction protocol version (tpv). If I do that, I will also probably change to group coordinator protocol version (gcpv). Folks were concerned about upgrades/downgrades with different feature versions when using the feature tool and --release-version. I think we can address some concerns here: - don't allow a command that moves a version in the wrong direction (when upgrade actually downgrades a feature for example) - allow a command to describe all the feature versions for a given metadata version Note that Colin expressed some concern with having the --release-version flag at all. I wonder if we can reach a compromise with having the upgrade command have a "latest" command only. Jun proposed this "latest" functionality can be the default when no version or feature is specified. Jose requested that we make --release-version and --feature mutually exclusive for both the storage tool and the features tool. We should also specify each feature in the storage tool as a separate flag. Finally, Jun and Jose requested deprecation of the --metadata flag, but Colin felt we should keep it. I mentioned the --feature flag does the same but no reply yet. * So here are the updates I propose:* 1. Update the feature names as Artem described -- transaction protocol version and group coordinator protocol version 2. Rename --features in the storage tool to --feature. In the case where we use this flag, we must repeat the flag for the number of features to set all of them. 3. No argument behavior on the storage and upgrade tools is to set all the features to the latest (stable) version 4. Include an API to list the features for a given metadata version 5. I'm going back and forth on whether we should support the --release-version flag still. If we do, we need to include validation so we only upgrade on upgrade.
Let me know if folks have any issues with the updates I proposed or think there is something I missed. Justine On Fri, Mar 8, 2024 at 11:59 AM Artem Livshits wrote: > Hi Justine, > > > Are you suggesting it should be called "transaction protocol version" or > "TPV"? I don't mind that, but just wanted to clarify if we want to include > protocol or if simply "transaction version" is enough. > > My understanding is that "metadata version" is the version of metadata > records, which is fairly straightforward. "Transaction version" may be > ambiguous. > > -Artem > > On Thu, Feb 29, 2024 at 3:39 PM Justine Olshan > > wrote: > > > Hey folks, > > > > Thanks for the discussion. Let me try to cover everyone's comments. > > > > Artem -- > > I can add the examples you mentioned. As for naming, right now the > feature > > is called "transaction version" or "TV". Are you suggesting it should be > > called "transaction protocol version" or "TPV"? I don't mind that, but > just > > wanted to clarify if we want to include protocol or if simply > "transaction > > version" is enough. > > > > Jun -- > > > > 10. *With **more features, would each of those be controlled by a > separate > > feature or* > > > > *multiple features. For example, is the new transaction record format* > > > > *controlled only by MV with TV having a dependency on MV or is it > > controlled* > > > > *by both MV and TV.* > > > > > > I think this will need to be decided on a case by case basis. There > should > > be a mechanism to set dependencies among features. > > For transaction version specifically, I have no metadata version > > dependencies besides requiring 3.3 to write the feature records and use > the > > feature tools. I would suspect all new features would have this > > requirement. > > > > > > 11. *Basically, if **--release-version is not used, the command will just > > use the latest* > > > > *production version of every feature. 
Should we apply that logic to both* > > > > *tools?* > > > How would this work with the upgrade tool? I think we want a way to set a > > new feature version for one feature and not touch any of the others. > > > > > > *12. Should we remove --metadata METADATA from kafka-features? It does > the* > > > > *same thing as --release-version.* > > > > > > When I previously discussed with Colin McCabe offline about this tool, he > > was strongly against deprecation or changing flags. I personally think it > > could be good > > > > to unify and not support a ton of flags, but I would want to make sure he > > is aligned. > > > > > > *13. KIP-853 also extends the tools to support a new feature > > kraft.version.* > > > > *It would be useful to have alignment between that KIP and this one.* > > > > > > Sounds good. Looks like Jose is in on the discussion so we can continue > > here. :) > > > > > > > > Jose -- > > > > > > *1. KIP-853 uses --feature for kafka-storage instead of --features.* > > > > *This is consistent with the use of --feature in the "kafka-feature.sh* > > > > *upgrade"
Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable
If the custom store is a key-value store, yes, we could do this. But the interface does not enforce a key-value store, it's just the most generic `StateStore` that we pass in, and thus it could be something totally unknown to us, and we cannot apply a cast... The underlying idea is really about 100% flexibility in the PAPI layer. That's also the reason why all stores need to provide a callback for the restore path. Kafka Streams runtime can only read the record from the changelog, but it cannot put it into the store, as the runtime only sees the `StateStore` interface -- thus, we invoke a store-specific callback (`StateRestoreCallback` interface) that needs to actually put the data into the store for us. For our built-in stores, we of course provide these callbacks, but the point is that the runtime does not know anything about the nature of the store but is fully agnostic to it, to allow plugging in any custom store with any custom interface (which just needs to implement `StateStore`). Not sure if I understand what you mean by this transformation step? -Matthias On 3/12/24 3:04 AM, Lucas Brutschy wrote: @Matthias: Thanks, I didn't realize that we need processors for any custom store. Are we sure we cannot build a generic processor to load data into a custom key-value store? I'm not sure, but you know the code better than me. One other alternative is to allow the user to provide a state transformer `Function<ConsumerRecord, ConsumerRecord>` to adapt the state before loading it, defaulting to identity. This would provide the ability to do efficient, non-deserializing transformations like => On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax wrote: @Bruno: (1), I think you are spot on for the ts-extractor: on the restore code path, we only support record-ts, but there is no need for a custom-ts because for regular changelog topics KS sets the ts, and thus, the optimization this KIP proposes requires that the global topic follow the changelog format, ie, the ts must be in the record-ts. 
However, for the regular processing path, I am not sure if we can omit deserializers. The way the PAPI is wired up seems to require that we give proper types to _other_ Processors that read from the global state store. For this reason, the store (which takes `Serdes` with proper types) is wrapped with a `MeteredStore` (like all others) to do the Serde work, and this MeteredStore is also exposed to the global-Processor? Might be good for Walker to dig into this to find out the details? It would of course be nice if we could avoid the unnecessary deserialization on topic read, and re-serialization on global-store put for this case, but it seems not to be straightforward to do... (2). Is this about the PAPI/Topology? For this case, we don't have any config object across the board. We only do this in the DSL. Hence, I would propose to just follow the existing pattern in this KIP to keep the API consistent. For the DSL, it could make sense of course. -- Of course, if we think the PAPI could be improved with config objects, we could do this in a dedicated KIP. @Lucas: The PAPI is unfortunately (by design) much more open and less restrictive. If a user has a custom state store, we need some `Processor` code from them, because we cannot provide a built-in processor for an unknown store. The overload which won't take a processor would only work for the built-in key-value store, which I assume would cover most use-cases; however, we should keep the door open for other use cases. Otherwise, we disallow this optimization for custom stores. PAPI is really about flexibility, and yes, with great power comes great responsibility for the users :) But this actually highlights a different aspect: the overload not accepting a custom `Processor` but using a built-in processor, should not accept a generic `StoreBuilder` but should restrict the type to `StoreBuilder<KeyValueStore<Bytes, byte[]>>`? 
-Matthias On 3/6/24 1:14 PM, Lucas Brutschy wrote: Hey Walker, Thanks for the KIP, and congrats on the KiBiKIP ;) My main point is that I'd vote against introducing `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is just incorrect and should be removed or deprecated. If we think we need to keep the old behavior around, renaming the methods, e.g., to `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old behavior. But at first glance, the old behavior just looks like a bug to me and should just be removed. So for this KIP, I'd keep two variants as you propose and drop the boolean parameter, but the two variants will be 1) a copy-restore variant without custom processing, as you propose. 2) a process-restore variant with custom processing (parameters the same as before). This should be combined with a clear warning in the Javadoc about the performance downside of this approach. Presentation: 1) I wonder if you could make another pass on the motivation section. I was lacking some context on this problem, and I think the
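The restore mechanism Matthias describes above -- the runtime sees only the opaque `StateStore` interface, so the store itself must supply the callback that writes changelog bytes back into it -- can be illustrated with a small self-contained sketch. The interfaces below are simplified stand-ins for the real `org.apache.kafka.streams.processor` types (only `StateRestoreCallback#restore(byte[], byte[])` mirrors the actual signature); the store class and its method names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.streams.processor.StateStore.
interface StateStore { String name(); }

// Mirrors StateRestoreCallback#restore(byte[] key, byte[] value).
interface StateRestoreCallback { void restore(byte[] key, byte[] value); }

// A custom store whose actual API the runtime knows nothing about.
class MyCustomStore implements StateStore {
    final Map<String, String> data = new HashMap<>();
    public String name() { return "my-custom-store"; }
    // Store-specific write path -- invisible through the StateStore interface.
    void putFromChangelog(byte[] key, byte[] value) {
        data.put(new String(key), new String(value));
    }
}

public class RestoreSketch {
    // The runtime's restore loop: it can only read raw bytes from the
    // changelog and hand them to the store-provided callback.
    static void restoreLoop(byte[][][] changelog, StateRestoreCallback cb) {
        for (byte[][] record : changelog) cb.restore(record[0], record[1]);
    }

    public static void main(String[] args) {
        MyCustomStore store = new MyCustomStore();
        byte[][][] changelog = {
            { "k1".getBytes(), "v1".getBytes() },
            { "k2".getBytes(), "v2".getBytes() },
        };
        // The store bridges bytes -> its own API via the callback.
        restoreLoop(changelog, store::putFromChangelog);
        System.out.println(store.data.get("k1") + " " + store.data.get("k2")
                + " " + store.data.size());
    }
}
```

The runtime never casts `StateStore` to anything concrete; all type-specific work lives in the callback the store registered.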
[jira] [Created] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
Edoardo Comar created KAFKA-16369: - Summary: Broker may not shut down when SocketServer fails to bind as Address already in use Key: KAFKA-16369 URL: https://issues.apache.org/jira/browse/KAFKA-16369 Project: Kafka Issue Type: Bug Reporter: Edoardo Comar When in Zookeeper mode, if a port the broker should listen on is already bound, the KafkaException "Socket server failed to bind to localhost:9092: Address already in use." is thrown but the broker continues to start up. It correctly shuts down when in KRaft mode. Easy to reproduce in Zookeeper mode with server.config set to listen on localhost only: listeners=PLAINTEXT://localhost:9092 -- This message was sent by Atlassian Jira (v8.20.10#820010)
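The underlying OS-level failure is easy to reproduce outside the broker. This standalone sketch (illustration only, not broker code) shows the double-bind error the SocketServer hits: the second bind on an actively-bound port fails, which is the condition that should take the broker down.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class BindDemo {
    public static void main(String[] args) throws IOException {
        // First bind succeeds and holds the port (0 = pick an ephemeral port).
        ServerSocket first = new ServerSocket();
        first.bind(new InetSocketAddress("localhost", 0));
        int port = first.getLocalPort();

        // Second bind on the same port fails -- the situation described in
        // the issue, where the broker should shut down but did not in ZK mode.
        ServerSocket second = new ServerSocket();
        try {
            second.bind(new InetSocketAddress("localhost", port));
            System.out.println("unexpected: bind succeeded");
        } catch (IOException e) { // typically java.net.BindException
            System.out.println("bind failed as expected");
        } finally {
            first.close();
            second.close();
        }
    }
}
```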
Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
Yes, about the "drop records" case. It's a very common scenario to have a repartition step before a windowed aggregation or a join with grace-period. About "add feature vs guard users": it's always a tricky question and tradeoff. For this particular KIP, I personally think we should opt to not add the feature but guard the users, as I don't see too much value compared to the complexity and "traps" it adds. -- It's of course just my personal opinion, and if there is an ask from many users to add this feature, I would not push back further. As mentioned in my previous reply, I don't fully understand the motivation yet; maybe Nick can provide more context on it. In other words, opting for the PAUSE option would simply stall the task, and upon #resume it would just be discarding that record and then continuing on with processing Well, the KIP mentions the ability to either re-try the record (eg, after applying some external fix that would allow Kafka Streams to deserialize the record now) or to skip it by advancing the offset. But to do this, we need to extend the `resume()` callback to pass in this information, making the whole setup and usage of this feature more complex, as one needs to do more upfront instrumentation of their custom code. -- It's just a technical thing we need to consider if we want to move forward, and the KIP should not say "advancing the consumer offsets, either via an external tool" because this cannot work. Just pointing out incorrect technical assumption, not disregarding that it can be done. About committing: yes, I agree to all what you say, and again it was not meant as concern, but just as honest questions about some technical details. I think it would be good to consider these trade-offs and explain in the KIP why we want to do what we do. That's all. -Matthias On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote: I see way too many food-guns and complications that can be introduced. What is a "food-gun"?? 
I'm picturing like a spud rifle/potato gun but I don't think that's what you meant hahaha I don't feel super strongly one way or another, but I have a few questions & corrections about some of these complaints/concerns: If one task pauses but others keep running, we keep advancing stream-time downstream, and thus when the task would resume later, there is a very high probability that records are dropped as the window got already closed. Just to make sure I/everyone understand what you're getting at here, you would be referring to the case of a stateful operation downstream of a key-changing operation which is in turn downstream of the "paused" task -- ie with a repartition separating the paused task and the task with a windowed aggregation? Each task has its own view of stream-time (technically each processor within a task) so the only way that delaying one task and not another would affect which records get dropped is if those two tasks are rekeyed and the repartitioning results in their outputs being mixed -- yes? Anyways I think you make a good case for why pausing a single task -- or even an entire instance if others are allowed to continue running -- might make it too easy for users to shoot themselves in the foot without understanding the full ramifications. Of course, there are already a million ways for users to screw up their app if configured or operated incorrectly, and we shouldn't necessarily kill a feature just because some people might use it when they shouldn't. Why can't we just document that this feature should not be used with applications that include time-sensitive operators? I also feel like you dismissed the "skip record case" somewhat too easily: For the "skip record case", it's also not possible to skip over an offset from outside while the application is running True, you can't advance the offset from outside the app, but I don't see why you would want to, much less why you should need to for this to work. 
Surely the best way to implement this case would just be for the #resume API to behave, and work, exactly the same as the handler's CONTINUE option? In other words, opting for the PAUSE option would simply stall the task, and upon #resume it would just be discarding that record and then continuing on with processing (or even committing the offset immediately after it, perhaps even asynchronously since it presumably doesn't matter if it doesn't succeed and the record is picked up again by accident -- as long as that doesn't happen repeatedly in an infinite loop, which I don't see why it would.) On the subject of committing... Other questions: if a task would be paused, would we commit the current offset? What happens if we re-balance? Would we just lose the "pause" state, and hit the same error again and just pause again? I was imagining that we would either just wait without committing, or perhaps even commit everything up to -- but not including -- the "bad" record when PAUSE is triggered.
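The stream-time argument above can be made concrete with a toy model (illustration only, not Kafka Streams code; the real per-processor bookkeeping and window retention logic are more involved). Stream-time advances as the maximum record timestamp observed, and a record is dropped once its timestamp falls behind stream-time by more than the grace period -- so a record held back by a paused task can become late purely because other tasks kept running:

```java
public class StreamTimeSketch {
    // Toy model: stream-time is the max record timestamp observed so far;
    // a record is accepted only if its window has not closed yet,
    // i.e. ts >= streamTime - grace.
    static long streamTime = Long.MIN_VALUE;
    static final long GRACE_MS = 1_000;

    static boolean accept(long ts) {
        streamTime = Math.max(streamTime, ts);
        return ts >= streamTime - GRACE_MS;
    }

    public static void main(String[] args) {
        // Records from the tasks that kept running advance stream-time...
        System.out.println(accept(0));
        System.out.println(accept(5_000));
        // ...so when the paused task resumes, its old record is late:
        System.out.println(accept(100)); // window closed, record dropped
    }
}
```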
Re: [DISCUSS] Apache Kafka 3.6.2 release
+1 thanks for volunteering! Best --- Josep Prat Open Source Engineering Director, Aiven josep.p...@aiven.io | +491715557497 | aiven.io Aiven Deutschland GmbH Alexanderufer 3-7, 10117 Berlin Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen Amtsgericht Charlottenburg, HRB 209739 B On Wed, Mar 13, 2024, 17:17 Divij Vaidya wrote: > +1 > > Thank you for volunteering. > > -- > Divij Vaidya > > > > On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan > > wrote: > > > Thanks Manikumar! > > +1 from me > > > > Justine > > > > On Wed, Mar 13, 2024 at 8:52 AM Manikumar > > wrote: > > > > > Hi, > > > > > > I'd like to volunteer to be the release manager for a bug fix release > of > > > the 3.6 line. > > > If there are no objections, I'll send out the release plan soon. > > > > > > Thanks, > > > Manikumar > > > > > >
Re: [DISCUSS] Apache Kafka 3.6.2 release
+1 Thank you for volunteering. -- Divij Vaidya On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan wrote: > Thanks Manikumar! > +1 from me > > Justine > > On Wed, Mar 13, 2024 at 8:52 AM Manikumar > wrote: > > > Hi, > > > > I'd like to volunteer to be the release manager for a bug fix release of > > the 3.6 line. > > If there are no objections, I'll send out the release plan soon. > > > > Thanks, > > Manikumar > > >
Re: [DISCUSS] Apache Kafka 3.6.2 release
Thanks Manikumar! +1 from me Justine On Wed, Mar 13, 2024 at 8:52 AM Manikumar wrote: > Hi, > > I'd like to volunteer to be the release manager for a bug fix release of > the 3.6 line. > If there are no objections, I'll send out the release plan soon. > > Thanks, > Manikumar >
Re: [DISCUSS] KIP-1026: Handling producer snapshot when upgrading from < v2.8.0 for Tiered Storage
Hi Arpit, Thanks for the KIP! I am not familiar with the necessity of producer snapshots, but your explanation sounds like this should be made optional. Can you expand the KIP to include the changes that need to be made to the constructor and getter, and explain more about backwards compatibility? From the description I can't tell if this change is backwards-compatible or not. Thanks, Greg On Wed, Mar 13, 2024 at 6:48 AM Arpit Goyal wrote: > > Hi all, > > I just wanted to bump up this thread. > > The KIP introduces a really small change and it would not take much of the > time reviewing it. This change would enable kafka users to use tiered > storage features seamlessly for the topics migrated from < 2.8 version > which currently failed with NullPointerException. > > I am waiting for this KIP to get approved and then start working on it. > > On Mon, Mar 11, 2024, 14:26 Arpit Goyal wrote: > > > Hi All, > > Just a Reminder, KIP-1026 is open for discussion. > > Thanks and Regards > > Arpit Goyal > > 8861094754 > > > > > > On Sat, Mar 9, 2024 at 9:27 AM Arpit Goyal > > wrote: > > > >> Hi All, > >> > >> I have created KIP-1026 for handling producerSnapshot empty scenarios > >> when the topic is upgraded from the kafka < 2.8 version. > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1026%3A+Handling+producer+snapshot+when+upgrading+from+%3C+v2.8.0+for+Tiered+Storage > >> > >> Feedback and suggestions are welcome. > >> > >> Thanks and Regards > >> Arpit Goyal > >> 8861094754 > >> > >
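To make Greg's backwards-compatibility question concrete, here is one hypothetical shape the change could take: a segment-data holder where the producer snapshot is an `Optional`, modeled only loosely on the tiered-storage `LogSegmentData` class. The class, constructor, and method names below are assumptions for illustration, not the actual Kafka API or the KIP's final design.

```java
import java.nio.file.Path;
import java.util.Optional;

// Illustrative sketch only -- not the real LogSegmentData signature.
class SegmentData {
    private final Path logSegment;
    // Absent for segments created on pre-2.8 brokers, which wrote no
    // producer snapshot files.
    private final Optional<Path> producerSnapshot;

    SegmentData(Path logSegment, Optional<Path> producerSnapshot) {
        this.logSegment = logSegment;
        this.producerSnapshot = producerSnapshot;
    }

    Optional<Path> producerSnapshot() { return producerSnapshot; }
}

public class SnapshotSketch {
    public static void main(String[] args) {
        // A segment migrated from a pre-2.8 broker has no snapshot file;
        // callers now handle absence explicitly instead of hitting a
        // NullPointerException on a null Path.
        SegmentData old = new SegmentData(
                Path.of("00000000000000000000.log"), Optional.empty());
        System.out.println(old.producerSnapshot().isPresent());
    }
}
```

Whether this is source- and binary-compatible depends on whether the existing constructor and getter are kept alongside the new ones, which is presumably what Greg is asking the KIP to spell out.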
[DISCUSS] Apache Kafka 3.6.2 release
Hi, I'd like to volunteer to be the release manager for a bug fix release of the 3.6 line. If there are no objections, I'll send out the release plan soon. Thanks, Manikumar
[jira] [Resolved] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers
[ https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-16171. -- Resolution: Fixed > Controller failover during ZK migration can prevent metadata updates to ZK > brokers > -- > > Key: KAFKA-16171 > URL: https://issues.apache.org/jira/browse/KAFKA-16171 > Project: Kafka > Issue Type: Bug > Components: controller, kraft, migration >Affects Versions: 3.6.0, 3.7.0, 3.6.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > Fix For: 3.6.2, 3.7.0 > > > h2. Description > During the ZK migration, after KRaft becomes the active controller we enter a > state called hybrid mode. This means we have a mixture of ZK and KRaft > brokers. The KRaft controller updates the ZK brokers using the deprecated > controller RPCs (LeaderAndIsr, UpdateMetadata, etc). > > A race condition exists where the KRaft controller will get stuck in a retry > loop while initializing itself after a failover which prevents it from > sending these RPCs to ZK brokers. > h2. Impact > Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK > brokers will not receive any metadata updates. The ZK brokers will be able to > send requests to the controller (such as AlterPartitions), but the metadata > updates which come as a result of those requests will never be seen. This > essentially looks like the controller is unavailable from the ZK brokers > perspective. > h2. Detection and Mitigation > This bug can be seen by observing failed ZK writes from a recently elected > controller. > The tell-tale error message is: > {code:java} > Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This > indicates that another KRaft controller is making writes to ZooKeeper. {code} > with a stacktrace like: > {noformat} > java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. > Expected zkVersion = 507823. 
This indicates that another KRaft controller is > making writes to ZooKeeper. > at > kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613) > at > kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639) > at > kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664) > at > scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) > at > scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) > at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43) > at > kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664) > at > kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158) > at > kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115) > at > 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:1583) > at > org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat} > To mitigate this problem, a new KRaft controller should be elected. This can > be done by restarting the problematic active controller. To verify that the > new
Re: [DISCUSS] KIP-1026: Handling producer snapshot when upgrading from < v2.8.0 for Tiered Storage
Hi all, I just wanted to bump up this thread. The KIP introduces a really small change and it would not take much time to review. This change would enable Kafka users to use tiered storage features seamlessly for topics migrated from versions < 2.8, which currently fail with a NullPointerException. I am waiting for this KIP to get approved and then start working on it. On Mon, Mar 11, 2024, 14:26 Arpit Goyal wrote: > Hi All, > Just a Reminder, KIP-1026 is open for discussion. > Thanks and Regards > Arpit Goyal > 8861094754 > > > On Sat, Mar 9, 2024 at 9:27 AM Arpit Goyal > wrote: > >> Hi All, >> >> I have created KIP-1026 for handling producerSnapshot empty scenarios >> when the topic is upgraded from the kafka < 2.8 version. >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1026%3A+Handling+producer+snapshot+when+upgrading+from+%3C+v2.8.0+for+Tiered+Storage >> >> Feedback and suggestions are welcome. >> >> Thanks and Regards >> Arpit Goyal >> 8861094754 >> >
Re: [DISCUSS] Minimum constraint for segment.ms
+ users@kafka Hi users of Apache Kafka With the upcoming 4.0 release, we have an opportunity to improve the constraints and default values for various Kafka configurations. We are soliciting your feedback and suggestions on configurations where the default values and/or constraints should be adjusted. Please reply in this thread directly. -- Divij Vaidya Apache Kafka PMC On Wed, Mar 13, 2024 at 12:56 PM Divij Vaidya wrote: > Thanks for the discussion folks. I have started a KIP > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations > to keep track of the changes that we are discussion. Please consider this > as a collaborative work-in-progress KIP and once it is ready to be > published, we can start a discussion thread on it. > > I am also going to start a thread to solicit feedback from users@ mailing > list as well. > > -- > Divij Vaidya > > > > On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon < > christopher.l.shan...@gmail.com> wrote: > >> I think it's a great idea to raise a KIP to look at adjusting defaults and >> minimum/maximum config values for version 4.0. >> >> As pointed out, the minimum values for segment.ms and segment.bytes don't >> make sense and would probably bring down a cluster pretty quickly if set >> that low, so version 4.0 is a good time to fix it and to also look at the >> other configs as well for adjustments. >> >> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano >> wrote: >> >> > hey guys, >> > >> > Regarding to num.recovery.threads.per.data.dir: I agree, in our company >> we >> > use the number of vCPUs to do so as this is not competing with ready >> > cluster traffic. >> > >> > >> > On Wed, 13 Mar 2024 at 09:29, Luke Chen wrote: >> > >> > > Hi Divij, >> > > >> > > Thanks for raising this. >> > > The valid minimum value 1 for `segment.ms` is completely >> unreasonable. 
>> > > Similarly for `segment.bytes`, `metadata.log.segment.ms`, >> > > `metadata.log.segment.bytes`. >> > > >> > > In addition to that, there are also some config default values we'd >> like >> > to >> > > propose to change in v4.0. >> > > We can collect more comments from the community, and come out with a >> KIP >> > > for them. >> > > >> > > 1. num.recovery.threads.per.data.dir: >> > > The current default value is 1. But the log recovery is happening >> before >> > > brokers are in ready state, which means, we should use all the >> available >> > > resource to speed up the log recovery to bring the broker to ready >> state >> > > soon. Default value should be... maybe 4 (to be decided)? >> > > >> > > 2. Other configs might be able to consider to change the default, but >> > open >> > > for comments: >> > >2.1. num.replica.fetchers: default is 1, but that's not enough when >> > > there are multiple partitions in the cluster >> > >2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`: >> > > Currently, we set 100kb as default value, but that's not enough for >> > > high-speed network. >> > > >> > > Thank you. >> > > Luke >> > > >> > > >> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya > > >> > > wrote: >> > > >> > > > Hey folks >> > > > >> > > > Before I file a KIP to change this in 4.0, I wanted to understand >> the >> > > > historical context for the value of the following setting. >> > > > >> > > > Currently, segment.ms minimum threshold is set to 1ms [1]. >> > > > >> > > > Segments are expensive. Every segment uses multiple file descriptors >> > and >> > > > it's easy to run out of OS limits when creating a large number of >> > > segments. >> > > > Large number of segments also delays log loading on startup because >> of >> > > > expensive operations such as iterating through all directories & >> > > > conditionally loading all producer state. 
>> > > > >> > > > I am currently not aware of a reason as to why someone might want to >> > work >> > > > with a segment.ms of less than ~10s (number chosen arbitrary that >> > looks >> > > > sane) >> > > > >> > > > What was the historical context of setting the minimum threshold to >> 1ms >> > > for >> > > > this setting? >> > > > >> > > > [1] >> > https://kafka.apache.org/documentation.html#topicconfigs_segment.ms >> > > > >> > > > -- >> > > > Divij Vaidya >> > > > >> > > >> > >> >
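The cost Divij describes is easy to put numbers on. A rough sketch of the worst case at the current 1 ms minimum (the files-per-segment figure is an approximation: each segment has a .log file plus offset and time indexes, and possibly a transaction index as well):

```java
public class SegmentMath {
    public static void main(String[] args) {
        // With segment.ms at the current minimum of 1 ms, an actively
        // written partition can roll a new segment every millisecond:
        long segmentsPerDay = 24L * 60 * 60 * 1000; // one roll per ms
        System.out.println(segmentsPerDay);

        // Each segment keeps several files (.log, .index, .timeindex),
        // so file-descriptor demand is a small multiple of the segment
        // count -- far beyond typical OS limits for a single partition:
        long filesPerSegment = 3;
        System.out.println(segmentsPerDay * filesPerSegment);
    }
}
```

Even a floor of ~10 s, as Divij suggests, cuts this by four orders of magnitude.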
Re: [DISCUSS] KIP-1025: Optionally URL-encode clientID and clientSecret in authorization header
Hi all, I just wanted to bump up this thread. The KIP introduces a really small change; the PR is already prepared and is only waiting for this KIP to be approved before it can be merged. Thanks, On Wed, Mar 6, 2024 at 12:26 PM Nelson B. wrote: > Hi all, > > I would like to start a discussion on KIP-1025, which would optionally > URL-encode clientID and clientSecret in the authorization header > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header > > Best, > Nelson B. >
Re: [VOTE] KIP-939: Support Participation in 2PC
I had a look at the discussion thread and the KIP looks exciting. +1 non-binding Best Omnia On 1 Dec 2023, at 19:06, Artem Livshits wrote: Hello, This is a voting thread for https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC . The KIP proposes extending Kafka transaction support (which already uses 2PC under the hood) to enable atomicity of dual writes to Kafka and an external database, and helps to fix a long-standing Flink issue. An example of code that uses the dual-write recipe with JDBC and should work for most SQL databases is here https://github.com/apache/kafka/pull/14231. The FLIP for the sister fix in Flink is here https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710 -Artem
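For readers unfamiliar with 2PC, the general shape of the protocol the KIP builds on can be sketched generically: every participant first durably stages its write (prepare), and only once all prepares succeed are all participants committed. This is classic two-phase commit, not the KIP-939 producer API; the `Participant` interface and its method names are invented for this sketch.

```java
import java.util.List;

// Invented for illustration -- a generic 2PC participant.
interface Participant {
    boolean prepare(); // phase 1: durably stage the write
    void commit();     // phase 2: make the staged write visible
    void abort();      // undo the staged write
}

public class TwoPcSketch {
    static boolean run(List<Participant> participants) {
        // Phase 1: all participants must prepare successfully.
        for (Participant p : participants) {
            if (!p.prepare()) {
                participants.forEach(Participant::abort);
                return false;
            }
        }
        // Phase 2: committing is now safe everywhere -- this is what gives
        // the dual write (Kafka + external database) its atomicity.
        participants.forEach(Participant::commit);
        return true;
    }

    public static void main(String[] args) {
        Participant ok = new Participant() {
            public boolean prepare() { return true; }
            public void commit() { System.out.println("committed"); }
            public void abort() { System.out.println("aborted"); }
        };
        Participant failing = new Participant() {
            public boolean prepare() { return false; }
            public void commit() { System.out.println("committed"); }
            public void abort() { System.out.println("aborted"); }
        };
        System.out.println(run(List.of(ok, ok)));
        System.out.println(run(List.of(ok, failing)));
    }
}
```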
Re: [DISCUSS] Minimum constraint for segment.ms
Thanks for the discussion folks. I have started a KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations to keep track of the changes that we are discussing. Please consider this as a collaborative work-in-progress KIP and once it is ready to be published, we can start a discussion thread on it. I am also going to start a thread to solicit feedback from users@ mailing list as well. -- Divij Vaidya On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon < christopher.l.shan...@gmail.com> wrote: > I think it's a great idea to raise a KIP to look at adjusting defaults and > minimum/maximum config values for version 4.0. > > As pointed out, the minimum values for segment.ms and segment.bytes don't > make sense and would probably bring down a cluster pretty quickly if set > that low, so version 4.0 is a good time to fix it and to also look at the > other configs as well for adjustments. > > On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano > wrote: > > > hey guys, > > > > Regarding to num.recovery.threads.per.data.dir: I agree, in our company > we > > use the number of vCPUs to do so as this is not competing with ready > > cluster traffic. > > > > > > On Wed, 13 Mar 2024 at 09:29, Luke Chen wrote: > > > > > Hi Divij, > > > > > > Thanks for raising this. > > > The valid minimum value 1 for `segment.ms` is completely unreasonable. > > > Similarly for `segment.bytes`, `metadata.log.segment.ms`, > > > `metadata.log.segment.bytes`. > > > > > > In addition to that, there are also some config default values we'd > like > > to > > > propose to change in v4.0. > > > We can collect more comments from the community, and come out with a > KIP > > > for them. > > > > > > 1. num.recovery.threads.per.data.dir: > > > The current default value is 1. 
But the log recovery is happening > before > > > brokers are in ready state, which means, we should use all the > available > > > resource to speed up the log recovery to bring the broker to ready > state > > > soon. Default value should be... maybe 4 (to be decided)? > > > > > > 2. Other configs might be able to consider to change the default, but > > open > > > for comments: > > >2.1. num.replica.fetchers: default is 1, but that's not enough when > > > there are multiple partitions in the cluster > > >2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`: > > > Currently, we set 100kb as default value, but that's not enough for > > > high-speed network. > > > > > > Thank you. > > > Luke > > > > > > > > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya > > > wrote: > > > > > > > Hey folks > > > > > > > > Before I file a KIP to change this in 4.0, I wanted to understand the > > > > historical context for the value of the following setting. > > > > > > > > Currently, segment.ms minimum threshold is set to 1ms [1]. > > > > > > > > Segments are expensive. Every segment uses multiple file descriptors > > and > > > > it's easy to run out of OS limits when creating a large number of > > > segments. > > > > Large number of segments also delays log loading on startup because > of > > > > expensive operations such as iterating through all directories & > > > > conditionally loading all producer state. > > > > > > > > I am currently not aware of a reason as to why someone might want to > > work > > > > with a segment.ms of less than ~10s (number chosen arbitrary that > > looks > > > > sane) > > > > > > > > What was the historical context of setting the minimum threshold to > 1ms > > > for > > > > this setting? > > > > > > > > [1] > > https://kafka.apache.org/documentation.html#topicconfigs_segment.ms > > > > > > > > -- > > > > Divij Vaidya > > > > > > > > > >
Re: [DISCUSS] Minimum constraint for segment.ms
I think it's a great idea to raise a KIP to look at adjusting defaults and minimum/maximum config values for version 4.0.

As pointed out, the minimum values for segment.ms and segment.bytes don't make sense and would probably bring down a cluster pretty quickly if set that low, so version 4.0 is a good time to fix them and also to look at the other configs for adjustments.

On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano wrote:

> hey guys,
>
> Regarding num.recovery.threads.per.data.dir: I agree. In our company we set it to the number of vCPUs, since log recovery does not compete with live cluster traffic.
>
> On Wed, 13 Mar 2024 at 09:29, Luke Chen wrote:
>
> > Hi Divij,
> >
> > Thanks for raising this. The valid minimum value of 1 for `segment.ms` is completely unreasonable. The same goes for `segment.bytes`, `metadata.log.segment.ms`, and `metadata.log.segment.bytes`.
> >
> > In addition, there are some config default values we'd like to propose changing in v4.0. We can collect more comments from the community and come out with a KIP for them.
> >
> > 1. num.recovery.threads.per.data.dir: The current default value is 1. But log recovery happens before the broker is in a ready state, which means we should use all available resources to speed up log recovery and bring the broker to a ready state sooner. The default should be... maybe 4 (to be decided)?
> >
> > 2. Other configs whose defaults we might consider changing, open for comments:
> > 2.1. num.replica.fetchers: the default is 1, but that's not enough when there are multiple partitions in the cluster.
> > 2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`: currently we set 100 KB as the default value, but that's not enough for high-speed networks.
> >
> > Thank you.
> > Luke
> >
> > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya wrote:
> >
> > > Hey folks
> > >
> > > Before I file a KIP to change this in 4.0, I wanted to understand the historical context for the value of the following setting.
> > >
> > > Currently, the segment.ms minimum threshold is set to 1 ms [1].
> > >
> > > Segments are expensive. Every segment uses multiple file descriptors, and it's easy to run out of OS limits when creating a large number of segments. A large number of segments also delays log loading on startup because of expensive operations such as iterating through all directories and conditionally loading all producer state.
> > >
> > > I am currently not aware of a reason why someone might want to work with a segment.ms of less than ~10s (an arbitrarily chosen number that looks sane).
> > >
> > > What was the historical context for setting the minimum threshold to 1 ms for this setting?
> > >
> > > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
> > >
> > > --
> > > Divij Vaidya
[jira] [Created] (KAFKA-16368) Change constraints and default values for various configurations
Divij Vaidya created KAFKA-16368:

Summary: Change constraints and default values for various configurations
Key: KAFKA-16368
URL: https://issues.apache.org/jira/browse/KAFKA-16368
Project: Kafka
Issue Type: Improvement
Reporter: Divij Vaidya
Fix For: 4.0.0

This Jira is a parent item to track all the defaults and/or constraints that we would like to change with Kafka 4.0. This Jira will be associated with a KIP. Currently, we are gathering feedback from the community on the configurations that don't have sane defaults.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [DISCUSS] Minimum constraint for segment.ms
hey guys,

Regarding num.recovery.threads.per.data.dir: I agree. In our company we set it to the number of vCPUs, since log recovery does not compete with live cluster traffic.

On Wed, 13 Mar 2024 at 09:29, Luke Chen wrote:

> Hi Divij,
>
> Thanks for raising this. The valid minimum value of 1 for `segment.ms` is completely unreasonable. The same goes for `segment.bytes`, `metadata.log.segment.ms`, and `metadata.log.segment.bytes`.
>
> In addition, there are some config default values we'd like to propose changing in v4.0. We can collect more comments from the community and come out with a KIP for them.
>
> 1. num.recovery.threads.per.data.dir: The current default value is 1. But log recovery happens before the broker is in a ready state, which means we should use all available resources to speed up log recovery and bring the broker to a ready state sooner. The default should be... maybe 4 (to be decided)?
>
> 2. Other configs whose defaults we might consider changing, open for comments:
> 2.1. num.replica.fetchers: the default is 1, but that's not enough when there are multiple partitions in the cluster.
> 2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`: currently we set 100 KB as the default value, but that's not enough for high-speed networks.
>
> Thank you.
> Luke
>
> On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya wrote:
>
> > Hey folks
> >
> > Before I file a KIP to change this in 4.0, I wanted to understand the historical context for the value of the following setting.
> >
> > Currently, the segment.ms minimum threshold is set to 1 ms [1].
> >
> > Segments are expensive. Every segment uses multiple file descriptors, and it's easy to run out of OS limits when creating a large number of segments. A large number of segments also delays log loading on startup because of expensive operations such as iterating through all directories and conditionally loading all producer state.
> >
> > I am currently not aware of a reason why someone might want to work with a segment.ms of less than ~10s (an arbitrarily chosen number that looks sane).
> >
> > What was the historical context for setting the minimum threshold to 1 ms for this setting?
> >
> > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
> >
> > --
> > Divij Vaidya
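[Editor's note: Sergio's rule of thumb above (one recovery thread per vCPU, since recovery happens before the broker serves traffic) can be sketched as below. `recoveryThreadsPerDataDir` is a hypothetical helper for illustration, not Kafka's actual sizing logic.]

```java
public class RecoveryThreads {
    // Suggested sizing rule from the thread: one log-recovery thread per
    // available vCPU, since recovery does not compete with live traffic.
    // Clamp to at least 1 so a misreported CPU count never yields 0 threads.
    static int recoveryThreadsPerDataDir(int availableProcessors) {
        return Math.max(1, availableProcessors);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("num.recovery.threads.per.data.dir=" + recoveryThreadsPerDataDir(cpus));
    }
}
```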
Re: [DISCUSS] Minimum constraint for segment.ms
Hi Divij,

Thanks for raising this. The valid minimum value of 1 for `segment.ms` is completely unreasonable. The same goes for `segment.bytes`, `metadata.log.segment.ms`, and `metadata.log.segment.bytes`.

In addition, there are some config default values we'd like to propose changing in v4.0. We can collect more comments from the community and come out with a KIP for them.

1. num.recovery.threads.per.data.dir: The current default value is 1. But log recovery happens before the broker is in a ready state, which means we should use all available resources to speed up log recovery and bring the broker to a ready state sooner. The default should be... maybe 4 (to be decided)?

2. Other configs whose defaults we might consider changing, open for comments:
2.1. num.replica.fetchers: the default is 1, but that's not enough when there are multiple partitions in the cluster.
2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`: currently we set 100 KB as the default value, but that's not enough for high-speed networks.

Thank you.
Luke

On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya wrote:

> Hey folks
>
> Before I file a KIP to change this in 4.0, I wanted to understand the historical context for the value of the following setting.
>
> Currently, the segment.ms minimum threshold is set to 1 ms [1].
>
> Segments are expensive. Every segment uses multiple file descriptors, and it's easy to run out of OS limits when creating a large number of segments. A large number of segments also delays log loading on startup because of expensive operations such as iterating through all directories and conditionally loading all producer state.
>
> I am currently not aware of a reason why someone might want to work with a segment.ms of less than ~10s (an arbitrarily chosen number that looks sane).
>
> What was the historical context for setting the minimum threshold to 1 ms for this setting?
>
> [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
>
> --
> Divij Vaidya
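[Editor's note: the cost of the 1 ms floor can be made concrete with back-of-the-envelope arithmetic. This sketch assumes a topic receiving data continuously, so every elapsed `segment.ms` interval can roll a new segment; `maxSegmentsPerDay` is illustrative, not Kafka code.]

```java
public class SegmentRollMath {
    // Upper bound on time-based segment rolls per day per partition for a
    // given segment.ms, assuming data arrives continuously (a time-based
    // roll only happens when there is new data to write).
    static long maxSegmentsPerDay(long segmentMs) {
        return 86_400_000L / segmentMs; // milliseconds per day / roll interval
    }

    public static void main(String[] args) {
        // Current minimum (1 ms): up to 86,400,000 segments per day per partition,
        // each consuming file descriptors and slowing startup log loading.
        System.out.println(maxSegmentsPerDay(1));
        // A ~10 s floor caps this at 8,640 segments per day per partition.
        System.out.println(maxSegmentsPerDay(10_000));
    }
}
```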
Re: [VOTE] KIP-974: Docker Image for GraalVM based Native Kafka Broker
Hi all,

Thanks for participating in the discussion and voting! KIP-974 has been accepted with the following +1 votes:

- Justine Olshan (binding)
- Ismael Juma (binding)
- Manikumar (binding)
- Federico Valeri (non-binding)

Regards,
Krishna
Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
> I see way too many food-guns and complications that can be introduced.

What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I don't think that's what you meant hahaha

I don't feel super strongly one way or another, but I have a few questions & corrections about some of these complaints/concerns:

> If one task pauses but others keep running, we keep advancing stream-time downstream, and thus when the task resumes later, there is a very high probability that records are dropped as the window already got closed.

Just to make sure I/everyone understand what you're getting at here: you would be referring to the case of a stateful operation downstream of a key-changing operation which is in turn downstream of the "paused" task -- ie with a repartition separating the paused task and the task with a windowed aggregation? Each task has its own view of stream-time (technically each processor within a task), so the only way that delaying one task and not another would affect which records get dropped is if those two tasks are rekeyed and the repartitioning results in their outputs being mixed -- yes?

Anyways, I think you make a good case for why pausing a single task -- or even an entire instance if others are allowed to continue running -- might make it too easy for users to shoot themselves in the foot without understanding the full ramifications. Of course, there are already a million ways for users to screw up their app if configured or operated incorrectly, and we shouldn't necessarily kill a feature just because some people might use it when they shouldn't. Why can't we just document that this feature should not be used with applications that include time-sensitive operators?
I also feel like you dismissed the "skip record case" somewhat too easily:

> For the "skip record case", it's also not possible to skip over an offset from outside while the application is running

True, you can't advance the offset from outside the app, but I don't see why you would want to, much less why you should need to for this to work. Surely the best way to implement this case would just be for the #resume API to behave, and work, exactly the same as the handler's CONTINUE option? In other words, opting for the PAUSE option would simply stall the task, and upon #resume it would just discard that record and then continue on with processing (or even commit the offset immediately after it, perhaps even asynchronously, since it presumably doesn't matter if it doesn't succeed and the record is picked up again by accident -- as long as that doesn't happen repeatedly in an infinite loop, which I don't see why it would).

On the subject of committing...

> Other questions: if a task would be paused, would we commit the current offset? What happens if we re-balance? Would we just lose the "pause" state, and hit the same error again and just pause again?

I was imagining that we would either just wait without committing, or perhaps even commit everything up to -- but not including -- the "bad" record when PAUSE is triggered. Again, if we rebalance and "lose the pause" then we'll just attempt to process it again, fail, and end up back in PAUSE. This is no different from how successful processing works, no? Who cares if a rebalance happens to strike and causes it to be PAUSED again?

All in all, I feel like these concerns are all essentially "true", but to me they just seem like implementation or design decisions, and none of them strikes me as posing an unsolvable problem for this feature. But maybe I'm just lacking in imagination... Thoughts?

On Fri, Mar 8, 2024 at 5:30 PM Matthias J. Sax wrote:

> Hey Nick,
>
> I am sorry that I have to say that I am not a fan of this KIP. I see way too many food-guns and complications that can be introduced.
>
> I am also not sure if I understand the motivation. You say CONTINUE and FAIL are not good enough, but don't describe in detail why. If we understand the actual problem better, it might also get clear how task-pausing would help to address the problem.
>
> The main problem I see, as already mentioned by Sophie, is about time synchronization. However, it's not limited to joins, but affects all time-based operations, i.e., also all windowed aggregations. If one task pauses but others keep running, we keep advancing stream-time downstream, and thus when the task resumes later, there is a very high probability that records are dropped as the window already got closed.
>
> For the runtime itself, we also cannot really do a cascading downstream pause, because the runtime does not know anything about the semantics of operators. We don't know if we execute a DSL operator or a PAPI operator. (We could maybe track all downstream tasks independent of semantics, but in the end it might just imply we could also just pause all tasks...)
>
> For the "skip record case", it's also not possible to skip over an offset from outside
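[Editor's note: the semantics Sophie proposes -- PAUSE as a stalled CONTINUE that discards the bad record on #resume -- can be modeled in a standalone sketch. This is not the Kafka Streams API (the real handler is `org.apache.kafka.streams.errors.DeserializationExceptionHandler`, which today offers only CONTINUE and FAIL); the `PAUSE` value and all names below are hypothetical, modeling the KIP-990 proposal only.]

```java
import java.util.Optional;

public class Kip990Sketch {
    // Existing handler responses (CONTINUE, FAIL) plus the hypothetical
    // PAUSE option under discussion in KIP-990.
    enum Response { CONTINUE, FAIL, PAUSE }

    // Stand-in deserializer: rejects empty payloads to simulate a "poison pill".
    static String deserialize(byte[] raw) {
        if (raw == null || raw.length == 0) throw new IllegalArgumentException("bad record");
        return new String(raw);
    }

    // Toy model of a task handling one record. CONTINUE drops the bad record;
    // PAUSE is modeled as "stall, then on #resume behave exactly like CONTINUE"
    // (i.e. the same end state), per the suggestion in the thread; FAIL rethrows.
    static Optional<String> handleRecord(byte[] raw, Response onError) {
        try {
            return Optional.of(deserialize(raw));
        } catch (IllegalArgumentException e) {
            switch (onError) {
                case CONTINUE:
                case PAUSE: // after resume(): discard the record and move on
                    return Optional.empty();
                default: // FAIL: bubble the error up and kill the task
                    throw e;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(handleRecord("ok".getBytes(), Response.FAIL).get());
        System.out.println(handleRecord(new byte[0], Response.PAUSE).isPresent());
    }
}
```

Under this model, the only behavioral difference between CONTINUE and PAUSE is *when* the record is discarded (immediately vs. after an operator-triggered resume), which is why rebalancing away the "pause" state just replays the same failure and pauses again.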