cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1601414540


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
     private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = 
new LinkedBlockingQueue<>();
+    private final CompletableEventReaper backgroundEventReaper = new 
CompletableEventReaper(new LogContext());

Review Comment:
   Why is this not a mock?
   You do not need to test the actual reaper here. You just need to verify that 
the reaper is called correctly in the correct situations. 
   
   Please do not use a spy. 🙏



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() {
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
+    @Test
+    void testReaperExpiresExpiredEvents() {
+        long event1TimeoutMs = 100;
+        long event2TimeoutMs = 200;
+        SyncCommitEvent event1 = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, event1TimeoutMs));
+        SyncCommitEvent event2 = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, event2TimeoutMs));
+        applicationEventsQueue.add(event1);
+        applicationEventsQueue.add(event2);
+        consumerNetworkThread.runOnce();
+
+        // Make sure both events have been moved from the event queue to the 
reaper's "tracked" list.
+        assertFalse(applicationEventsQueue.contains(event1));
+        assertFalse(applicationEventsQueue.contains(event2));
+        assertTrue(applicationEventReaper.contains(event1));
+        assertTrue(applicationEventReaper.contains(event2));
+        assertEquals(2, applicationEventReaper.size());
+
+        // Sleep long enough for the first event to have expired.
+        time.sleep(event1TimeoutMs + 1);
+
+        consumerNetworkThread.runOnce();
+
+        // Validate that the first event was expired, but the second continues 
to be tracked
+        assertTrue(event1.future().isCompletedExceptionally());
+        assertThrows(TimeoutException.class, () -> 
ConsumerUtils.getResult(event1.future()));
+        assertFalse(applicationEventReaper.contains(event1));
+
+        assertTrue(applicationEventReaper.contains(event2));
+        assertFalse(event2.future().isDone());
+        assertEquals(1, applicationEventReaper.size());
+
+        // The cleanup will trigger the reaper's
+        consumerNetworkThread.cleanup();

Review Comment:
   I would write two distinct tests for `runOnce()` and `cleanup()`. By using a 
mock, it should get simpler to separate the tests.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1846,7 +1849,34 @@ public void 
testPollThrowsInterruptExceptionIfInterrupted() {
         }
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
-    
+
+    @Test
+    void testReaperExpiresExpiredEvents() {
+        consumer = newConsumer();
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        final SortedSet<TopicPartition> partitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        partitions.add(tp);
+
+        consumer.subscribe(Collections.singletonList(topicName), new 
CounterConsumerRebalanceListener());
+
+        final ConsumerRebalanceListenerCallbackNeededEvent event1 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions);
+        final ConsumerRebalanceListenerCallbackNeededEvent event2 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_ASSIGNED, 
partitions);
+        backgroundEventReaper.add(event1);
+        backgroundEventQueue.add(event2);
+
+        assertEquals(1, backgroundEventReaper.size());
+        assertEquals(1, backgroundEventQueue.size());
+
+        consumer.close();

Review Comment:
   The reaper is not only called in `close()`. It is also called in 
`unsubscribe()` and `poll()`. I do not know how important it is that the reaper 
is called in `unsubscribe()` or if it is a collateral that we do not need to 
test. You know best. However, verifying the calls to the reaper in `poll()` 
seems important to me, doesn't it?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1846,7 +1849,34 @@ public void 
testPollThrowsInterruptExceptionIfInterrupted() {
         }
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
-    
+
+    @Test
+    void testReaperExpiresExpiredEvents() {
+        consumer = newConsumer();
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        final SortedSet<TopicPartition> partitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        partitions.add(tp);
+
+        consumer.subscribe(Collections.singletonList(topicName), new 
CounterConsumerRebalanceListener());
+
+        final ConsumerRebalanceListenerCallbackNeededEvent event1 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions);
+        final ConsumerRebalanceListenerCallbackNeededEvent event2 = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_ASSIGNED, 
partitions);
+        backgroundEventReaper.add(event1);
+        backgroundEventQueue.add(event2);
+
+        assertEquals(1, backgroundEventReaper.size());
+        assertEquals(1, backgroundEventQueue.size());
+
+        consumer.close();
+
+        assertEquals(0, backgroundEventReaper.size());
+        assertEquals(0, backgroundEventQueue.size());
+        assertTrue(event1.future().isCompletedExceptionally());
+        assertTrue(event2.future().isCompletedExceptionally());
+    }

Review Comment:
   If you used a mock for the reaper, something like this would be enough to 
verify the correct use of the reaper in `close()`.
   
   ```suggestion
       void testReaperIsCalledInClose() {
           consumer = newConsumer();
   
           consumer.close();
   
           verify(backgroundEventReaper).reap(backgroundEventQueue)
       }
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() {
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
+    @Test
+    void testReaperExpiresExpiredEvents() {

Review Comment:
   With a mock also this test should get simpler.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -88,6 +91,7 @@ public class ConsumerNetworkThreadTest {
     private CommitRequestManager commitRequestManager;
     private CoordinatorRequestManager coordinatorRequestManager;
     private ConsumerNetworkThread consumerNetworkThread;
+    private CompletableEventReaper applicationEventReaper;

Review Comment:
   Also here, why not a mock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to