lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570871801


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
         }
 
         private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
-            ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+            ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks(
                 rebalanceListenerInvoker,
                 event.methodName(),
                 event.partitions(),
                 event.future()
             );
             applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();

Review Comment:
   I'm mostly aligned with @lucasbru here. I totally get your concern,
@kirktrue, but as I see it we're in very different territory, not only with the
new consumer architecture (all that @lucasbru described) but also with the new
protocol (the only one supported here). So I lean towards keeping it simple as
an initial approach, based on how we expect things to happen in practice. With
the new protocol, we get revocations first, and then new partitions in a
following reconciliation loop. If the revocation callback fails, the
reconciliation will keep being retried on subsequent poll loops, triggering the
callback again each time (that's what will be happening in the background). At
the same time, in the foreground, we'll be raising the revocation callback
failure to the user (with this PR).
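
   To make that flow concrete, here is a rough, self-contained sketch of what I
have in mind. It is not the real consumer code: `RevocationRetrySketch`,
`CompletedEvent`, `sentToBackground` and `revocationPending` are hypothetical
stand-ins, and the retry decision is collapsed into the foreground `poll` for
brevity, whereas in the actual consumer it is the background reconciliation that
retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical simulation only; none of these types are the real Kafka classes.
public class RevocationRetrySketch {

    // Stand-in for the "callback completed" event, carrying an optional error.
    static final class CompletedEvent {
        final Optional<RuntimeException> error;

        CompletedEvent(Optional<RuntimeException> error) {
            this.error = error;
        }
    }

    // Events handed back to the (simulated) background thread.
    final List<CompletedEvent> sentToBackground = new ArrayList<>();

    // Assumption: the revocation stays pending until its callback succeeds.
    boolean revocationPending = true;

    // Runs the listener and captures any failure instead of letting it escape.
    static CompletedEvent invoke(Runnable callback) {
        try {
            callback.run();
            return new CompletedEvent(Optional.empty());
        } catch (RuntimeException e) {
            return new CompletedEvent(Optional.of(e));
        }
    }

    // One poll iteration: report the outcome to the background first, then
    // raise the failure to the caller, mirroring the ordering in the diff.
    void poll(Runnable onPartitionsRevoked) {
        if (!revocationPending) {
            return;
        }
        CompletedEvent event = invoke(onPartitionsRevoked);
        sentToBackground.add(event);
        if (event.error.isPresent()) {
            throw event.error.get();
        }
        revocationPending = false;
    }
}
```

   With a listener that fails on its first two invocations, the first two
`poll` calls throw, the third completes the revocation, and later polls no
longer invoke the callback.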
   
   > But after a listener execution has failed, we don't seem to update the 
subscription state in the reconciliation.
   
   Agreed. Just for the record, that holds true for the listeners of revoked
and lost partitions (the subscription state is only updated when the callbacks
complete). In the case of assigned partitions, the subscription is updated
before the callback, in line with the onPartitionsAssigned contract, which is
that it is called when the rebalance completes.
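
   A minimal sketch of that ordering difference, assuming a failed callback
counts as "not completed". `CallbackOrderingSketch` and its `assignment` set are
hypothetical stand-ins, not the real subscription state or membership manager,
and partitions are plain strings here for brevity.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical simulation only; not the real Kafka subscription state.
public class CallbackOrderingSketch {

    // Stand-in for the partitions currently tracked as assigned.
    final Set<String> assignment = new HashSet<>();

    // Revoked/lost: callback first, state update only if the callback completes.
    void revoke(Set<String> partitions, Consumer<Set<String>> onPartitionsRevoked) {
        onPartitionsRevoked.accept(partitions); // a throw leaves the assignment untouched
        assignment.removeAll(partitions);
    }

    // Assigned: state update first, so the callback already sees the new
    // assignment, then the callback runs.
    void assign(Set<String> partitions, Consumer<Set<String>> onPartitionsAssigned) {
        assignment.addAll(partitions);
        onPartitionsAssigned.accept(partitions); // a throw does not undo the update
    }
}
```

   So in this sketch a failing revocation callback leaves the partitions in
`assignment`, while a failing assignment callback does not roll them back.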



