sjhajharia commented on code in PR #21800:
URL: https://github.com/apache/kafka/pull/21800#discussion_r2958449148


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java:
##########
@@ -820,6 +820,34 @@ public void testEnsurePollEventSentOnConsumerPoll() {
         
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
     }
 
+    @Test
+    public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+        ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, subscriptions, "group-id", 
"client-id", "implicit");
+
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Keep pollForFetches from spinning by making it "wait" and advance 
MockTime.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+        // Always empty fetch: forces multiple loop iterations until the 
overall poll timeout expires.
+        
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+        assertTrue(result.isEmpty());
+
+        verify(fetchBuffer, 
org.mockito.Mockito.atLeastOnce()).awaitNotEmpty(any(Timer.class));

Review Comment:
   Can we add a static import for `atLeastOnce` instead of using the fully 
qualified `org.mockito.Mockito.atLeastOnce()`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java:
##########
@@ -820,6 +820,34 @@ public void testEnsurePollEventSentOnConsumerPoll() {
         
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
     }
 
+    @Test
+    public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+        ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, subscriptions, "group-id", 
"client-id", "implicit");
+
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Keep pollForFetches from spinning by making it "wait" and advance 
MockTime.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+        // Always empty fetch: forces multiple loop iterations until the 
overall poll timeout expires.
+        
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));

Review Comment:
   Can you share the reasoning behind using `450` as the poll timeout? It seems 
oddly specific.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1677,6 +1679,53 @@ public void testLongPollWaitIsLimited() {
         assertEquals(partitions, consumer.assignment());
     }
 
+    /**
+     * Verifies that at most one {@link AsyncPollEvent} is in-flight at a 
time. When {@code poll()} runs
+     * multiple loop iterations (e.g. empty fetches), it must not enqueue a 
new event while the previous
+     * one is still in-flight. This prevents unnecessary queueing and matches 
the single-instance design
+     * used in share consumer (KAFKA-20309). See KAFKA-20315.
+     */
+    @Test
+    public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
+        // Build a consumer with a mocked FetchBuffer so we can 
deterministically advance MockTime and avoid
+        // tight spinning while poll() waits.
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        ConsumerInterceptors<String, String> interceptors = 
mock(ConsumerInterceptors.class);
+        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = 
mock(ConsumerRebalanceListenerInvoker.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, interceptors, 
rebalanceListenerInvoker, subscriptions);
+
+        final String topicName = "topic1";
+        final TopicPartition tp = new TopicPartition(topicName, 0);
+
+        // Satisfy poll() preconditions without needing assign() (which would 
require stubbing addAndGet()).
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Make pollForFetches() "wait" by advancing mock time.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+        // Leave AsyncPollEvent in-flight (do not complete it) so the next 
loop iteration sees inflightPoll != null
+        doAnswer(invocation -> 
null).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));

Review Comment:
   Can you share the reasoning behind using `450` as the poll timeout? It seems 
oddly specific.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to