Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru merged PR #15742: URL: https://github.com/apache/kafka/pull/15742 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
kirktrue commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1579868091 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: Sorry for the delay! Yes, I'm in agreement with the perspectives you and @lianetm stated. No qualms from me
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1574581701 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: @kirktrue let me know if you are okay with this, otherwise maybe we can discuss it on Thursday
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1574580739 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1333,32 +1342,41 @@ public void testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() { Optional empty = Optional.empty(); Optional error = Optional.of(new RuntimeException("Intentional error")); +Optional kafkaException = Optional.of(new KafkaException("Intentional error")); +Optional wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", error.get())); return Stream.of( // Tests if we don't have an event, the listener doesn't get called. -Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0), +Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0, empty), // Tests if we get an event for a revocation, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0, empty), // Tests if we get an event for an assignment, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0, empty), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1, empty), // Tests that we invoke our listener even if it encounters an exception. 
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException), + +// Tests that we invoke our listener even if it encounters an exception. Special case to test that a kafka exception is not wrapped. +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException), Review Comment: I think I did not add them because I didn't think they'd add interesting coverage. If you think we need it, let me add it
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
cadonna commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1574508879 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1333,32 +1342,41 @@ public void testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() { Optional empty = Optional.empty(); Optional error = Optional.of(new RuntimeException("Intentional error")); +Optional kafkaException = Optional.of(new KafkaException("Intentional error")); +Optional wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", error.get())); return Stream.of( // Tests if we don't have an event, the listener doesn't get called. -Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0), +Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0, empty), // Tests if we get an event for a revocation, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0, empty), // Tests if we get an event for an assignment, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0, empty), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1, empty), // Tests that we invoke our listener even if it encounters an exception. 
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException), + +// Tests that we invoke our listener even if it encounters an exception. Special case to test that a kafka exception is not wrapped. +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException), Review Comment: Don't you need to repeat this test also for partition assigned and partition revoked?
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1570871801
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get();
Review Comment: Kind of aligned with @lucasbru here. I totally get your concern @kirktrue, but as I see it we're in very different territory, not only with the new consumer architecture (all that @lucasbru described), but also with the new protocol (which is the only one supported here), so I lean towards keeping it simple as an initial approach, based on how we expect things to happen in practice here.
With the new protocol, we get revocations first, and then new partitions in a following reconciliation loop. If the revocation callback fails, the reconciliation will continue to be retried on the next poll loop, triggering callbacks continuously (that's what will be happening in the background). At the same time, in the foreground, we'll be raising the revocation callback failure to the user (with this PR).
> But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation.
Agree. Just for the record, that holds true for the listeners of partitions revoked and lost (the subscription state is only updated when the callbacks complete). In the case of assigned partitions, the subscription is updated before the callback, just aligning with the onPartitionsAssigned contract, which is that it is called when the rebalance completes.
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1570342805
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get();
Review Comment: Looking at the reconciliation logic, I think if `onPartitionsRevoked` throws, we'll not execute `onPartitionsAssigned`. And the call to `onPartitionsLost` seems to be independent of reconciliation. So not sure how we'd end up with two exceptions.
You are right that there is a behavioral difference around finishing the reconciliation. The old consumer throws _after_ finishing the reconciliation, while the new consumer throws on a different thread, so there is no strict time ordering between finishing the reconciliation and throwing. But I'm struggling to see how one can observe the difference. The reconciliation will have finished the next time the background thread processes any events, so in a sense, you cannot observe the difference based on the queue architecture.
The difference may only be observable through shared state that breaks the queue-based architecture. SubscriptionState comes to mind here. Thinking of something like:
1. application thread enters poll, fails during rebalance listener execution and throws
2. application thread somehow reads subscription state
3. background thread updates subscription state as part of reconciliation
Now the application thread has observed an "incomplete reconciliation". But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation. So in summary - not sure if we are going to notice the different behaviors?
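
To make the ordering under discussion concrete, here is a minimal sketch of the application-thread step in the diff: run the user callback, hand a completion event to the background thread, then rethrow any failure to the caller. It is not the Kafka implementation. `CallbackPropagationSketch`, `CompletedEvent` and `toBackground` are hypothetical stand-ins for `AsyncKafkaConsumer`, `ConsumerRebalanceListenerCallbackCompletedEvent` and the application event handler, and the wrapping into `KafkaException` that the PR performs inside `invokeRebalanceCallbacks` is left out.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class CallbackPropagationSketch {

    /** Hypothetical stand-in for ConsumerRebalanceListenerCallbackCompletedEvent. */
    static final class CompletedEvent {
        final String methodName;
        final Optional<RuntimeException> error;

        CompletedEvent(String methodName, Optional<RuntimeException> error) {
            this.methodName = methodName;
            this.error = error;
        }
    }

    /** Hypothetical stand-in for the queue that carries events to the background thread. */
    final BlockingQueue<CompletedEvent> toBackground = new LinkedBlockingQueue<>();

    /** Runs on the application thread, following the shape of process(...) in the diff. */
    void process(String methodName, Runnable userCallback) {
        Optional<RuntimeException> error = Optional.empty();
        try {
            userCallback.run();
        } catch (RuntimeException e) {
            // Remember the failure instead of losing it.
            error = Optional.of(e);
        }
        CompletedEvent completed = new CompletedEvent(methodName, error);
        // Enqueue first, so the background thread learns the outcome even on failure.
        toBackground.add(completed);
        // Then surface the failure to whoever called poll().
        if (completed.error.isPresent()) {
            throw completed.error.get();
        }
    }
}
```

In this sketch the event is always enqueued before the exception is thrown, so the background side sees exactly one completion event per callback whether or not the caller catches the error. Nothing here orders the throw against the background thread's later processing of that event, which is the timing gap the comment describes.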
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
kirktrue commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1569217494
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get();
Review Comment: `ConsumerRebalanceListenerCallbackNeededEvent` handles 'assign', 'revoke', and 'lose' callbacks. It was my understanding (I could be wrong) that we wanted to _wait_ to throw the exception after the reconciliation was fully processed. That is, not necessarily on the first callback.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get();
Review Comment: However, this implementation has the interesting property that it will both throw the exception _and_ continue processing. It seems like this could potentially yield **two** exceptions, if, say, both the `onPartitionsRevoked()` _and_ `onPartitionsAssigned()` threw exceptions. Is that the intent?
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1568991555 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: I was expecting to see here the logic for wrapping the callback error into a `KafkaException`, but I see it is at a lower level, in `invokeRebalanceCallbacks`, which is a bit more obfuscated, I would say. Still, I see how it's deeply tied to the `ConsumerRebalanceListenerCallbackCompletedEvent`, so it's OK for me to leave it as it is if we feel it's clear enough.
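
The wrapping rule this comment refers to can be sketched as follows, going by the expectations in the test diff: a `KafkaException` thrown by the listener is passed through unchanged, and any other exception is wrapped in a `KafkaException` with the message "User rebalance callback throws an error". This is an illustration only. The nested `KafkaException` is a stand-in for `org.apache.kafka.common.KafkaException` so the snippet compiles without the client jar, and `toKafkaException` is a hypothetical helper name, not a method from the PR.

```java
import java.util.Optional;

final class RebalanceErrorWrapping {

    /** Stand-in for org.apache.kafka.common.KafkaException (assumption, to keep the sketch self-contained). */
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }

        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical helper: pass a KafkaException through unchanged, wrap anything else. */
    static Optional<KafkaException> toKafkaException(Optional<? extends Exception> callbackError) {
        if (!callbackError.isPresent()) {
            // The callback completed normally, so there is nothing to propagate.
            return Optional.empty();
        }
        Exception e = callbackError.get();
        if (e instanceof KafkaException) {
            // Already a KafkaException: keep the original instance.
            return Optional.of((KafkaException) e);
        }
        // Any other exception becomes the cause of a new KafkaException.
        return Optional.of(new KafkaException("User rebalance callback throws an error", e));
    }
}
```

With this shape, a caller that catches the propagated exception can reach the listener's original failure through `getCause()` in the wrapped case, and gets the listener's own exception instance in the pass-through case.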
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1568972567 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1333,32 +1342,41 @@ public void testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() { Optional empty = Optional.empty(); Optional error = Optional.of(new RuntimeException("Intentional error")); +Optional kafkaException = Optional.of(new KafkaException("Intentional error")); +Optional wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", error.get())); return Stream.of( // Tests if we don't have an event, the listener doesn't get called. -Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0), +Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0, empty), // Tests if we get an event for a revocation, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0, empty), // Tests if we get an event for an assignment, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0, empty), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1, empty), // Tests that we invoke our listener even if it encounters an exception. 
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException), + +// Tests that we invoke our listener even if it encounters an exception. Special case to test that a kafka exception is not wrapped. +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException), Review Comment: Nice addition, indeed part of what the legacy logic does for not wrapping a KafkaException
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru commented on PR #15742: URL: https://github.com/apache/kafka/pull/15742#issuecomment-2060865659 @kirktrue @lianetm could you have a look?
[PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru opened a new pull request, #15742: URL: https://github.com/apache/kafka/pull/15742
When user-defined rebalance listeners fail with an exception, the expectation is that the error is propagated to the user as a KafkaException and breaks the poll loop (the behaviour of the legacy coordinator). The new consumer executes callbacks in the application thread and sends an event with the callback result (and the error, if any) to the background thread, [passing the error along with the event here](https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882), but it does not seem to propagate the exception to the user.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)