nileshkumar3 commented on code in PR #21800:
URL: https://github.com/apache/kafka/pull/21800#discussion_r2959862667


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java:
##########
@@ -820,6 +820,34 @@ public void testEnsurePollEventSentOnConsumerPoll() {
         
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
     }
 
+    @Test
+    public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+        ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, subscriptions, "group-id", 
"client-id", "implicit");
+
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Keep pollForFetches from spinning by making it "wait" and advance 
MockTime.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+        // Always empty fetch: forces multiple loop iterations until the 
overall poll timeout expires.
+        
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));

Review Comment:
   450ms is picked to guarantee multiple loop cycles with the mocked 100ms wait 
cap, while avoiding exact-boundary timing.
   In practice it gives ~4 full waits plus a partial final wait, which is 
enough to prove “don’t enqueue a second SharePollEvent while one is in-flight” 
without making the test slow or flaky.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java:
##########
@@ -820,6 +820,34 @@ public void testEnsurePollEventSentOnConsumerPoll() {
         
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
     }
 
+    @Test
+    public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+        ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, subscriptions, "group-id", 
"client-id", "implicit");
+
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Keep pollForFetches from spinning by making it "wait" and advance 
MockTime.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+        // Always empty fetch: forces multiple loop iterations until the 
overall poll timeout expires.
+        
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+        assertTrue(result.isEmpty());
+
+        verify(fetchBuffer, 
org.mockito.Mockito.atLeastOnce()).awaitNotEmpty(any(Timer.class));

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to