kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2452798105


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2686,12 +2686,11 @@ public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedExcept
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
         consumer.poll(Duration.ofMillis(0));
-        // requests: list-offset, fetch
-        TestUtils.waitForCondition(() -> {
-            boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
-            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
-            return hasListOffsetRequest && hasFetchRequest;
-        }, "No list-offset & fetch request sent");

Review Comment:
   You're correct. In this test, `Consumer.seek()` doesn't create the 
`LIST_OFFSETS` RPC; the `Consumer.currentLag()` API call does. `currentLag()` 
enqueues the request, which is sent later, during `Consumer.poll()`.
   
   > The difference I'm concerned about is that, with the current state of the 
PR, seems we're delaying sending FETCH in a new way right? (the consumer not 
FETCH-ing as soon as it is polled with a manually set position)
   
   I misspoke: the call to `seek()` _does_ allow the `FETCH` RPC to be sent 
immediately. There's still a `LIST_OFFSETS` RPC that's sent asynchronously, but 
it doesn't block the `FETCH`.
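
   To make the ordering concrete, here is a toy model of that behavior. It is 
only a sketch: `DeferredRequestSketch` and its methods are hypothetical 
stand-ins, not the consumer's actual classes, and requests are represented as 
plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical toy model; none of these names exist in the Kafka client.
class DeferredRequestSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();
    private boolean positionSet = false;

    // Models seek(): records a position only, queues no RPC.
    void seek() {
        positionSet = true;
    }

    // Models currentLag(): enqueues LIST_OFFSETS but does not send it.
    void currentLag() {
        if (!pending.contains("LIST_OFFSETS")) {
            pending.add("LIST_OFFSETS");
        }
    }

    // Models poll(): flushes whatever was enqueued, then fetches if a
    // position is known. The FETCH never waits on the LIST_OFFSETS.
    void poll() {
        while (!pending.isEmpty()) {
            sent.add(pending.poll());
        }
        if (positionSet) {
            sent.add("FETCH");
        }
    }

    List<String> sent() {
        return sent;
    }
}
```

   In this model, `seek()` followed by `poll()` produces a `FETCH` on its own; 
a `LIST_OFFSETS` only appears after `currentLag()` has been called, and only 
once the next `poll()` runs.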
   
   > And that reveals a gap, we're not checking if we have all positions on 
every poll anymore (as we used to before this PR). We're only checking for 
hasAllPositions when we trigger a new inflightPoll (and inflightPoll is not 
triggered on every consumer.poll). Is that check on every poll what we're maybe 
missing?
   
   The reason we keep `inflightPoll` across multiple calls to `poll()` is to 
catch the edge case where both of the following are true:
   
   1. The poll timeout is very short, e.g. 0
   2. The poll generates an error
   
   The short timeout prevents us from reliably arriving at the error within a 
given call to `Consumer.poll()`. In all other cases, the consumer detects 
that the previous `inflightPoll` has expired and clears it so that a new 
event can be submitted.
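
   A rough sketch of that lifecycle, under stated assumptions: 
`InflightPollSketch`, `PollEvent`, and the explicit `nowMs` parameter are 
hypothetical, and checking for an error before checking expiry is my 
simplification, not necessarily what the PR does.

```java
// Hypothetical sketch; these are not the PR's actual classes.
class InflightPollSketch {
    // Stand-in for the event handed to the background thread.
    static final class PollEvent {
        final long deadlineMs;
        volatile RuntimeException error; // set by the "background" side

        PollEvent(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private PollEvent inflightPoll;
    int submitted = 0;

    PollEvent poll(long nowMs, long timeoutMs) {
        if (inflightPoll != null) {
            RuntimeException error = inflightPoll.error;
            if (error != null) {
                // An earlier zero-timeout poll() returned before the error
                // arrived; surface it on this call instead.
                inflightPoll = null;
                throw error;
            }
            if (nowMs >= inflightPoll.deadlineMs) {
                // Expired without an error: clear it so a new one is submitted.
                inflightPoll = null;
            }
        }
        if (inflightPoll == null) {
            inflightPoll = new PollEvent(nowMs + timeoutMs);
            submitted++;
        }
        return inflightPoll;
    }
}
```

   The point of the sketch is that the event outlives a single `poll()` call, 
so an error produced after a zero-timeout call returns is still seen by the 
next call.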



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

