Hi, thanks for the KIP!
LM1: We’re proposing a new API to set the consumer rebalance listener at
the consumer level (nice), but what will the behaviour be if an app calls
that new API plus any of the “old” subscribe variants (e.g.,
subscribe(..., ConsumerRebalanceListener))? Do we intend to accept it
silently and override the listener (similar to what happens today), or to
throw some kind of IllegalStateException? I imagine the latter is better to
guide users on how to properly use the APIs, and it aligns with the
deprecation that the KIP introduces on the subscribe with listener.
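A minimal sketch of the stricter option, to make the question concrete. SketchConsumer and its nested Listener are hypothetical stand-ins, not the real KafkaConsumer or ConsumerRebalanceListener, and the exception message is only illustrative:

```java
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the consumer; NOT the real KafkaConsumer.
class SketchConsumer {
    // Hypothetical stand-in for ConsumerRebalanceListener.
    interface Listener {
        void onPartitionsAssigned(Collection<String> partitions);
    }

    private Listener consumerLevelListener; // set via the new API
    private Listener subscribeListener;     // set via the old subscribe variant
    private List<String> topics = List.of();

    // Stand-in for the proposed setConsumerRebalanceListener(...).
    void setConsumerRebalanceListener(Listener listener) {
        this.consumerLevelListener = listener;
    }

    // Subscribe without a listener: only changes the subscription.
    void subscribe(Collection<String> newTopics) {
        this.topics = List.copyOf(newTopics);
    }

    // Stand-in for the deprecated subscribe(topics, listener).
    @Deprecated
    void subscribe(Collection<String> newTopics, Listener listener) {
        // Reject the mix before touching any state, so a failed call
        // changes neither the listener nor the subscription.
        if (consumerLevelListener != null) {
            throw new IllegalStateException(
                "A consumer-level rebalance listener is already set; "
                    + "use subscribe(topics) without a listener");
        }
        this.subscribeListener = listener;
        this.topics = List.copyOf(newTopics);
    }

    List<String> subscription() {
        return topics;
    }
}
```

With this shape, the silent-override alternative would simply drop the check and let the later call win, which is what happens today.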
LM2: About the deprecation of subscribe with listener, one section of the
KIP says deprecate in 5.0, while the sample code has @Deprecated tags
“since 4.4”. What’s the intention?
LM3: About the new ConsumerRebalanceAdapter, could we include the explicit
definition of the component?
- Is it going to be a class, or an interface with an implementation,
consistent with Consumer -> KafkaConsumer? Not clear what the intention is.
- There are some KafkaConsumer APIs that are listed neither in the
“Included” section nor in the “Excluded” section (e.g., “subscription”,
the metrics-related ones like registerMetricForSubscription,
clientInstanceId). What's the intention there?
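For illustration, the interface-plus-implementation shape asked about above could look roughly like the sketch below. All names are hypothetical stand-ins, and the method set (seek/position allowed, close excluded) is only an example, not the KIP's actual "Included" list:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical restricted interface handed to rebalance callbacks.
interface RebalanceConsumerSketch {
    void seek(String partition, long offset);
    long position(String partition);
}

// Hypothetical stand-in for the full consumer; NOT the real KafkaConsumer.
class FullConsumerSketch {
    private final Map<String, Long> positions = new HashMap<>();

    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }

    // Unsafe inside a rebalance callback, so not exposed on the interface.
    void close() {
        positions.clear();
    }
}

// Implementation that delegates only the allowed operations.
final class RestrictedConsumerSketch implements RebalanceConsumerSketch {
    private final FullConsumerSketch delegate;

    RestrictedConsumerSketch(FullConsumerSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public void seek(String partition, long offset) {
        delegate.seek(partition, offset);
    }

    @Override
    public long position(String partition) {
        return delegate.position(partition);
    }
}
```

A callback that only receives a RebalanceConsumerSketch cannot call close() without a compile error, which is the property the KIP is after. Whichever shape is chosen, every KafkaConsumer method would need an explicit place on one side of that boundary.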
LM4: “ConsumerRebalanceAdapter” as a name seems confusing to me for this
component, as it's not clear what it is (it's just a Consumer in the end,
just a restricted one to be used for rebalance, correct?). Could we think
about a better/clearer name (e.g., RebalanceConsumer)? I find this one clear
and to the point, but no strong position, naming is tricky :)
Thanks!
Lianet
On Fri, Apr 17, 2026 at 3:12 AM Andrew Schofield <[email protected]>
wrote:
> Thanks. Looks good to me.
>
> Andrew
>
> On 2026/04/13 23:00:13 Aditya Kousik wrote:
> > Thank you for the clarification, Andrew.
> >
> > I’d initially made the changes to the KIP along the same lines after
> your suggestion. I think I just got a little ahead of myself in thinking
> about going without a setListener method and instead creating an
> interceptor like we do for the consumer - the KIP did not reflect this
> idea. It was just me floating the idea.
> >
> > I think the KIP currently reflects all your suggestions; please take a
> look when you can. It includes the explicit mark for removal of the
> subscribe methods and the new setConsumerRebalanceListener method on
> Consumer. I’ve renamed the interface to ConsumerRebalanceAdapter - it’s
> not really just a view because we do allow write-like ops like commit/seek.
> >
> > Best,
> > Aditya
> >
> > > On Apr 13, 2026, at 10:12, Andrew Schofield <[email protected]>
> wrote:
> > >
> > > Hi Aditya,
> > > I think I might have confused you. I didn't write clearly enough so
> apologies. I'll try again.
> > >
> > > Today, we have:
> > >
> > > public interface ConsumerRebalanceListener {
> > >
> > > void onPartitionsAssigned(Collection<TopicPartition> partitions);
> > >
> > > void onPartitionsRevoked(Collection<TopicPartition> partitions);
> > >
> > > default void onPartitionsLost(Collection<TopicPartition>
> partitions) {
> > > onPartitionsRevoked(partitions);
> > > }
> > > }
> > >
> > > I think what you end up with is:
> > >
> > > public interface ConsumerRebalanceListener {
> > >
> > > void onPartitionsAssigned(Collection<TopicPartition> partitions);
> > >
> > > default void onPartitionsAssigned(Collection<TopicPartition>
> partitions,
> > > ConsumerRebalanceAdapter consumerAdapter) {
> > > onPartitionsAssigned(partitions);
> > > }
> > >
> > > void onPartitionsRevoked(Collection<TopicPartition> partitions);
> > >
> > > default void onPartitionsRevoked(Collection<TopicPartition>
> partitions,
> > > ConsumerRebalanceAdapter consumerAdapter) {
> > > onPartitionsRevoked(partitions);
> > > }
> > >
> > > default void onPartitionsLost(Collection<TopicPartition>
> partitions) {
> > > onPartitionsRevoked(partitions);
> > > }
> > >
> > > default void onPartitionsLost(Collection<TopicPartition> partitions,
> > > ConsumerRebalanceAdapter consumerAdapter) {
> > > onPartitionsRevoked(partitions, consumerAdapter);
> > > }
> > > }
> > >
> > > ConsumerRebalanceView is also fine for the cut-down interface to the
> Consumer. I do think it would be best to start the name with
> ConsumerRebalance... .
> > >
> > > And then the changes to Consumer are:
> > >
> > > * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
> > > * Deprecate all of the subscribe variants which take a
> ConsumerRebalanceListener parameter
> > >
> > >
> > > I do take the point about having a list of rebalance listeners like
> the interceptor classes, but I wouldn't wrap that into the same KIP
> personally. People tend to implement these things in lambdas.
> > >
> > > Thanks,
> > > Andrew
> > >
> > >> On 2026/04/08 05:30:51 Aditya Kousik wrote:
> > >> Hello Andrew,
> > >>
> > >> I dwelt on this a bit more. I think supporting both listeners until
> AK5.0 may not be as irksome as I initially feared. We already do this for
> classic/async consumers with ConsumerDelegate.
> > >>
> > >> I was curious why subscribe() alone took a client-constructed object
> like ConsumerRebalanceListener whereas all other hooks were instantiated by
> the client code via Configurable. We can support both but amply call out
> via documentation and a log info line about which one will be activated at
> runtime. This can be opt-in at launch and eventually made the de facto
> pattern in the next major release.
> > >>
> > >> This would also tie in nicely with subscribe only supporting topic
> collections + regex and move the callback within the client code like other
> interceptors already.
> > >>
> > >> I also wanted to call out that this could be an opportunity to
> support a list of ConsumerRebalanceInterceptor like the others. Currently
> in my code, I wrap the outer one as a CompositeRebalanceListener with
> List<ConsumerRebalanceListener> invoked serially. We already do this as I
> mentioned earlier with producer/consumerInterceptors handling exceptions
> within each call in a loop by logging a warn.
> > >>
> > >> So rebalance.interceptors with a LIST of fully qualified class names instantiated
> within the constructor is my current favourite approach. We support one API
> exactly for all rebalances indicating which one at runtime.
> > >>
> > >> Lmk your thoughts on this.
> > >>
> > >> Thanks,
> > >> Aditya Kousik
> > >>
> > >>>> On Apr 5, 2026, at 12:37, Aditya Kousik <[email protected]>
> wrote:
> > >>>
> > >>> Hi Andrew,
> > >>>
> > >>> I see what you’re saying.
> > >>> With AS1,2 the flow becomes clearer for the subscribe interaction:
> we only change the subscription state for topics and leave rebalance events
> to a separate mechanism uncoupled from the subscribe() call.
> > >>>
> > >>> To keep in line with other kafka client classes, can we follow the
> same convention of using ConsumerConfig to handle this? A new
> `ConsumerRebalanceInterceptor` with the same signature I’d proposed.
> Instantiated with Utils#newConfiguredInstance and make the class
> Configurable. Naming and instantiating makes it closer to existing
> interceptor classes.
> > >>>
> > >>> My only worry is that as long as ConsumerRebalanceListener exists,
> this can be a source of confusion for which interface to use for rebalance
> events. Unless we deprecate it, we bear the burden of invoking both, even
> if we state that only oneOf(ConsumerRebalanceListener,
> ConsumerRebalanceInterceptor) will be invoked during rebalances.
> > >>>
> > >>> Would love to hear your thoughts on this.
> > >>>
> > >>> -Aditya
> > >>>
> > >>>> On Apr 5, 2026, at 09:39, Andrew Schofield <[email protected]>
> wrote:
> > >>>>
> > >>>> Hi Aditya,
> > >>>> I agree that using the existing ConsumerRebalanceListener gives a
> lower adoption burden.
> > >>>>
> > >>>> AS1: To be more concrete with what I mean here, we could:
> > >>>> * Deprecate Consumer.subscribe(Collection<String>,
> ConsumerRebalanceListener) for removal in AK 5.0
> > >>>> * Introduce
> Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
> > >>>>
> > >>>> AS2: Given that we already have an interface called
> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be a
> better naming choice for naming your new interface in terms of consistency.
> > >>>>
> > >>>> Thanks,
> > >>>> Andrew
> > >>>>
> > >>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
> > >>>>> Hi Andrew, thank you for the quick feedback. It turned out to be
> pivotal.
> > >>>>>
> > >>>>> One of the rejected alternatives was to Add default methods to
> ConsumerRebalanceListener.
> > >>>>> I was ambivalent on this approach with the hopes that a new method
> and new interface would create the least friction.
> > >>>>>
> > >>>>> You’re right about the state change w.r.t subscribe() variants.
> With the Classic consumer, we directly update SubscriptionType with
> setSubscriptionType and similarly a more complex setup for the async
> consumer. So your setRebalanceHandler suggestion seems to follow existing
> patterns.
> > >>>>>
> > >>>>> However, looking at the places I’d need to pipe RebalanceHandler
> through, it’s going to add a burden to the plumbing and subscription state.
> > >>>>>
> > >>>>> I’m now falling squarely on the side of extending the existing
> ConsumerRebalanceListener with new default methods. This also allows
> existing frameworks like Spring and SmallRye to hook directly into the new
> method with minimal change.
> > >>>>>
> > >>>>> I’ve renamed/updated the KIP to reflect this. (I can see why
> people use shareable URLs for the confluence docs)
> > >>>>>
> > >>>>> https://cwiki.apache.org/confluence/x/9ZU8G
> > >>>>>
> > >>>>> -Aditya
> > >>>>>
> > >>>>>
> > >>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield <
> [email protected]> wrote:
> > >>>>>>
> > >>>>>> Hi Aditya,
> > >>>>>> Thanks for the KIP. I've only taken a quick look so far, but
> here's an initial comment.
> > >>>>>>
> > >>>>>> AS1: One of the mistakes in the Kafka consumer API today is that
> the `subscribe(Collection<String>, ConsumerRebalanceListener)` does two
> jobs. First, it replaces the rebalance listener (when you might assume that
> the listener applies only to rebalance changes resulting from this call to
> subscribe). Second, it changes the subscription. If the second of these
> throws an exception, the first will still occur. It's a bit of a mess. I
> suggest you have a `Consumer.setRebalanceHandler(RebalanceHandler)` method
> and do not add a new override for `Consumer.subscribe`.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler:
> Consumer-Aware Rebalance Callback.
> > >>>>>>>
> > >>>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
> > >>>>>>>
> > >>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into
> rebalance callbacks; the client doesn't. The standard workaround of
> constructor-injecting a full Consumer reference allows dangerous operations
> like poll() and close() inside a callback. This KIP proposes
> RebalanceHandler, with a RebalanceConsumerView that exposes only safe
> operations, making misuse a compile error.
> > >>>>>>>
> > >>>>>>> Looking forward to your feedback.
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>>
> > >>>>>
> > >>>>>
> > >>
> >
>