Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-15 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1602392678


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1607,6 +1607,7 @@ private Fetch pollForFetches(Timer timer) {
 wakeupTrigger.clearTask();
 }
 
+metadata.maybeThrowAnyException();

Review Comment:
   Thank you for review. @lianetm 
   I learned a lot from it.
   
   It seems that we can solve the wiring issue using `MetadataUpdater` in this 
case.
   What do you think?
   
   I need more time for this issue and I don't think we have a lot of time left 
for this issue because it's a blocker.
   so it's fine if you hand it off to someone else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-15 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2113684883

   Hi @lianetm 
   Thanks for review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-15 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1602392678


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1607,6 +1607,7 @@ private Fetch pollForFetches(Timer timer) {
 wakeupTrigger.clearTask();
 }
 
+metadata.maybeThrowAnyException();

Review Comment:
   Thank you for review. @lianetm 
   I learned a lot from it.
   
   It seems that we can solve the wiring issue using MetadataUpdater in this 
case.
   What do you think?
   
   I need more time for this issue and I don't think we have a lot of time left 
for this issue because it's a blocker.
   so it's fine if you hand it off to someone else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-15 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2113054675

   Hey @appchemist , thanks a lot for taking this one! I left some comments, 
mainly concerns about the approach, this is definitely a tricky bit. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-15 Thread via GitHub


lianetm commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1601985896


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2974,7 +2974,7 @@ public void testSubscriptionOnInvalidTopic(GroupProtocol 
groupProtocol) {
 KafkaConsumer consumer = newConsumer(groupProtocol, 
time, client, subscription, metadata, assignor, true, groupInstanceId);
 consumer.subscribe(singleton(invalidTopicName), 
getConsumerRebalanceListener(consumer));
 
-assertThrows(InvalidTopicException.class, () -> 
consumer.poll(Duration.ZERO));
+assertThrows(InvalidTopicException.class, () -> 
consumer.poll(Duration.ofMillis(100)));

Review Comment:
   I'm guessing you had to change the poll timeout here to make the test pass? 
Definitely a sign that we have some issues still. We should be able to keep the 
test as it was, passing with poll(0), as it does for the legacy consumer. I 
would suggest to address my concern above, and then review this to know if 
there is something else making the test fail with poll(0)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll … [kafka]

2024-05-15 Thread via GitHub


lianetm commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1601976218


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1607,6 +1607,7 @@ private Fetch pollForFetches(Timer timer) {
 wakeupTrigger.clearTask();
 }
 
+metadata.maybeThrowAnyException();

Review Comment:
   uhm I would say that having this here, in the app thread, is not right, 
since the metadata updates happen in the background thread, so we could easily 
get a race condition I would expect. The metadata object is updated in the 
[NetworkClient](https://github.com/apache/kafka/blob/bb3ff0f84a0a4c6d213fbe50eb2c3d3a8cdecdf0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1193),
 when a response to a metadata request is received. So only at that point in 
the background thread is that may accurately know that there is an invalid 
topic. I imagine we should then propagate an `ErrorEvent` to the app thread, 
that would be raised in the poll loop 
[here](https://github.com/apache/kafka/blob/bb3ff0f84a0a4c6d213fbe50eb2c3d3a8cdecdf0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L213))
   
   Seen from a conceptual POV, we have something that could wrong in the 
background (invalid topic received in metadata response), and we want to throw 
an error in the foreground, so the pattern would be via events. Similar 
probably to what is done to raise errors that may happen while resetting 
positions in the background thread, that are propagated to the app thread 
[here](https://github.com/apache/kafka/blob/bb3ff0f84a0a4c6d213fbe50eb2c3d3a8cdecdf0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L202).
 Difference in this new case is that the failure is detected deep down the 
network client (not in the request managers layer), so if we follow this 
approach, we would need some wiring there. Just thinking out loud, this is 
definitely a tricky missing bit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org