Hi Lianet,

Thanks for the review.

LM1: My intent was to leave the existing behaviour unchanged and support both
methods equally. For users who upgrade to the new version, subscribe() will
break on day 1 if we change the default implementation to throw
IllegalStateException. The only change I was planning was to register the
rebalance listener with the subscription state explicitly inside the deprecated
methods, where right now we simply pass the listener along as an argument.

LM2: I read the “since” value on the @Deprecated annotation as the version from
which the method is deprecated, i.e. “since 4.4”, with the method marked for
eventual removal in a future, as yet unspecified, major version (5.0/6.0).
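
To make that reading concrete, here is a minimal sketch using a made-up
interface (DeprecationSketch is hypothetical and the listener is typed as
Object to keep it self-contained). The standard Java annotation has only two
elements, since and forRemoval, so it can record when the deprecation started
and that removal is planned, but not the removal version; that would have to
live in the Javadoc or the KIP text.

```java
import java.util.Collection;

// Hypothetical stand-in interface, only to show the annotation usage.
interface DeprecationSketch {

    /**
     * @deprecated since 4.4; planned for removal in a future major release.
     */
    @Deprecated(since = "4.4", forRemoval = true)
    void subscribe(Collection<String> topics, Object listener);
}
```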

LM3: Thanks for highlighting the missing methods. I’ve addressed them in the
updated included/excluded section. I’ve also added the interface for the new
view in the KIP. It’s meant to mirror the Consumer -> KafkaConsumer
relationship: an interface whose implementation delegates the actual calls to
the underlying consumer.
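
As an illustration of the delegation idea only: the sketch below uses the
RebalanceConsumer name settled on under LM4 below, while FullConsumerSketch,
DelegatingRebalanceConsumer and the two methods shown are assumptions chosen
for brevity. The KIP's included/excluded section remains the actual method
list.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Stand-in for the full consumer; the real Consumer has many more methods.
class FullConsumerSketch {
    private final Set<String> paused = new HashSet<>();
    private boolean closed;

    void pause(Collection<String> partitions) { paused.addAll(partitions); }
    Set<String> paused() { return paused; }

    // Unsafe inside a rebalance callback, so it is not exposed below.
    void close() { closed = true; }
    boolean isClosed() { return closed; }
}

// Restricted surface handed to rebalance callbacks (illustrative subset).
interface RebalanceConsumer {
    void pause(Collection<String> partitions);
    Set<String> paused();
}

// Delegating implementation: every call is forwarded to the full consumer.
final class DelegatingRebalanceConsumer implements RebalanceConsumer {
    private final FullConsumerSketch delegate;

    DelegatingRebalanceConsumer(FullConsumerSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public void pause(Collection<String> partitions) {
        delegate.pause(partitions);
    }

    @Override
    public Set<String> paused() {
        return delegate.paused();
    }
}
```

A callback holding only a RebalanceConsumer reference cannot call close(),
because that method is not part of the interface.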

LM4: I agree with the naming challenge. Some of the alternatives I’d 
considered/rejected: RebalanceHandler, RebalanceView, ConsumerRebalanceView, 
GuardedConsumerView. We landed on ConsumerRebalanceAdapter since it’s not 
really just a view (we allow pause/commit etc.); we already have 
ConsumerRebalanceListener so consistency played a role there. 

I had not considered RebalanceConsumer. It fits here because it supports nearly 
70% of the methods on Consumer and clearly denotes it’s meant for use at 
rebalance time like you said. I agree with this change. I’ve updated the KIP 
with the new name.

Best,
Aditya

> On Apr 21, 2026, at 05:17, Lianet Magrans <[email protected]> wrote:
> 
> Hi, thanks for the KIP!
> 
> LM1: We’re proposing a new API to set the consumer rebalance listener at
> the consumer level (nice), but what will be the behaviour if an app calls
> that new API + any of the “old” subscribe (e.g, subscribe(...,
> ConsumerRebalanceListener)). Do we intend to accept it silently and
> override the listener (similar to what happens today), or to throw some
> kind of IllegalState? I imagine the latter is better to guide users on how
> to properly use the APIs, and aligns with the Deprecation that the KIP
> introduces on the subscribe with listener.
> 
> LM2: About the deprecation of subscribe with listener, one section of the
> KIP says deprecate in 5.0, the sample code has @deprecate tags “since 4.4”.
> What’s the intention?
> 
> LM3: About the new ConsumerRebalanceAdapter, could we include the explicit
> definition of the component?
>    - Is it going to be a class? Or interface consistent with the Consumer
> -> KafkaConsumer? Not clear what the intention is.
>   - There are some KafkaConsumer APIs that are not listed in the
> “Included” section, nor in the “Excluded” section. (E.g, “subscription”,
> the metrics-related ones like registerMetricForSubscription,
> clientInstanceId). What's the intention there?
> 
> LM4: “ConsumerRebalanceAdapter” as a name seems confusing to me for this
> component, as it's not clear what it is (it's just a Consumer in the end,
> just a restricted one to be used for rebalance, correct?). Could we think
> about a better/clear name? (e.g, RebalanceConsumer). I find this one clear
> and to the point, but no strong position, naming is tricky :)
> 
> Thanks!
> Lianet
> 
>> On Fri, Apr 17, 2026 at 3:12 AM Andrew Schofield <[email protected]>
>> wrote:
>> 
>> Thanks. Looks good to me.
>> 
>> Andrew
>> 
>>> On 2026/04/13 23:00:13 Aditya Kousik wrote:
>>> Thank you for the clarification, Andrew.
>>> 
>>> I’d initially made the changes to the KIP along the same lines after
>> your suggestion. I think I just got a little ahead of my thinking without a
>> setListener method; instead to create an interceptor like we do for
>> consumer - the KIP did not reflect this idea. It was just me floating the
>> idea.
>>> 
>>> I think the KIP currently reflects all your suggestions, please take a
>> look when you can. It includes the explicit mark for removal of the
>> subscribe methods and the new setConsumerRebalanceListener method on
>> Consumer. I’ve changed the name to ConsumerRebalanceAdapter - it’s not really
>> just a view because we do allow write-like ops like commit/seek.
>>> 
>>> Best,
>>> Aditya
>>> 
>>>> On Apr 13, 2026, at 10:12, Andrew Schofield <[email protected]>
>> wrote:
>>>> 
>>>> Hi Aditya,
>>>> I think I might have confused you. I didn't write clearly enough so
>> apologies. I'll try again.
>>>> 
>>>> Today, we have:
>>>> 
>>>> public interface ConsumerRebalanceListener {
>>>> 
>>>>   void onPartitionsAssigned(Collection<TopicPartition> partitions);
>>>> 
>>>>   void onPartitionsRevoked(Collection<TopicPartition> partitions);
>>>> 
>>>>   default void onPartitionsLost(Collection<TopicPartition>
>> partitions) {
>>>>       onPartitionsRevoked(partitions);
>>>>   }
>>>> }
>>>> 
>>>> I think what you end up with is:
>>>> 
>>>> public interface ConsumerRebalanceListener {
>>>> 
>>>>   void onPartitionsAssigned(Collection<TopicPartition> partitions);
>>>> 
>>>>   default void onPartitionsAssigned(Collection<TopicPartition>
>> partitions,
>>>>       ConsumerRebalanceAdapter consumerAdapter) {
>>>>     onPartitionsAssigned(partitions);
>>>>   }
>>>> 
>>>>   void onPartitionsRevoked(Collection<TopicPartition> partitions);
>>>> 
>>>>   default void onPartitionsRevoked(Collection<TopicPartition>
>> partitions,
>>>>       ConsumerRebalanceAdapter consumerAdapter) {
>>>>     onPartitionsRevoked(partitions);
>>>>   }
>>>> 
>>>>   default void onPartitionsLost(Collection<TopicPartition>
>> partitions) {
>>>>     onPartitionsRevoked(partitions);
>>>>   }
>>>> 
>>>>   default void onPartitionsLost(Collection<TopicPartition> partitions,
>>>>       ConsumerRebalanceAdapter consumerAdapter) {
>>>>     onPartitionsRevoked(partitions, consumerAdapter);
>>>>   }
>>>> }
>>>> 
>>>> ConsumerRebalanceView is also fine for the cut-down interface to the
>> Consumer. I do think it would be best to start the name with
>> ConsumerRebalance... .
>>>> 
>>>> And then the changes to Consumer are:
>>>> 
>>>> * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
>>>> * Deprecate all of the subscribe variants which take a
>> ConsumerRebalanceListener parameter
>>>> 
>>>> 
>>>> I do take the point about having a list of rebalance listeners like
>> the interceptor classes, but I wouldn't wrap that into the same KIP
>> personally. People tend to implement these things in lambdas.
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>>> On 2026/04/08 05:30:51 Aditya Kousik wrote:
>>>>> Hello Andrew,
>>>>> 
>>>>> I dwelt on this a bit more. I think supporting both listeners until
>> AK5.0 may not be as irksome as I initially feared. We already do this for
>> classic/async consumers with ConsumerDelegate.
>>>>> 
>>>>> I was curious why subscribe() alone took a client-constructed object
>> like ConsumerRebalanceListener whereas all other hooks were instantiated by
>> the client code via Configurable. We can support both but amply call out
>> via documentation and a log info line about which one will be activated at
>> runtime. This can be opt-in at launch and eventually made the de facto
>> pattern in the next major release.
>>>>> 
>>>>> This would also tie in nicely with subscribe only supporting topic
>> collections + regex and move the callback within the client code like other
>> interceptors already.
>>>>> 
>>>>> I also wanted to call out that this could be an opportunity to
>> support a list of ConsumerRebalanceInterceptor like the others. Currently
>> in my code, I wrap the outer one as a CompositeRebalanceListener with
>> List<ConsumerRebalanceListener> invoked serially. We already do this as I
>> mentioned earlier with producer/consumerInterceptors handling exceptions
>> within each call in a loop by logging a warn.
>>>>> 
>>>>> So rebalance.interceptors with a LIST of fqdn classnames instantiated
>> within the constructor is my current favourite approach. We support one API
>> exactly for all rebalances indicating which one at runtime.
>>>>> 
>>>>> Lmk your thoughts on this.
>>>>> 
>>>>> Thanks,
>>>>> Aditya Kousik
>>>>> 
>>>>>>> On Apr 5, 2026, at 12:37, Aditya Kousik <[email protected]>
>> wrote:
>>>>>> 
>>>>>> Hi Andrew,
>>>>>> 
>>>>>> I see what you’re saying.
>>>>>> With AS1,2 the flow becomes clearer for the subscribe interaction:
>> we only change the subscription state for topics and leave rebalance events
>> to a separate mechanism uncoupled from the subscribe() call.
>>>>>> 
>>>>>> To keep in line with other kafka client classes, can we follow the
>> same convention of using ConsumerConfig to handle this? A new
>> `ConsumerRebalanceInterceptor` with the same signature I’d proposed.
>> Instantiated with Utils#newConfiguredInstance and make the class
>> Configurable. Naming and instantiating makes it closer to existing
>> interceptor classes.
>>>>>> 
>>>>>> My only worry is that as long as ConsumerRebalanceListener exists,
>> this can be a source of confusion for which interface to use for rebalance
>> events. Unless we deprecate it, we bear the burden of invoking both, even
>> if we state that only oneOf(ConsumerRebalanceListener,
>> ConsumerRebalanceInterceptor) will be invoked during rebalances.
>>>>>> 
>>>>>> Would love to hear your thoughts on this.
>>>>>> 
>>>>>> -Aditya
>>>>>> 
>>>>>>> On Apr 5, 2026, at 09:39, Andrew Schofield <[email protected]>
>> wrote:
>>>>>>> 
>>>>>>> Hi Aditya,
>>>>>>> I agree that using the existing ConsumerRebalanceListener gives a
>> lower adoption burden.
>>>>>>> 
>>>>>>> AS1: To be more concrete with what I mean here, we could:
>>>>>>> * Deprecate Consumer.subscribe(Collection<String>,
>> ConsumerRebalanceListener) for removal in AK 5.0
>>>>>>> * Introduce
>> Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
>>>>>>> 
>>>>>>> AS2: Given that we already have an interface called
>> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be a
>> better naming choice for naming your new interface in terms of consistency.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>> 
>>>>>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
>>>>>>>> Hi Andrew, thank you for the quick feedback. It turned out to be
>> pivotal.
>>>>>>>> 
>>>>>>>> One of the rejected alternatives was to Add default methods to
>> ConsumerRebalanceListener.
>>>>>>>> I was ambivalent on this approach with the hopes that a new method
>> and new interface would create the least friction.
>>>>>>>> 
>>>>>>>> You’re right about the state change w.r.t subscribe() variants.
>> With the Classic consumer, we directly update SubscriptionType with
>> setSubscriptionType and similarly a more complex setup for the async
>> consumer. So your setRebalanceHandler suggestion seems to follow existing
>> patterns.
>>>>>>>> 
>>>>>>>> However, looking at the places I’d need to pipe RebalanceHandler
>> through, it’s going to add a burden to the plumbing and subscription state.
>>>>>>>> 
>>>>>>>> I’m falling squarely on the side of extending the existing
>> ConsumerRebalanceListener with new default methods. This also allows
>> existing frameworks like Spring and SmallRye to hook directly into the new
>> method with minimal change.
>>>>>>>> 
>>>>>>>> I’ve renamed/updated the KIP to reflect this. (I can see why
>> people use shareable URLs for the confluence docs)
>>>>>>>> 
>>>>>>>> https://cwiki.apache.org/confluence/x/9ZU8G
>>>>>>>> 
>>>>>>>> -Aditya
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield <
>> [email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Aditya,
>>>>>>>>> Thanks for the KIP. I've only taken a quick look so far, but
>> here's an initial comment.
>>>>>>>>> 
>>>>>>>>> AS1: One of the mistakes in the Kafka consumer API today is that
>> the `subscribe(Collection<String>, ConsumerRebalanceListener)` does two
>> jobs. First, it replaces the rebalance listener (when you might assume that
>> the listener applies only to rebalance changes resulting from this call to
>> subscribe). Second, it changes the subscription. If the second of these
>> throws an exception, the first will still occur. It's a bit of a mess. I
>> suggest you have a `Consumer.setRebalanceHandler(RebalanceHandler)` method
>> and do not add a new override for `Consumer.subscribe`.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>> 
>>>>>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
>>>>>>>>>> Hi all,
>>>>>>>>>> 
>>>>>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler:
>> Consumer-Aware Rebalance Callback.
>>>>>>>>>> 
>>>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
>>>>>>>>>> 
>>>>>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into
>> rebalance callbacks; the client doesn't. The standard workaround of
>> constructor-injecting a full Consumer reference allows dangerous operations
>> like poll() and close() inside a callback. This KIP proposes
>> RebalanceHandler, with a RebalanceConsumerView that exposes only safe
>> operations, making misuse a compile error.
>>>>>>>>>> 
>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>> 
>>>>>>>>>> Thanks
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>> 
>>> 
>> 
