Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]
appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1602392678

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1607,6 +1607,7 @@ private Fetch pollForFetches(Timer timer) {
 wakeupTrigger.clearTask();
 }
+metadata.maybeThrowAnyException();

Review Comment:
Thank you for the review, @lianetm. I learned a lot from it.
It seems that we can solve the wiring issue using `MetadataUpdater` in this case. What do you think?
I need more time for this issue, and since it's a blocker I don't think we have much time left, so it's fine if you hand it off to someone else.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]
appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2113684883

Hi @lianetm, thanks for the review!
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]
lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2113054675

Hey @appchemist, thanks a lot for taking this one! I left some comments, mainly concerns about the approach. This is definitely a tricky bit. Thanks!
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]
lianetm commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1601985896

## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ##
@@ -2974,7 +2974,7 @@ public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) {
 KafkaConsumer consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
-assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
+assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ofMillis(100)));

Review Comment:
I'm guessing you had to change the poll timeout here to make the test pass? That is definitely a sign that we still have some issues. We should be able to keep the test as it was, passing with poll(0), as it does for the legacy consumer. I would suggest addressing my concern about the `metadata.maybeThrowAnyException()` call in `AsyncKafkaConsumer` first, and then revisiting this to see whether something else is making the test fail with poll(0).
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]
lianetm commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1601976218

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1607,6 +1607,7 @@ private Fetch pollForFetches(Timer timer) {
 wakeupTrigger.clearTask();
 }
+metadata.maybeThrowAnyException();

Review Comment:
Hmm, I would say that having this here, in the app thread, is not right: the metadata updates happen in the background thread, so I would expect we could easily hit a race condition.

The metadata object is updated in the [NetworkClient](https://github.com/apache/kafka/blob/bb3ff0f84a0a4c6d213fbe50eb2c3d3a8cdecdf0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1193) when a response to a metadata request is received. So only at that point, in the background thread, can we accurately know that there is an invalid topic. I imagine we should then propagate an `ErrorEvent` to the app thread, which would be raised in the poll loop [here](https://github.com/apache/kafka/blob/bb3ff0f84a0a4c6d213fbe50eb2c3d3a8cdecdf0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L213).

Seen from a conceptual POV, we have something that could go wrong in the background (an invalid topic received in a metadata response), and we want to throw an error in the foreground, so the pattern would be via events. This is probably similar to what is done to raise errors that happen while resetting positions in the background thread, which are propagated to the app thread [here](https://github.com/apache/kafka/blob/bb3ff0f84a0a4c6d213fbe50eb2c3d3a8cdecdf0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L202).

The difference in this new case is that the failure is detected deep down in the network client (not in the request managers layer), so if we follow this approach, we would need some wiring there.

Just thinking out loud, this is definitely a tricky missing bit.
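
To make the event-based pattern described in the review comment concrete, here is a minimal, self-contained sketch. It is not the Kafka implementation: every class name (`SketchErrorEvent`, `SketchBackgroundThread`, `SketchConsumer`, `SketchInvalidTopicException`) is a hypothetical stand-in, and the real wiring between `NetworkClient` and the application thread is exactly the open question in this thread. The only idea it shows is that the background side enqueues an error event when it sees a bad metadata response, and the application side drains that queue at the start of `poll`, so the error surfaces even with a zero timeout once the event has been queued.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical exception used only in this sketch (not Kafka's InvalidTopicException).
class SketchInvalidTopicException extends RuntimeException {
    SketchInvalidTopicException(String message) {
        super(message);
    }
}

// Hypothetical error event carrying a failure from the background side to the app side.
final class SketchErrorEvent {
    private final RuntimeException error;

    SketchErrorEvent(RuntimeException error) {
        this.error = error;
    }

    RuntimeException error() {
        return error;
    }
}

// Hypothetical background side: the place where a metadata response is handled.
final class SketchBackgroundThread {
    private final BlockingQueue<SketchErrorEvent> toAppThread;

    SketchBackgroundThread(BlockingQueue<SketchErrorEvent> toAppThread) {
        this.toAppThread = toAppThread;
    }

    // Assumed hook: called when a metadata response has been processed.
    void onMetadataResponse(String topic, boolean topicIsInvalid) {
        if (topicIsInvalid) {
            // Do not throw here; hand the error over to the app thread instead.
            toAppThread.add(new SketchErrorEvent(
                    new SketchInvalidTopicException("Invalid topic: " + topic)));
        }
    }
}

// Hypothetical app-thread side: poll() raises any error queued by the background side.
final class SketchConsumer {
    private final BlockingQueue<SketchErrorEvent> fromBackground;

    SketchConsumer(BlockingQueue<SketchErrorEvent> fromBackground) {
        this.fromBackground = fromBackground;
    }

    void poll(Duration timeout) {
        // Non-blocking check, independent of the timeout value.
        SketchErrorEvent event = fromBackground.poll();
        if (event != null) {
            throw event.error();
        }
        // Fetching records (bounded by the timeout) would follow here.
    }
}

class EventPropagationSketch {
    public static void main(String[] args) {
        BlockingQueue<SketchErrorEvent> queue = new LinkedBlockingQueue<>();
        SketchBackgroundThread background = new SketchBackgroundThread(queue);
        SketchConsumer consumer = new SketchConsumer(queue);

        background.onMetadataResponse("bad topic!", true);
        try {
            consumer.poll(Duration.ZERO);
        } catch (SketchInvalidTopicException e) {
            System.out.println("Raised on app thread: " + e.getMessage());
        }
    }
}
```

In this sketch the two sides share only the queue, so the application thread never reads state that the background side is mutating. Where the enqueue call would live in the real client is left open here, as it is in the discussion.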