nileshkumar3 commented on code in PR #21800:
URL: https://github.com/apache/kafka/pull/21800#discussion_r2959890072


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1677,6 +1679,53 @@ public void testLongPollWaitIsLimited() {
         assertEquals(partitions, consumer.assignment());
     }
 
+    /**
+     * Verifies that at most one {@link AsyncPollEvent} is in-flight at a 
time. When {@code poll()} runs
+     * multiple loop iterations (e.g. empty fetches), it must not enqueue a 
new event while the previous
+     * one is still in-flight. This prevents unnecessary queueing and matches 
the single-instance design
+     * used in share consumer (KAFKA-20309). See KAFKA-20315.
+     */
+    @Test
+    public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
+        // Build a consumer with a mocked FetchBuffer so we can 
deterministically advance MockTime and avoid
+        // tight spinning while poll() waits.
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        ConsumerInterceptors<String, String> interceptors = 
mock(ConsumerInterceptors.class);
+        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = 
mock(ConsumerRebalanceListenerInvoker.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, interceptors, 
rebalanceListenerInvoker, subscriptions);
+
+        final String topicName = "topic1";
+        final TopicPartition tp = new TopicPartition(topicName, 0);
+
+        // Satisfy poll() preconditions without needing assign() (which would 
require stubbing addAndGet()).
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Make pollForFetches() "wait" by advancing mock time.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+        // Leave AsyncPollEvent in-flight (do not complete it) so the next 
loop iteration sees inflightPoll != null
+        doAnswer(invocation -> 
null).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));

Review Comment:
   In this test we cap per-iteration wait to 100ms (maximumTimeToWait()), and 
awaitWakeup(timer) advances MockTime by pollTimer.remainingMs(). A 450ms poll 
budget reliably forces multiple loop iterations (roughly 100 + 100 + 100 + 100 
+ ~50) while avoiding exact-boundary timing, which lets us validate the core 
behavior: even across repeated iterations with empty fetches, only one 
AsyncPollEvent is enqueued while one is already in-flight (times(1)).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to