kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1602382027


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() {
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
+    @Test
+    void testReaperExpiresExpiredEvents() {
+        long event1TimeoutMs = 100;
+        long event2TimeoutMs = 200;
+        SyncCommitEvent event1 = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, event1TimeoutMs));
+        SyncCommitEvent event2 = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, event2TimeoutMs));
+        applicationEventsQueue.add(event1);
+        applicationEventsQueue.add(event2);
+        consumerNetworkThread.runOnce();
+
+        // Make sure both events have been moved from the event queue to the 
reaper's "tracked" list.
+        assertFalse(applicationEventsQueue.contains(event1));
+        assertFalse(applicationEventsQueue.contains(event2));
+        assertTrue(applicationEventReaper.contains(event1));
+        assertTrue(applicationEventReaper.contains(event2));
+        assertEquals(2, applicationEventReaper.size());
+
+        // Sleep long enough for the first event to have expired.
+        time.sleep(event1TimeoutMs + 1);
+
+        consumerNetworkThread.runOnce();
+
+        // Validate that the first event was expired, but the second continues 
to be tracked
+        assertTrue(event1.future().isCompletedExceptionally());
+        assertThrows(TimeoutException.class, () -> 
ConsumerUtils.getResult(event1.future()));
+        assertFalse(applicationEventReaper.contains(event1));
+
+        assertTrue(applicationEventReaper.contains(event2));
+        assertFalse(event2.future().isDone());
+        assertEquals(1, applicationEventReaper.size());
+
+        // The cleanup will trigger the reaper's
+        consumerNetworkThread.cleanup();

Review Comment:
   Yes, PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to