lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1570871801
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { - ApplicationEvent invokedEvent = invokeRebalanceCallbacks( + ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); Review Comment: Kind of aligned with @lucasbru here. I totally get your concern @kirktrue , but as I see it we're in a very different territory, not only with the new consumer architecture (all that @lucasbru described), but also with the new protocol (which is the only one supported here), so I lean towards keeping it simple as an initial approach, based on how we expect things to happen in practice here. With the new protocol, we get revocations first, and then new partitions in a following reconciliation loop. If revocation callback fails, the reconciliation will continue to be retried on the next poll loop, triggering callbacks continuously (that's what will be happening in the background). At the same time, in the foreground, we'll be raising the revocation callback failure to the user (with this PR). > But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation. Agree, just for the record, that holds true for the listeners of partitions revoked and lost (subscription state is only updated when the callbacks complete). In the case of assigned partitions, the subscription is updated before the callback, just aligning with the onPartitionsAssigned contract, which is that it is called when the rebalance completes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org