cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608024280


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1835,7 +1849,31 @@ public void 
testPollThrowsInterruptExceptionIfInterrupted() {
         }
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
-    
+
+    @Test
+    void testReaperInvokedInClose() {
+        consumer = newConsumer();
+        consumer.close();
+        verify(backgroundEventReaper).reap(backgroundEventQueue);
+    }
+
+    @Test
+    void testReaperInvokedInUnsubscribe() {
+        consumer = newConsumer();
+        completeUnsubscribeApplicationEventSuccessfully();
+        consumer.unsubscribe();
+        verify(backgroundEventReaper).reap(any(Long.class));

Review Comment:
   You control the time here. Why do you not verify that `reap()` is called 
with the correct time?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
     }
 
     @Test
-    void testEnsureEventsAreCompleted() {
-        Node node = metadata.fetch().nodes().get(0);
-        coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-        
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-        prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-        ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        when(event1.future()).thenReturn(future);
-        applicationEventsQueue.add(event1);
-        applicationEventsQueue.add(event2);
-        assertFalse(future.isDone());
-        assertFalse(applicationEventsQueue.isEmpty());
-
+    void testCleanupInvokesReaper() {
         consumerNetworkThread.cleanup();
-        assertTrue(future.isCompletedExceptionally());
-        assertTrue(applicationEventsQueue.isEmpty());
-    }
-
-    private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> 
expectedOffsets,
-                                            final Errors error,
-                                            final boolean disconnected) {
-        Map<TopicPartition, Errors> errors = 
partitionErrors(expectedOffsets.keySet(), error);
-        client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), 
offsetCommitResponse(errors), disconnected);
-    }
-
-    private Map<TopicPartition, Errors> partitionErrors(final 
Collection<TopicPartition> partitions,
-                                                        final Errors error) {
-        final Map<TopicPartition, Errors> errors = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            errors.put(partition, error);
-        }
-        return errors;
-    }
-
-    private OffsetCommitResponse offsetCommitResponse(final 
Map<TopicPartition, Errors> responseData) {
-        return new OffsetCommitResponse(responseData);
+        verify(applicationEventReaper).reap(applicationEventsQueue);
     }
 
-    private MockClient.RequestMatcher offsetCommitRequestMatcher(final 
Map<TopicPartition, Long> expectedOffsets) {
-        return body -> {
-            OffsetCommitRequest req = (OffsetCommitRequest) body;
-            Map<TopicPartition, Long> offsets = req.offsets();
-            if (offsets.size() != expectedOffsets.size())
-                return false;
-
-            for (Map.Entry<TopicPartition, Long> expectedOffset : 
expectedOffsets.entrySet()) {
-                if (!offsets.containsKey(expectedOffset.getKey())) {
-                    return false;
-                } else {
-                    Long actualOffset = offsets.get(expectedOffset.getKey());
-                    if (!actualOffset.equals(expectedOffset.getValue())) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        };
+    @Test
+    void testRunOnceInvokesReaper() {
+        consumerNetworkThread.runOnce();
+        verify(applicationEventReaper).reap(any(Long.class));

Review Comment:
   You control the time here. Why do you not verify that `reap()` is called 
with the correct time?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
     }
 
     @Test
-    void testEnsureEventsAreCompleted() {

Review Comment:
   Why did you remove this test without replacement?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to