kirktrue commented on code in PR #16310: URL: https://github.com/apache/kafka/pull/16310#discussion_r1639004424
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -574,6 +575,93 @@ public void testPollLongThrowsException() { "This method is deprecated and will be removed in the next major release.", e.getMessage()); } + @Test + public void testOffsetFetchStoresPendingEvent() { + consumer = newConsumer(); + long timeoutMs = 0; + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); + + // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the + // timeout, leaving a pending fetch. + consumer.poll(Duration.ofMillis(timeoutMs)); + verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class)); + CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent(); + assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs))); + assertTrue(consumer.hasPendingOffsetFetchEvent()); + + // For the second attempt, the event is reused, and this time the Future returns successfully, clearing + // the pending fetch. Verify that the number of FetchCommittedOffsetsEvent enqueued remains at 1. + event.future().complete(Collections.emptyMap()); + consumer.poll(Duration.ofMillis(timeoutMs)); + verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class)); + assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs))); + assertFalse(consumer.hasPendingOffsetFetchEvent()); + } + + @Test + public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() { + consumer = newConsumer(); + long timeoutMs = 0; + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + + // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event, + // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch. + consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); + consumer.poll(Duration.ofMillis(timeoutMs)); + verify(applicationEventHandler, times(1)).add(any(FetchCommittedOffsetsEvent.class)); + CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent(); + assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs))); + assertTrue(consumer.hasPendingOffsetFetchEvent()); + + // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced. + // Verify that the number of FetchCommittedOffsetsEvent enqueued is updated to 2. + consumer.assign(Collections.singleton(new TopicPartition("topic1", 1))); + consumer.poll(Duration.ofMillis(timeoutMs)); + verify(applicationEventHandler, times(2)).add(any(FetchCommittedOffsetsEvent.class)); Review Comment: I went ahead and used `clearInvocations()` here too, just for consistency. Is that OK? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org