Thanks. Looks good to me.

Andrew

On 2026/04/13 23:00:13 Aditya Kousik wrote:
> Thank you for the clarification, Andrew.
>
> I’d initially made the changes to the KIP along the same lines after your
> suggestion. I think I just got a little ahead of myself, thinking about
> creating an interceptor like we do for the consumer instead of a
> setListener method - the KIP did not reflect this idea. It was just me
> floating the idea.
>
> I think the KIP currently reflects all your suggestions; please take a look
> when you can. It includes the explicit marking for removal of the subscribe
> methods and the new setConsumerRebalanceListener method on Consumer. I’ve
> renamed the interface to ConsumerRebalanceAdapter - it’s not really just a
> view because we do allow write-like ops like commit/seek.
>
> Best,
> Aditya
>
>
> On Apr 13, 2026, at 10:12, Andrew Schofield <[email protected]> wrote:
> >
> > Hi Aditya,
> > I think I might have confused you. I didn't write clearly enough so
> > apologies. I'll try again.
> >
> > Today, we have:
> >
> > public interface ConsumerRebalanceListener {
> >
> >     void onPartitionsAssigned(Collection<TopicPartition> partitions);
> >
> >     void onPartitionsRevoked(Collection<TopicPartition> partitions);
> >
> >     default void onPartitionsLost(Collection<TopicPartition> partitions) {
> >         onPartitionsRevoked(partitions);
> >     }
> > }
> >
> > I think what you end up with is:
> >
> > public interface ConsumerRebalanceListener {
> >
> >     void onPartitionsAssigned(Collection<TopicPartition> partitions);
> >
> >     default void onPartitionsAssigned(Collection<TopicPartition> partitions,
> >             ConsumerRebalanceAdapter consumerAdapter) {
> >         onPartitionsAssigned(partitions);
> >     }
> >
> >     void onPartitionsRevoked(Collection<TopicPartition> partitions);
> >
> >     default void onPartitionsRevoked(Collection<TopicPartition> partitions,
> >             ConsumerRebalanceAdapter consumerAdapter) {
> >         onPartitionsRevoked(partitions);
> >     }
> >
> >     default void onPartitionsLost(Collection<TopicPartition> partitions) {
> >         onPartitionsRevoked(partitions);
> >     }
> >
> >     default void onPartitionsLost(Collection<TopicPartition> partitions,
> >             ConsumerRebalanceAdapter consumerAdapter) {
> >         onPartitionsRevoked(partitions, consumerAdapter);
> >     }
> > }
> >
> > ConsumerRebalanceView is also fine for the cut-down interface to the
> > Consumer. I do think it would be best to start the name with
> > ConsumerRebalance... .
> >
> > And then the changes to Consumer are:
> >
> > * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
> > * Deprecate all of the subscribe variants which take a
> >   ConsumerRebalanceListener parameter
> >
> >
> > I do take the point about having a list of rebalance listeners like the
> > interceptor classes, but I wouldn't wrap that into the same KIP personally.
> > People tend to implement these things in lambdas.
> >
> > Thanks,
> > Andrew
> >
> >> On 2026/04/08 05:30:51 Aditya Kousik wrote:
> >> Hello Andrew,
> >>
> >> I dwelt on this a bit more. I think supporting both listeners until AK 5.0
> >> may not be as irksome as I initially feared. We already do this for
> >> classic/async consumers with ConsumerDelegate.
> >>
> >> I was curious why subscribe() alone took a client-constructed object like
> >> ConsumerRebalanceListener whereas all other hooks were instantiated by the
> >> client code via Configurable. We can support both but amply call out via
> >> documentation and a log info line about which one will be activated at
> >> runtime. This can be opt-in at launch and eventually made the de facto
> >> pattern in the next major release.
> >>
> >> This would also tie in nicely with subscribe only supporting topic
> >> collections + regex and moving the callback within the client code like
> >> other interceptors already.
> >>
> >> I also wanted to call out that this could be an opportunity to support a
> >> list of ConsumerRebalanceInterceptor like the others.
> >> Currently in my code, I wrap the outer one as a
> >> CompositeRebalanceListener with List<ConsumerRebalanceListener> invoked
> >> serially. We already do this as I mentioned earlier with
> >> producer/consumerInterceptors handling exceptions within each call in a
> >> loop by logging a warning.
> >>
> >> So rebalance.interceptors with a LIST of fully qualified class names
> >> instantiated within the constructor is my current favourite approach. We
> >> support one API exactly for all rebalances indicating which one at runtime.
> >>
> >> Let me know your thoughts on this.
> >>
> >> Thanks,
> >> Aditya Kousik
> >>
> >>>> On Apr 5, 2026, at 12:37, Aditya Kousik <[email protected]> wrote:
> >>>
> >>> Hi Andrew,
> >>>
> >>> I see what you’re saying.
> >>> With AS1,2 the flow becomes clearer for the subscribe interaction: we
> >>> only change the subscription state for topics and leave rebalance events
> >>> to a separate mechanism uncoupled from the subscribe() call.
> >>>
> >>> To keep in line with other Kafka client classes, can we follow the same
> >>> convention of using ConsumerConfig to handle this? That is, a new
> >>> `ConsumerRebalanceInterceptor` with the same signature I’d proposed,
> >>> instantiated with Utils#newConfiguredInstance, with the class made
> >>> Configurable. Naming and instantiating it this way makes it closer to
> >>> existing interceptor classes.
> >>>
> >>> My only worry is that as long as ConsumerRebalanceListener exists, this
> >>> can be a source of confusion for which interface to use for rebalance
> >>> events. Unless we deprecate it, we bear the burden of invoking both, even
> >>> if we state that only oneOf(ConsumerRebalanceListener,
> >>> ConsumerRebalanceInterceptor) will be invoked during rebalances.
> >>>
> >>> Would love to hear your thoughts on this.
> >>>
> >>> -Aditya
> >>>
> >>>> On Apr 5, 2026, at 09:39, Andrew Schofield <[email protected]> wrote:
> >>>>
> >>>> Hi Aditya,
> >>>> I agree that using the existing ConsumerRebalanceListener gives a lower
> >>>> adoption burden.
> >>>>
> >>>> AS1: To be more concrete with what I mean here, we could:
> >>>> * Deprecate Consumer.subscribe(Collection<String>,
> >>>>   ConsumerRebalanceListener) for removal in AK 5.0
> >>>> * Introduce
> >>>>   Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
> >>>>
> >>>> AS2: Given that we already have an interface called
> >>>> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be
> >>>> a better naming choice for your new interface in terms of consistency.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
> >>>>> Hi Andrew, thank you for the quick feedback. It turned out to be
> >>>>> pivotal.
> >>>>>
> >>>>> One of the rejected alternatives was to Add default methods to
> >>>>> ConsumerRebalanceListener.
> >>>>> I was ambivalent on this approach with the hopes that a new method and
> >>>>> new interface would create the least friction.
> >>>>>
> >>>>> You’re right about the state change w.r.t. subscribe() variants. With
> >>>>> the Classic consumer, we directly update SubscriptionType with
> >>>>> setSubscriptionType and similarly a more complex setup for the async
> >>>>> consumer. So your setRebalanceHandler suggestion seems to follow
> >>>>> existing patterns.
> >>>>>
> >>>>> However, looking at the places I’d need to pipe RebalanceHandler
> >>>>> through, it’s going to add a burden to the plumbing and subscription
> >>>>> state.
> >>>>>
> >>>>> I’m falling squarely in the camp of extending the existing
> >>>>> ConsumerRebalanceListener with new default methods. This also allows
> >>>>> existing frameworks like Spring and SmallRye to hook directly into the
> >>>>> new method with minimal change.
> >>>>>
> >>>>> I’ve renamed/updated the KIP to reflect this. (I can see why people use
> >>>>> shareable URLs for the Confluence docs)
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/x/9ZU8G
> >>>>>
> >>>>> -Aditya
> >>>>>
> >>>>>
> >>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield <[email protected]>
> >>>>>>> wrote:
> >>>>>>
> >>>>>> Hi Aditya,
> >>>>>> Thanks for the KIP. I've only taken a quick look so far, but here's an
> >>>>>> initial comment.
> >>>>>>
> >>>>>> AS1: One of the mistakes in the Kafka consumer API today is that the
> >>>>>> `subscribe(Collection<String>, ConsumerRebalanceListener)` does two
> >>>>>> jobs. First, it replaces the rebalance listener (when you might assume
> >>>>>> that the listener applies only to rebalance changes resulting from
> >>>>>> this call to subscribe). Second, it changes the subscription. If the
> >>>>>> second of these throws an exception, the first will still occur. It's
> >>>>>> a bit of a mess. I suggest you have a
> >>>>>> `Consumer.setRebalanceHandler(RebalanceHandler)` method and do not add
> >>>>>> a new overload for `Consumer.subscribe`.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler:
> >>>>>>> Consumer-Aware Rebalance Callback.
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
> >>>>>>>
> >>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into
> >>>>>>> rebalance callbacks; the client doesn't. The standard workaround of
> >>>>>>> constructor-injecting a full Consumer reference allows dangerous
> >>>>>>> operations like poll() and close() inside a callback. This KIP
> >>>>>>> proposes RebalanceHandler, with a RebalanceConsumerView that exposes
> >>>>>>> only safe operations, making misuse a compile error.
> >>>>>>>
> >>>>>>> Looking forward to your feedback.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>>
> >>>>>
> >>>>>
> >>
>
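
The default-method shape outlined in the 2026/04/13 10:12 message can be tried out in isolation. What follows is only a minimal sketch under stated assumptions, not the KIP's code: `TopicPartition` is a stand-in for the Kafka class, the single `seek` method on `ConsumerRebalanceAdapter` is assumed (the thread only says the adapter permits commit/seek-like operations), and `LegacyListener` and `SeekToStartListener` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, for self-containment).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical cut-down consumer handle; its method set is an assumption.
interface ConsumerRebalanceAdapter {
    void seek(TopicPartition partition, long offset);
}

// Listener shape as written out in the thread: two-argument defaults delegate
// to the existing one-argument methods.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                      ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsAssigned(partitions);
    }

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                     ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions,
                                  ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions, consumerAdapter);
    }
}

// A pre-existing listener that knows nothing about the adapter.
class LegacyListener implements ConsumerRebalanceListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        calls.add("assigned:" + partitions.size());
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        calls.add("revoked:" + partitions.size());
    }
}

// A new-style listener that overrides the two-argument variant and uses the adapter.
class SeekToStartListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                     ConsumerRebalanceAdapter consumerAdapter) {
        for (TopicPartition tp : partitions) {
            consumerAdapter.seek(tp, 0L);
        }
    }
}
```

If the consumer only ever invokes the two-argument methods, `LegacyListener` keeps working unchanged, which is the lower adoption burden the thread refers to. One detail the sketch makes visible: as written, the two-argument `onPartitionsLost` default routes to `onPartitionsRevoked`, so a listener that overrides only the one-argument `onPartitionsLost` would not see that override called through the two-argument path.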

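
The composite wrapper described in the 2026/04/08 message (a list of listeners invoked serially, with each failure logged as a warning rather than stopping the loop) could look roughly like the sketch below. It is an illustration under assumptions, not Aditya's actual code: `TopicPartition` and `ConsumerRebalanceListener` are local stand-ins for the Kafka types, `java.util.logging` is used only to keep the example dependency-free, and `RecordingListener` is a hypothetical helper for demonstrating the behavior.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-ins for the Kafka types (assumption, for self-containment).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// Invokes each delegate in order; a failing delegate is logged and skipped.
final class CompositeRebalanceListener implements ConsumerRebalanceListener {
    private static final Logger LOG =
            Logger.getLogger(CompositeRebalanceListener.class.getName());

    private final List<ConsumerRebalanceListener> delegates;

    CompositeRebalanceListener(List<ConsumerRebalanceListener> delegates) {
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        forEach("onPartitionsAssigned", d -> d.onPartitionsAssigned(partitions));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        forEach("onPartitionsRevoked", d -> d.onPartitionsRevoked(partitions));
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        forEach("onPartitionsLost", d -> d.onPartitionsLost(partitions));
    }

    private void forEach(String callback, Consumer<ConsumerRebalanceListener> call) {
        for (ConsumerRebalanceListener delegate : delegates) {
            try {
                call.accept(delegate);
            } catch (RuntimeException e) {
                // Swallowing here mirrors the log-and-continue handling described in the thread.
                LOG.log(Level.WARNING, "Rebalance listener failed in " + callback, e);
            }
        }
    }
}

// Hypothetical helper: records each callback, or throws if configured to fail.
class RecordingListener implements ConsumerRebalanceListener {
    private final String name;
    private final List<String> log;
    private final boolean fail;

    RecordingListener(String name, List<String> log, boolean fail) {
        this.name = name;
        this.log = log;
        this.fail = fail;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        record("assigned");
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        record("revoked");
    }

    private void record(String event) {
        if (fail) {
            throw new IllegalStateException(name + " failed on " + event);
        }
        log.add(name + ":" + event);
    }
}
```

Logging and continuing is a design choice taken from the thread's description; a caller that needs a failing listener to abort the rebalance callback would have to rethrow instead.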