nileshkumar3 commented on code in PR #21800:
URL: https://github.com/apache/kafka/pull/21800#discussion_r2959890072
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1677,6 +1679,53 @@ public void testLongPollWaitIsLimited() {
assertEquals(partitions, consumer.assignment());
}
+ /**
+ * Verifies that at most one {@link AsyncPollEvent} is in-flight at a
time. When {@code poll()} runs
+ * multiple loop iterations (e.g. empty fetches), it must not enqueue a
new event while the previous
+ * one is still in-flight. This prevents unnecessary queueing and matches
the single-instance design
+ * used in share consumer (KAFKA-20309). See KAFKA-20315.
+ */
+ @Test
+ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
+ // Build a consumer with a mocked FetchBuffer so we can
deterministically advance MockTime and avoid
+ // tight spinning while poll() waits.
+ FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+ ConsumerInterceptors<String, String> interceptors =
mock(ConsumerInterceptors.class);
+ ConsumerRebalanceListenerInvoker rebalanceListenerInvoker =
mock(ConsumerRebalanceListenerInvoker.class);
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(fetchBuffer, interceptors,
rebalanceListenerInvoker, subscriptions);
+
+ final String topicName = "topic1";
+ final TopicPartition tp = new TopicPartition(topicName, 0);
+
+ // Satisfy poll() preconditions without needing assign() (which would
require stubbing addAndGet()).
+ subscriptions.assignFromUser(singleton(tp));
+ subscriptions.seek(tp, 0);
+
+ // Make pollForFetches() "wait" by advancing mock time.
+ doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+ doAnswer(invocation -> {
+ Timer pollTimer = invocation.getArgument(0, Timer.class);
+ ((MockTime) time).sleep(pollTimer.remainingMs());
+ return null;
+ }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+ // Leave AsyncPollEvent in-flight (do not complete it) so the next
loop iteration sees inflightPoll != null
+ doAnswer(invocation ->
null).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+ ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
Review Comment:
In this test we cap per-iteration wait to 100ms (maximumTimeToWait()), and
awaitWakeup(timer) advances MockTime by pollTimer.remainingMs(). A 450ms poll
budget reliably forces multiple loop iterations (roughly 100 + 100 + 100 + 100
+ ~50) while avoiding exact-boundary timing, which lets us validate the core
behavior: even across repeated iterations with empty fetches, only one
AsyncPollEvent is enqueued while one is already in-flight (times(1)).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]