cadonna commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1601414540
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest { private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class); private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>(); + private final CompletableEventReaper backgroundEventReaper = new CompletableEventReaper(new LogContext()); Review Comment: Why is this not a mock? You do not need to test the actual reaper here. You just need to verify that the reaper is called correctly in the correct situations. Please do not use a spy. 🙏 ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } + @Test + void testReaperExpiresExpiredEvents() { + long event1TimeoutMs = 100; + long event2TimeoutMs = 200; + SyncCommitEvent event1 = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, event1TimeoutMs)); + SyncCommitEvent event2 = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, event2TimeoutMs)); + applicationEventsQueue.add(event1); + applicationEventsQueue.add(event2); + consumerNetworkThread.runOnce(); + + // Make sure both events have been moved from the event queue to the reaper's "tracked" list. + assertFalse(applicationEventsQueue.contains(event1)); + assertFalse(applicationEventsQueue.contains(event2)); + assertTrue(applicationEventReaper.contains(event1)); + assertTrue(applicationEventReaper.contains(event2)); + assertEquals(2, applicationEventReaper.size()); + + // Sleep long enough for the first event to have expired. 
+ time.sleep(event1TimeoutMs + 1); + + consumerNetworkThread.runOnce(); + + // Validate that the first event was expired, but the second continues to be tracked + assertTrue(event1.future().isCompletedExceptionally()); + assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future())); + assertFalse(applicationEventReaper.contains(event1)); + + assertTrue(applicationEventReaper.contains(event2)); + assertFalse(event2.future().isDone()); + assertEquals(1, applicationEventReaper.size()); + + // The cleanup will trigger the reaper's + consumerNetworkThread.cleanup(); Review Comment: I would write two distinct tests for `runOnce()` and `cleanup()`. By using a mock, it should get simpler to separate the tests. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -1846,7 +1849,34 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { } assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - + + @Test + void testReaperExpiresExpiredEvents() { + consumer = newConsumer(); + final String topicName = "foo"; + final int partition = 3; + final TopicPartition tp = new TopicPartition(topicName, partition); + final SortedSet<TopicPartition> partitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + partitions.add(tp); + + consumer.subscribe(Collections.singletonList(topicName), new CounterConsumerRebalanceListener()); + + final ConsumerRebalanceListenerCallbackNeededEvent event1 = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions); + final ConsumerRebalanceListenerCallbackNeededEvent event2 = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_ASSIGNED, partitions); + backgroundEventReaper.add(event1); + backgroundEventQueue.add(event2); + + assertEquals(1, backgroundEventReaper.size()); + assertEquals(1, backgroundEventQueue.size()); + + consumer.close(); Review Comment: The reaper is not only called in `close()`. 
It is also called in `unsubscribe()` and `poll()`. I do not know how important it is that the reaper is called in `unsubscribe()`, or whether that is just a side effect that we do not need to test. You know best. However, verifying the calls to the reaper in `poll()` seems important to me. Don't you agree? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -1846,7 +1849,34 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { } assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - + + @Test + void testReaperExpiresExpiredEvents() { + consumer = newConsumer(); + final String topicName = "foo"; + final int partition = 3; + final TopicPartition tp = new TopicPartition(topicName, partition); + final SortedSet<TopicPartition> partitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + partitions.add(tp); + + consumer.subscribe(Collections.singletonList(topicName), new CounterConsumerRebalanceListener()); + + final ConsumerRebalanceListenerCallbackNeededEvent event1 = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions); + final ConsumerRebalanceListenerCallbackNeededEvent event2 = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_ASSIGNED, partitions); + backgroundEventReaper.add(event1); + backgroundEventQueue.add(event2); + + assertEquals(1, backgroundEventReaper.size()); + assertEquals(1, backgroundEventQueue.size()); + + consumer.close(); + + assertEquals(0, backgroundEventReaper.size()); + assertEquals(0, backgroundEventQueue.size()); + assertTrue(event1.future().isCompletedExceptionally()); + assertTrue(event2.future().isCompletedExceptionally()); + } Review Comment: If you used a mock for the reaper, something like this would be enough to verify the correct use of the reaper in `close()`. 
```suggestion void testReaperIsCalledInClose() { consumer = newConsumer(); consumer.close(); verify(backgroundEventReaper).reap(backgroundEventQueue); } ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } + @Test + void testReaperExpiresExpiredEvents() { Review Comment: With a mock, this test should also get simpler. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -88,6 +91,7 @@ public class ConsumerNetworkThreadTest { private CommitRequestManager commitRequestManager; private CoordinatorRequestManager coordinatorRequestManager; private ConsumerNetworkThread consumerNetworkThread; + private CompletableEventReaper applicationEventReaper; Review Comment: Also here, why not a mock? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org