Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-26 Thread via GitHub


lucasbru merged PR #15742:
URL: https://github.com/apache/kafka/pull/15742


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-25 Thread via GitHub


kirktrue commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1579868091


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   Sorry for the delay!
   
   Yes, I'm in agreement with the perspectives you and @lianetm stated. No 
qualms from me 






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-22 Thread via GitHub


lucasbru commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1574581701


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   @kirktrue  let me know if you are okay with this, otherwise maybe we can 
discuss it on Thursday






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-22 Thread via GitHub


lucasbru commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1574580739


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1333,32 +1342,41 @@ public void 
testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() {
 Optional empty = Optional.empty();
 Optional error = Optional.of(new 
RuntimeException("Intentional error"));
+Optional kafkaException = Optional.of(new 
KafkaException("Intentional error"));
+Optional wrappedException = Optional.of(new 
KafkaException("User rebalance callback throws an error", error.get()));
 
 return Stream.of(
 // Tests if we don't have an event, the listener doesn't get 
called.
-Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0),
+Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0, empty),
 
 // Tests if we get an event for a revocation, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0, empty),
 
 // Tests if we get an event for an assignment, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1, wrappedException),
+
+// Tests that we invoke our listener even if it encounters an 
exception. Special case to test that a kafka exception is not wrapped.
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, kafkaException, 0, 0, 1, kafkaException),

Review Comment:
   I think I did not add them because I didn't think they'd add interesting 
coverage. If you think we need it, let me add it






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-22 Thread via GitHub


cadonna commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1574508879


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1333,32 +1342,41 @@ public void 
testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() {
 Optional empty = Optional.empty();
 Optional error = Optional.of(new 
RuntimeException("Intentional error"));
+Optional kafkaException = Optional.of(new 
KafkaException("Intentional error"));
+Optional wrappedException = Optional.of(new 
KafkaException("User rebalance callback throws an error", error.get()));
 
 return Stream.of(
 // Tests if we don't have an event, the listener doesn't get 
called.
-Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0),
+Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0, empty),
 
 // Tests if we get an event for a revocation, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0, empty),
 
 // Tests if we get an event for an assignment, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1, wrappedException),
+
+// Tests that we invoke our listener even if it encounters an 
exception. Special case to test that a kafka exception is not wrapped.
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, kafkaException, 0, 0, 1, kafkaException),

Review Comment:
   Don't you need to repeat this test also for partition assigned and partition 
revoked?






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-18 Thread via GitHub


lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570871801


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   Kind of aligned with @lucasbru here. I totally get your concern @kirktrue , 
but as I see it we're in a very different territory, not only with the new 
consumer architecture (all that @lucasbru described), but also with the new 
protocol (which is the only one supported here), so I lean towards keeping it 
simple as an initial approach, based on how we expect things to happen in 
practice here. With the new protocol, we get revocations first, and then new 
partitions in a following reconciliation loop. If revocation callback fails, 
the reconciliation will continue to be retried on the next poll loop, 
triggering callbacks continuously (that's what will be happening in the 
background). At the same time, in the foreground, we'll be raising the 
revocation callback failure to the user (with this PR). 
   
   > But after a listener execution has failed, we don't seem to update the 
subscription state in the reconciliation.
   
   Agree, just for the record, that holds true for the listeners of partitions 
revoked and lost (subscription state is only updated when the callbacks 
complete). In the case of assigned partitions, the subscription is updated 
before the callback, just aligning with the onPartitionsAssigned contract, 
which is that it is called when the rebalance completes
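
   The ordering described above can be illustrated with a minimal, self-contained sketch. It does not use the real consumer internals: `SubscriptionUpdateOrderSketch`, its `assigned` set, and the `revoke`/`assign` helpers are hypothetical names made up for illustration. The only things taken from the discussion are the two orderings: for revoked (and lost) partitions the state changes only after the callback completes, while for assigned partitions the state changes before the callback runs. Error propagation to the caller is deliberately left out.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SubscriptionUpdateOrderSketch {

    // Hypothetical stand-in for the consumer's subscription state.
    final Set<String> assigned = new HashSet<>();

    // Revoked (and lost) partitions: the state changes only once the callback has completed.
    boolean revoke(Set<String> partitions, Runnable onPartitionsRevoked) {
        try {
            onPartitionsRevoked.run();
        } catch (RuntimeException e) {
            // Callback failed: the partitions stay in the state.
            return false;
        }
        assigned.removeAll(partitions);
        return true;
    }

    // Assigned partitions: the state changes first, then the callback runs.
    boolean assign(Set<String> partitions, Runnable onPartitionsAssigned) {
        assigned.addAll(partitions);
        try {
            onPartitionsAssigned.run();
        } catch (RuntimeException e) {
            // Callback failed, but the partitions are already part of the state.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        SubscriptionUpdateOrderSketch state = new SubscriptionUpdateOrderSketch();
        state.assigned.add("topic-0");

        boolean revoked = state.revoke(Collections.singleton("topic-0"),
                () -> { throw new RuntimeException("Intentional error"); });
        System.out.println("revoke completed: " + revoked + ", state: " + state.assigned);

        boolean assignedOk = state.assign(Collections.singleton("topic-1"),
                () -> { throw new RuntimeException("Intentional error"); });
        System.out.println("assign completed: " + assignedOk + ", state: " + state.assigned);
    }
}
```

   In this sketch a failing revocation callback leaves `topic-0` in the state, whereas a failing assignment callback still leaves `topic-1` added.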






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-18 Thread via GitHub


lucasbru commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570342805


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   Looking at the reconciliation logic, I think if `onPartitionsRevoked` 
throws, we'll not execute `onPartitionsAssigned`. And the call to 
`onPartitionsLost` seems to be independent of reconciliation. So not sure how 
we'd end up with two exceptions.
   
   You are right that there is a behavioral difference around finishing the 
reconciliation. The old consumer throws _after_ finishing the reconciliation, 
while the new consumer throws on a different thread, so there is no strict time 
ordering between finishing the reconciliation and throwing. But I'm struggling 
to see how one can observe the difference. The reconciliation will have 
finished the next time the background thread processes any events, so in a 
sense, you cannot observe the difference based on the queue architecture. The 
difference may only be observable through shared state that breaks the 
queue-based architecture. SubscriptionState comes to mind here. Thinking of 
something like
   
   1. application thread enters poll, fails during rebalance listener execution 
and throws
   2. application thread somehow reads subscription state
   3. background thread updates subscription state as part of reconciliation
   
   Now the application thread has observed an "incomplete reconciliation". But 
after a listener execution has failed, we don't seem to update the subscription 
state in the reconciliation. 
   
   So in summary - not sure if we are going to notice the different behaviors?
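
   To make the "enqueue, then throw" ordering from the diff concrete, here is a single-threaded sketch under stated assumptions. `EnqueueThenThrowSketch`, `CallbackCompleted`, and `backgroundQueue` are hypothetical stand-ins, not the real `ConsumerRebalanceListenerCallbackCompletedEvent` or `applicationEventHandler`. The point it tries to show is only that the completion event is already queued for the background thread by the time the application thread throws.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

public class EnqueueThenThrowSketch {

    // Hypothetical stand-in for the "callback completed" event sent to the background thread.
    static final class CallbackCompleted {
        final String methodName;
        final Optional<RuntimeException> error;

        CallbackCompleted(String methodName, Optional<RuntimeException> error) {
            this.methodName = methodName;
            this.error = error;
        }
    }

    // Hypothetical stand-in for the application-to-background event queue.
    static final Queue<CallbackCompleted> backgroundQueue = new ArrayDeque<>();

    // Application-thread side: run the listener, report the outcome, then rethrow any error.
    static void processCallbackNeeded(String methodName, Runnable listener) {
        Optional<RuntimeException> error = Optional.empty();
        try {
            listener.run();
        } catch (RuntimeException e) {
            error = Optional.of(e);
        }
        // The outcome is handed to the background side first ...
        backgroundQueue.add(new CallbackCompleted(methodName, error));
        // ... and only then surfaced to the caller (the user's poll loop in the real consumer).
        if (error.isPresent()) {
            throw error.get();
        }
    }

    public static void main(String[] args) {
        try {
            processCallbackNeeded("onPartitionsRevoked",
                    () -> { throw new IllegalStateException("Intentional error"); });
        } catch (IllegalStateException e) {
            System.out.println("surfaced to caller: " + e.getMessage());
        }
        System.out.println("events waiting for the background side: " + backgroundQueue.size());
    }
}
```

   Because the sketch is single-threaded, it cannot reproduce the lack of strict time ordering between the two threads; it only shows that the background side learns about the failure through the queue, independently of the exception thrown to the caller.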
   






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1569217494


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   `ConsumerRebalanceListenerCallbackNeededEvent` handles 'assign', 'revoke', 
and 'lose' callbacks. It was my understanding—I could be wrong—that we wanted 
to _wait_ to throw the exception after the reconciliation was fully processed. 
That is, not necessarily on the first callback.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   However, this implementation has the interesting property that it will both 
throw the exception _and_ continue processing. It seems like this could 
potentially yield **two** exceptions, if, say, both the `onPartitionsRevoked()` 
_and_ `onPartitionsAssigned()` threw exceptions. Is that the intent?






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1568991555


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   I was expecting to see the logic for wrapping the callback error into a 
`KafkaException` here, but I see it sits at a lower level, in 
`invokeRebalanceCallbacks`, which is a bit more obfuscated, I would say. Still, 
I see how it's deeply tied to the 
`ConsumerRebalanceListenerCallbackCompletedEvent`, so I'm OK with leaving it as it 
is if we feel it's clear enough.
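
   For readers following along, the wrapping rule under discussion can be sketched as below. This is not the code in `invokeRebalanceCallbacks`: `RebalanceErrorWrapping` and `toKafkaException` are hypothetical names, and the nested `KafkaException` is a stand-in for `org.apache.kafka.common.KafkaException` so the example compiles without kafka-clients. The message string is the one used in the PR's test source; the pass-through for an existing `KafkaException` mirrors the "not wrapped" test case.

```java
import java.util.Optional;

public class RebalanceErrorWrapping {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption: unchecked, with message and cause).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }

        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: pass a KafkaException through unchanged, wrap anything else.
    static Optional<KafkaException> toKafkaException(Optional<Exception> callbackError) {
        return callbackError.map(e -> e instanceof KafkaException
                ? (KafkaException) e
                : new KafkaException("User rebalance callback throws an error", e));
    }

    public static void main(String[] args) {
        Exception plain = new RuntimeException("Intentional error");
        Exception kafka = new KafkaException("Intentional error");

        KafkaException wrapped = toKafkaException(Optional.of(plain)).get();
        KafkaException passedThrough = toKafkaException(Optional.of(kafka)).get();

        System.out.println("plain error kept as cause: " + (wrapped.getCause() == plain));
        System.out.println("KafkaException returned as-is: " + (passedThrough == kafka));
    }
}
```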






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1568972567


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1333,32 +1342,41 @@ public void 
testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() {
 Optional empty = Optional.empty();
 Optional error = Optional.of(new 
RuntimeException("Intentional error"));
+Optional kafkaException = Optional.of(new 
KafkaException("Intentional error"));
+Optional wrappedException = Optional.of(new 
KafkaException("User rebalance callback throws an error", error.get()));
 
 return Stream.of(
 // Tests if we don't have an event, the listener doesn't get 
called.
-Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0),
+Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0, empty),
 
 // Tests if we get an event for a revocation, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0, empty),
 
 // Tests if we get an event for an assignment, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1, wrappedException),
+
+// Tests that we invoke our listener even if it encounters an 
exception. Special case to test that a kafka exception is not wrapped.
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, kafkaException, 0, 0, 1, kafkaException),

Review Comment:
   Nice addition, indeed part of what the legacy logic does for not wrapping a 
KafkaException  






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lucasbru commented on PR #15742:
URL: https://github.com/apache/kafka/pull/15742#issuecomment-2060865659

   @kirktrue @lianetm could you have a look?





[PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lucasbru opened a new pull request, #15742:
URL: https://github.com/apache/kafka/pull/15742

   When user-defined rebalance listeners fail with an exception, the 
expectation is that the error is propagated to the user as a 
KafkaException and breaks the poll loop (the behaviour of the legacy coordinator). 
The new consumer executes callbacks in the application thread and sends an 
event with the callback result (and the error, if any) to the background thread, 
[passing the error along with the event 
here](https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882), 
but does not seem to propagate the exception to the user.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

