sjhajharia commented on code in PR #21800:
URL: https://github.com/apache/kafka/pull/21800#discussion_r2958449148
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java:
##########
@@ -820,6 +820,34 @@ public void testEnsurePollEventSentOnConsumerPoll() {
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
}
+ @Test
+ public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+ ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(fetchBuffer, subscriptions, "group-id",
"client-id", "implicit");
+
+ TopicPartition tp = new TopicPartition("topic1", 0);
+ subscriptions.assignFromUser(Collections.singleton(tp));
+ subscriptions.seek(tp, 0);
+
+ // Keep pollForFetches from spinning by making it "wait" and advance
MockTime.
+ doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+ doAnswer(invocation -> {
+ Timer pollTimer = invocation.getArgument(0, Timer.class);
+ ((MockTime) time).sleep(pollTimer.remainingMs());
+ return null;
+ }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+ // Always empty fetch: forces multiple loop iterations until the
overall poll timeout expires.
+
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+ ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+ assertTrue(result.isEmpty());
+
+ verify(fetchBuffer,
org.mockito.Mockito.atLeastOnce()).awaitNotEmpty(any(Timer.class));
Review Comment:
   Could we use a static import here instead of the fully qualified
`org.mockito.Mockito.atLeastOnce()`?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java:
##########
@@ -820,6 +820,34 @@ public void testEnsurePollEventSentOnConsumerPoll() {
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
}
+ @Test
+ public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+ ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(fetchBuffer, subscriptions, "group-id",
"client-id", "implicit");
+
+ TopicPartition tp = new TopicPartition("topic1", 0);
+ subscriptions.assignFromUser(Collections.singleton(tp));
+ subscriptions.seek(tp, 0);
+
+ // Keep pollForFetches from spinning by making it "wait" and advance
MockTime.
+ doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+ doAnswer(invocation -> {
+ Timer pollTimer = invocation.getArgument(0, Timer.class);
+ ((MockTime) time).sleep(pollTimer.remainingMs());
+ return null;
+ }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+ // Always empty fetch: forces multiple loop iterations until the
overall poll timeout expires.
+
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+ ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
Review Comment:
Can you share the reasoning behind using 450 as the poll time? It seems
oddly specific.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1677,6 +1679,53 @@ public void testLongPollWaitIsLimited() {
assertEquals(partitions, consumer.assignment());
}
+ /**
+ * Verifies that at most one {@link AsyncPollEvent} is in-flight at a
time. When {@code poll()} runs
+ * multiple loop iterations (e.g. empty fetches), it must not enqueue a
new event while the previous
+ * one is still in-flight. This prevents unnecessary queueing and matches
the single-instance design
+ * used in share consumer (KAFKA-20309). See KAFKA-20315.
+ */
+ @Test
+ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
+ // Build a consumer with a mocked FetchBuffer so we can
deterministically advance MockTime and avoid
+ // tight spinning while poll() waits.
+ FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+ ConsumerInterceptors<String, String> interceptors =
mock(ConsumerInterceptors.class);
+ ConsumerRebalanceListenerInvoker rebalanceListenerInvoker =
mock(ConsumerRebalanceListenerInvoker.class);
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(fetchBuffer, interceptors,
rebalanceListenerInvoker, subscriptions);
+
+ final String topicName = "topic1";
+ final TopicPartition tp = new TopicPartition(topicName, 0);
+
+ // Satisfy poll() preconditions without needing assign() (which would
require stubbing addAndGet()).
+ subscriptions.assignFromUser(singleton(tp));
+ subscriptions.seek(tp, 0);
+
+ // Make pollForFetches() "wait" by advancing mock time.
+ doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+ doAnswer(invocation -> {
+ Timer pollTimer = invocation.getArgument(0, Timer.class);
+ ((MockTime) time).sleep(pollTimer.remainingMs());
+ return null;
+ }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+ // Leave AsyncPollEvent in-flight (do not complete it) so the next
loop iteration sees inflightPoll != null
+ doAnswer(invocation ->
null).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+ ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
Review Comment:
Can you share the reasoning behind using `450` as the poll time? It seems
oddly specific.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]