lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2455130962


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2686,12 +2686,11 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
throws InterruptedExcept
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
         consumer.poll(Duration.ofMillis(0));
-        // requests: list-offset, fetch
-        TestUtils.waitForCondition(() -> {
-            boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
-            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
-            return hasListOffsetRequest && hasFetchRequest;
-        }, "No list-offset & fetch request sent");

Review Comment:
   > There's still a LIST_OFFSETS RPC that's sent asynchronously, but it 
doesn't block the FETCH.
   
   This is the bit I'm seeing differently (and I'm concerned about). We are 
indeed blocking sending FETCH until we get positions, right?
   
   
https://github.com/kirktrue/kafka/blob/3588239637c02f48812c7ed01ae9b1fe0e9b4d76/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L740
   
   And that is fine within a single call to poll if we don't have positions. 
But if the consumer gets the position manually (seek), the following call to 
poll should send a FETCH right away (and the test showed we didn't, we always 
wait for the previous inflightPoll to finish before sending a next one which is 
the only one that will realize that we have positions already). Could be 
missing something, wdyt? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to