Thanks. Looks good to me.

Andrew

On 2026/04/13 23:00:13 Aditya Kousik wrote:
> Thank you for the clarification, Andrew. 
> 
> I’d initially made the changes to the KIP along the same lines after your 
> suggestion. I think I then got a little ahead of myself: instead of a 
> setListener method, I suggested creating an interceptor like the ones we have 
> for the consumer. The KIP never reflected that idea; I was only floating it.
> 
> I think the KIP currently reflects all your suggestions; please take a look 
> when you can. It includes the explicit mark for removal of the subscribe 
> methods and the new setConsumerRebalanceListener method on Consumer. I’ve 
> renamed the view to ConsumerRebalanceAdapter - it’s not really just a view 
> because we do allow write-like ops like commit/seek.
> 
> Best,
> Aditya
> 
> > On Apr 13, 2026, at 10:12, Andrew Schofield wrote:
> > 
> > Hi Aditya,
> > I think I might have confused you. I didn't write clearly enough so 
> > apologies. I'll try again.
> > 
> > Today, we have:
> > 
> > public interface ConsumerRebalanceListener {
> > 
> >    void onPartitionsAssigned(Collection<TopicPartition> partitions);
> > 
> >    void onPartitionsRevoked(Collection<TopicPartition> partitions);
> > 
> >    default void onPartitionsLost(Collection<TopicPartition> partitions) {
> >        onPartitionsRevoked(partitions);
> >    }
> > }
> > 
> > I think what you end up with is:
> > 
> > public interface ConsumerRebalanceListener {
> > 
> >    void onPartitionsAssigned(Collection<TopicPartition> partitions);
> > 
> >    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
> >        ConsumerRebalanceAdapter consumerAdapter) {
> >      onPartitionsAssigned(partitions);
> >    }
> > 
> >    void onPartitionsRevoked(Collection<TopicPartition> partitions);
> > 
> >    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
> >        ConsumerRebalanceAdapter consumerAdapter) {
> >      onPartitionsRevoked(partitions);
> >    }
> > 
> >    default void onPartitionsLost(Collection<TopicPartition> partitions) {
> >      onPartitionsRevoked(partitions);
> >    }
> > 
> >    default void onPartitionsLost(Collection<TopicPartition> partitions,
> >        ConsumerRebalanceAdapter consumerAdapter) {
> >      onPartitionsRevoked(partitions, consumerAdapter);
> >    }
> > }
> > 
> > ConsumerRebalanceView is also fine for the cut-down interface to the 
> > Consumer. I do think it would be best to start the name with 
> > ConsumerRebalance... .
> > 
> > And then the changes to Consumer are:
> > 
> > * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
> > * Deprecate all of the subscribe variants which take a 
> > ConsumerRebalanceListener parameter
> > 
> > 
> > I do take the point about having a list of rebalance listeners like the 
> > interceptor classes, but I wouldn't wrap that into the same KIP personally. 
> > People tend to implement these things in lambdas.
> > 
> > Thanks,
> > Andrew
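
A minimal sketch of how the interface shape above might behave, assuming stand-in types so it compiles without Kafka on the classpath. `TopicPartition` here is a local placeholder for the real class, and `ConsumerRebalanceAdapter.commitSync()` is a hypothetical method chosen only for illustration; the KIP defines the actual adapter surface. The point is that a listener written against the one-argument methods keeps working unchanged, while a new listener can override the two-argument variant to use the adapter.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, not the real class).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

// Hypothetical cut-down consumer handle; commitSync() is an illustrative guess.
interface ConsumerRebalanceAdapter {
    void commitSync();
}

// Mirrors the interface shape quoted in the message above.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                      ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsAssigned(partitions);
    }

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                     ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions,
                                  ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions, consumerAdapter);
    }
}

// Existing-style listener: implements only the one-argument methods.
class LegacyListener implements ConsumerRebalanceListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        events.add("assigned");
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        events.add("revoked");
    }
}

// New-style listener: overrides the two-argument revoke to commit via the adapter.
class CommittingListener extends LegacyListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                    ConsumerRebalanceAdapter consumerAdapter) {
        consumerAdapter.commitSync();
        events.add("revoked+commit");
    }
}

class RebalanceDefaultsDemo {
    public static void main(String[] args) {
        ConsumerRebalanceAdapter adapter = () -> System.out.println("commitSync called");
        List<TopicPartition> partitions = List.of(new TopicPartition("orders", 0));

        LegacyListener legacy = new LegacyListener();
        legacy.onPartitionsLost(partitions, adapter);   // falls through the defaults
        CommittingListener committing = new CommittingListener();
        committing.onPartitionsLost(partitions, adapter); // reaches the override
        System.out.println(legacy.events + " " + committing.events);
    }
}
```

In this sketch the two-argument `onPartitionsLost` delegates to the two-argument `onPartitionsRevoked`, so a listener that overrides only the latter still sees lost partitions with the adapter available.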
> > 
> >> On 2026/04/08 05:30:51 Aditya Kousik wrote:
> >> Hello Andrew,
> >> 
> >> I dwelt on this a bit more. I think supporting both listeners until AK5.0 
> >> may not be as irksome as I initially feared. We already do this for 
> >> classic/async consumers with ConsumerDelegate.
> >> 
> >> I was curious why subscribe() alone took a client-constructed object like 
> >> ConsumerRebalanceListener whereas all other hooks were instantiated by the 
> >> client code via Configurable. We can support both but amply call out via 
> >> documentation and a log info line about which one will be activated at 
> >> runtime. This can be opt-in at launch and eventually made the de facto 
> >> pattern in the next major release.
> >> 
> >> This would also tie in nicely with subscribe only supporting topic 
> >> collections + regex and move the callback within the client code like 
> >> other interceptors already.
> >> 
> >> I also wanted to call out that this could be an opportunity to support a 
> >> list of ConsumerRebalanceInterceptor like the others. Currently in my 
> >> code, I wrap the outer one as a CompositeRebalanceListener with 
> >> List<ConsumerRebalanceListener> invoked serially. We already do this as I 
> >> mentioned earlier with producer/consumerInterceptors handling exceptions 
> >> within each call in a loop by logging a warning.
> >> 
> >> So rebalance.interceptors with a LIST of fully qualified class names 
> >> instantiated within the constructor is my current favourite approach. We 
> >> support exactly one API for all rebalances, indicating which one at runtime.
> >> 
> >> Lmk your thoughts on this.
> >> 
> >> Thanks,
> >> Aditya Kousik
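
The composite wrapper described in the message above could be sketched roughly as follows. This is an illustration under stated assumptions, not code from the KIP or from Kafka: `TopicPartition` and the reduced `ConsumerRebalanceListener` are local stand-ins so the block compiles on its own, and catching `RuntimeException` and logging a warning simply mirrors the interceptor-style handling the message refers to. Whether rebalance callbacks should swallow exceptions this way is a design question the thread leaves open.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, not the real class).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

// Reduced copy of the existing listener interface; onPartitionsLost is omitted for brevity.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
}

// Invokes each delegate in list order; a failing delegate is logged and skipped.
final class CompositeRebalanceListener implements ConsumerRebalanceListener {
    private static final Logger LOG =
            Logger.getLogger(CompositeRebalanceListener.class.getName());

    private final List<ConsumerRebalanceListener> delegates;

    CompositeRebalanceListener(List<ConsumerRebalanceListener> delegates) {
        this.delegates = new ArrayList<>(delegates); // defensive copy
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        invokeAll("onPartitionsAssigned", d -> d.onPartitionsAssigned(partitions));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        invokeAll("onPartitionsRevoked", d -> d.onPartitionsRevoked(partitions));
    }

    private void invokeAll(String callback, Consumer<ConsumerRebalanceListener> call) {
        for (ConsumerRebalanceListener delegate : delegates) {
            try {
                call.accept(delegate);
            } catch (RuntimeException e) {
                // Log and continue so one failing delegate does not block the rest.
                LOG.log(Level.WARNING, "Rebalance listener failed in " + callback, e);
            }
        }
    }
}
```

A composite like this can be handed to the consumer wherever a single `ConsumerRebalanceListener` is accepted, which is how the wrapping approach works today without any list-valued configuration.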
> >> 
> >>>> On Apr 5, 2026, at 12:37, Aditya Kousik wrote:
> >>> 
> >>> Hi Andrew,
> >>> 
> >>> I see what you’re saying.
> >>> With AS1,2 the flow becomes clearer for the subscribe interaction: we 
> >>> only change the subscription state for topics and leave rebalance events 
> >>> to a separate mechanism uncoupled from the subscribe() call.
> >>> 
> >>> To keep in line with other kafka client classes, can we follow the same 
> >>> convention of using ConsumerConfig to handle this? A new 
> >>> `ConsumerRebalanceInterceptor` with the same signature I’d proposed. 
> >>> Instantiated with Utils#newConfiguredInstance and make the class 
> >>> Configurable. Naming and instantiating makes it closer to existing 
> >>> interceptor classes.
> >>> 
> >>> My only worry is that as long as ConsumerRebalanceListener exists, this 
> >>> can be a source of confusion for which interface to use for rebalance 
> >>> events. Unless we deprecate it, we bear the burden of invoking both, even 
> >>> if we state that only oneOf(ConsumerRebalanceListener, 
> >>> ConsumerRebalanceInterceptor) will be invoked during rebalances.
> >>> 
> >>> Would love to hear your thoughts on this.
> >>> 
> >>> -Aditya
> >>> 
> >>>> On Apr 5, 2026, at 09:39, Andrew Schofield wrote:
> >>>> 
> >>>> Hi Aditya,
> >>>> I agree that using the existing ConsumerRebalanceListener gives a lower 
> >>>> adoption burden.
> >>>> 
> >>>> AS1: To be more concrete with what I mean here, we could:
> >>>> * Deprecate Consumer.subscribe(Collection<String>, 
> >>>> ConsumerRebalanceListener) for removal in AK 5.0
> >>>> * Introduce 
> >>>> Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
> >>>> 
> >>>> AS2: Given that we already have an interface called 
> >>>> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be 
> >>>> a better naming choice for naming your new interface in terms of 
> >>>> consistency.
> >>>> 
> >>>> Thanks,
> >>>> Andrew
> >>>> 
> >>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
> >>>>> Hi Andrew, thank you for the quick feedback. It turned out to be 
> >>>>> pivotal.
> >>>>> 
> >>>>> One of the rejected alternatives was to Add default methods to 
> >>>>> ConsumerRebalanceListener.
> >>>>> I was ambivalent on this approach with the hopes that a new method and 
> >>>>> new interface would create the least friction.
> >>>>> 
> >>>>> You’re right about the state change w.r.t subscribe() variants. With 
> >>>>> the Classic consumer, we directly update SubscriptionType with 
> >>>>> setSubscriptionType and similarly a more complex setup for the async 
> >>>>> consumer. So your setRebalanceHandler suggestion seems to follow 
> >>>>> existing patterns.
> >>>>> 
> >>>>> However, looking at the places I’d need to pipe RebalanceHandler 
> >>>>> through, it’s going to add a burden to the plumbing and subscription 
> >>>>> state.
> >>>>> 
> >>>>> I’m falling squarely on the side of extending the existing 
> >>>>> ConsumerRebalanceListener with new default methods. This also allows 
> >>>>> existing frameworks like Spring and SmallRye to hook directly into the 
> >>>>> new method with minimal change.
> >>>>> 
> >>>>> I’ve renamed/updated the KIP to reflect this. (I can see why people use 
> >>>>> shareable URLs for the confluence docs)
> >>>>> 
> >>>>> https://cwiki.apache.org/confluence/x/9ZU8G
> >>>>> 
> >>>>> -Aditya
> >>>>> 
> >>>>> 
> >>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield wrote:
> >>>>>> 
> >>>>>> Hi Aditya,
> >>>>>> Thanks for the KIP. I've only taken a quick look so far, but here's an 
> >>>>>> initial comment.
> >>>>>> 
> >>>>>> AS1: One of the mistakes in the Kafka consumer API today is that the 
> >>>>>> `subscribe(Collection<String>, ConsumerRebalanceListener)` does two 
> >>>>>> jobs. First, it replaces the rebalance listener (when you might assume 
> >>>>>> that the listener applies only to rebalance changes resulting from 
> >>>>>> this call to subscribe). Second, it changes the subscription. If the 
> >>>>>> second of these throws an exception, the first will still occur. It's 
> >>>>>> a bit of a mess. I suggest you have a 
> >>>>>> `Consumer.setRebalanceHandler(RebalanceHandler)` method and do not add 
> >>>>>> a new override for `Consumer.subscribe`.
> >>>>>> 
> >>>>>> Thanks,
> >>>>>> Andrew
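
The "two jobs" problem in AS1 can be illustrated with a toy model. This is not KafkaConsumer's implementation; `ToyConsumer`, its validation rule, and the use of a plain `Object` as the listener are all assumptions made so the sketch is self-contained. It only models the ordering described above: the combined `subscribe` replaces the listener first and changes the subscription second, so a failure in the second step leaves the new listener installed alongside the old subscription.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a consumer; names and behaviour are illustrative assumptions only.
final class ToyConsumer {
    private Object rebalanceListener;
    private Set<String> subscription = new HashSet<>();

    // Combined call: job 1 replaces the listener, job 2 changes the subscription.
    void subscribe(Collection<String> topics, Object listener) {
        this.rebalanceListener = listener; // job 1 takes effect immediately
        subscribe(topics);                 // job 2 may throw after job 1 succeeded
    }

    // Subscription change on its own, with a simple validation step that can fail.
    void subscribe(Collection<String> topics) {
        for (String topic : topics) {
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException("Topic name must not be null or empty");
            }
        }
        this.subscription = new HashSet<>(topics);
    }

    // Separate setter, in the spirit of the setRebalanceHandler suggestion.
    void setConsumerRebalanceListener(Object listener) {
        this.rebalanceListener = listener;
    }

    Object rebalanceListener() { return rebalanceListener; }

    Set<String> subscription() { return subscription; }
}

class SubscribeTwoJobsDemo {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.setConsumerRebalanceListener("listenerA");
        consumer.subscribe(List.of("orders"));
        try {
            consumer.subscribe(List.of(""), "listenerB"); // invalid topic name
        } catch (IllegalArgumentException e) {
            System.out.println("subscribe failed: " + e.getMessage());
        }
        // The listener and the subscription are now out of step with each other.
        System.out.println(consumer.rebalanceListener() + " " + consumer.subscription());
    }
}
```

With a separate setter, each call changes exactly one piece of state, so a failed subscription change cannot silently swap the listener.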
> >>>>>> 
> >>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
> >>>>>>> Hi all,
> >>>>>>> 
> >>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler: 
> >>>>>>> Consumer-Aware Rebalance Callback.
> >>>>>>> 
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
> >>>>>>> 
> >>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into 
> >>>>>>> rebalance callbacks; the client doesn't. The standard workaround of 
> >>>>>>> constructor-injecting a full Consumer reference allows dangerous 
> >>>>>>> operations like poll() and close() inside a callback. This KIP 
> >>>>>>> proposes RebalanceHandler, with a RebalanceConsumerView that exposes 
> >>>>>>> only safe operations, making misuse a compile error.
> >>>>>>> 
> >>>>>>> Looking forward to your feedback.
> >>>>>>> 
> >>>>>>> Thanks
> >>>>>>> 
> >>>>> 
> >>>>> 
> >> 
> 
