lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2455130962
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2686,12 +2686,11 @@ public void testCurrentLag(GroupProtocol groupProtocol)
throws InterruptedExcept
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
- // requests: list-offset, fetch
- TestUtils.waitForCondition(() -> {
- boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
- boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
- return hasListOffsetRequest && hasFetchRequest;
- }, "No list-offset & fetch request sent");
Review Comment:
> There's still a LIST_OFFSETS RPC that's sent asynchronously, but it doesn't block the FETCH.
This is the part I see differently, and the part I'm concerned about. We do
block sending the FETCH until we have positions, right?
https://github.com/kirktrue/kafka/blob/3588239637c02f48812c7ed01ae9b1fe0e9b4d76/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L740
That is fine within a single call to poll when we don't have positions yet.
But if the application sets the position manually (seek), the next call to
poll should send a FETCH right away. The test showed that it doesn't: we always
wait for the previous inflightPoll to finish before sending the next one, and
only that next one will notice that we already have positions. I could be
missing something, wdyt?
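
To make the concern concrete, here is a toy model of the poll, seek, poll sequence from the test. It is only a sketch and not the consumer's actual code: `ToyConsumer`, `waitForInflightPoll`, `completeListOffsets` and `pollSeekPoll` are hypothetical names made up for illustration, and the gating logic is an assumption about the behavior described above, not a copy of `ApplicationEventProcessor`.

```java
import java.util.ArrayList;
import java.util.List;

public class InflightPollSketch {

    /** Toy consumer; every name here is hypothetical and none of it is Kafka code. */
    static class ToyConsumer {
        private final boolean waitForInflightPoll;
        private boolean hasPosition = false;
        private boolean inflightPollPending = false;
        final List<String> sentRequests = new ArrayList<>();

        ToyConsumer(boolean waitForInflightPoll) {
            this.waitForInflightPoll = waitForInflightPoll;
        }

        /** Models consumer.seek(tp0, 50L): the position is now known locally. */
        void seek() {
            hasPosition = true;
        }

        /** Models the LIST_OFFSETS response arriving, which completes the pending poll. */
        void completeListOffsets() {
            hasPosition = true;
            inflightPollPending = false;
        }

        /** Models one call to consumer.poll(Duration.ofMillis(0)). */
        void poll() {
            if (waitForInflightPoll && inflightPollPending) {
                // The earlier poll is still waiting for positions, so this call
                // does not re-check them and nothing is sent.
                return;
            }
            if (hasPosition) {
                sentRequests.add("FETCH");
            } else {
                sentRequests.add("LIST_OFFSETS");
                inflightPollPending = true;
            }
        }
    }

    /** Runs poll, seek, poll and returns the requests that were sent. */
    static List<String> pollSeekPoll(boolean waitForInflightPoll) {
        ToyConsumer consumer = new ToyConsumer(waitForInflightPoll);
        consumer.poll();
        consumer.seek();
        consumer.poll();
        return consumer.sentRequests;
    }

    public static void main(String[] args) {
        System.out.println("wait for inflight poll: " + pollSeekPoll(true));
        System.out.println("re-check positions:     " + pollSeekPoll(false));
    }
}
```

In this model, waiting on the pending poll yields only `[LIST_OFFSETS]` after the second poll, while re-checking positions on every poll yields `[LIST_OFFSETS, FETCH]`, which is what the removed `waitForCondition` assertion expected. In the gated variant the FETCH only goes out on a poll that follows `completeListOffsets()`.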