philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1632136201


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -211,178 +220,52 @@ public void testResetPositionsProcessFailureIsIgnored() {
     }
 
     @Test
-    public void testValidatePositionsEventIsProcessed() {
-        ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-        applicationEventsQueue.add(e);
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-        assertTrue(applicationEventsQueue.isEmpty());
-    }
-
-    @Test
-    public void testAssignmentChangeEvent() {
-        HashMap<TopicPartition, OffsetAndMetadata> offset = 
mockTopicPartitionOffset();
-
-        final long currentTimeMs = time.milliseconds();
-        ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-        applicationEventsQueue.add(e);
-
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-        verify(networkClient, times(1)).poll(anyLong(), anyLong());
-        verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-        // Assignment change should generate an async commit (not retried).
-        verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-    }
-
-    @Test
-    void testFetchTopicMetadata() {
-        applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-    }
-
-    @Test
-    void testMaximumTimeToWait() {
+    public void testMaximumTimeToWait() {
+        final int defaultHeartbeatIntervalMs = 1000;
         // Initial value before runOnce has been called
         assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-        consumerNetworkThread.runOnce();
-        // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-    }
 
-    @Test
-    void testRequestManagersArePolledOnce() {
-        consumerNetworkThread.runOnce();
-        testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong())));
-        testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
-        verify(networkClient, times(1)).poll(anyLong(), anyLong());
-    }
+        
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+        
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 defaultHeartbeatIntervalMs);
 
-    @Test
-    void testEnsureMetadataUpdateOnPoll() {
-        MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-        client.prepareMetadataUpdate(metadataResponse);
-        metadata.requestUpdate(false);
         consumerNetworkThread.runOnce();
-        verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-    }
-
-    @Test
-    void testEnsureEventsAreCompleted() {
-        // Mimic the logic of CompletableEventReaper.reap(Collection):
-        doAnswer(__ -> {
-            Iterator<ApplicationEvent> i = applicationEventsQueue.iterator();
-
-            while (i.hasNext()) {
-                ApplicationEvent event = i.next();
-
-                if (event instanceof CompletableEvent)
-                    ((CompletableEvent<?>) 
event).future().completeExceptionally(new TimeoutException());
-
-                i.remove();
-            }
-
-            return null;
-        }).when(applicationEventReaper).reap(any(Collection.class));
-
-        Node node = metadata.fetch().nodes().get(0);
-        coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-        
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-        prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-        ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        when(event1.future()).thenReturn(future);
-        applicationEventsQueue.add(event1);
-        applicationEventsQueue.add(event2);
-        assertFalse(future.isDone());
-        assertFalse(applicationEventsQueue.isEmpty());
-        consumerNetworkThread.cleanup();
-        assertTrue(future.isCompletedExceptionally());
-        assertTrue(applicationEventsQueue.isEmpty());
+        // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
+        assertEquals(defaultHeartbeatIntervalMs, 
consumerNetworkThread.maximumTimeToWait());
     }
 
     @Test
-    void testCleanupInvokesReaper() {
+    public void testCleanupInvokesReaper() {
+        LinkedList<NetworkClientDelegate.UnsentRequest> queue = new 
LinkedList<>();
+        when(networkClientDelegate.unsentRequests()).thenReturn(queue);
         consumerNetworkThread.cleanup();
         verify(applicationEventReaper).reap(applicationEventsQueue);
     }
 
     @Test
-    void testRunOnceInvokesReaper() {
+    public void testRunOnceInvokesReaper() {
         consumerNetworkThread.runOnce();
         verify(applicationEventReaper).reap(any(Long.class));
     }
 
     @Test
-    void testSendUnsentRequest() {
-        String groupId = "group-id";
-        NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new FindCoordinatorRequest.Builder(
-                new FindCoordinatorRequestData()
-                    
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
-                    .setKey(groupId)),
-            Optional.empty());
-
-        networkClient.add(request);
-        assertTrue(networkClient.hasAnyPendingRequests());
-        assertFalse(networkClient.unsentRequests().isEmpty());
-        assertFalse(client.hasInFlightRequests());
+    public void testSendUnsentRequests() {
+        
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
         consumerNetworkThread.cleanup();
-
-        assertTrue(networkClient.unsentRequests().isEmpty());
-        assertFalse(client.hasInFlightRequests());
-        assertFalse(networkClient.hasAnyPendingRequests());
-    }
-
-    private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> 
expectedOffsets,
-                                            final Errors error,
-                                            final boolean disconnected) {
-        Map<TopicPartition, Errors> errors = 
partitionErrors(expectedOffsets.keySet(), error);
-        client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), 
offsetCommitResponse(errors), disconnected);
-    }
-
-    private Map<TopicPartition, Errors> partitionErrors(final 
Collection<TopicPartition> partitions,
-                                                        final Errors error) {
-        final Map<TopicPartition, Errors> errors = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            errors.put(partition, error);
-        }
-        return errors;
-    }
-
-    private OffsetCommitResponse offsetCommitResponse(final 
Map<TopicPartition, Errors> responseData) {
-        return new OffsetCommitResponse(responseData);
-    }
-
-    private MockClient.RequestMatcher offsetCommitRequestMatcher(final 
Map<TopicPartition, Long> expectedOffsets) {
-        return body -> {
-            OffsetCommitRequest req = (OffsetCommitRequest) body;
-            Map<TopicPartition, Long> offsets = req.offsets();
-            if (offsets.size() != expectedOffsets.size())
-                return false;
-
-            for (Map.Entry<TopicPartition, Long> expectedOffset : 
expectedOffsets.entrySet()) {
-                if (!offsets.containsKey(expectedOffset.getKey())) {
-                    return false;
-                } else {
-                    Long actualOffset = offsets.get(expectedOffset.getKey());
-                    if (!actualOffset.equals(expectedOffset.getValue())) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        };
+        verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
     }
 
-    private HashMap<TopicPartition, OffsetAndMetadata> 
mockTopicPartitionOffset() {
-        final TopicPartition t0 = new TopicPartition("t0", 2);
-        final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
-        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
-        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
-        return topicPartitionOffsets;
+    private static Stream<Arguments> applicationEvents() {
+        Time time1 = new MockTime();
+        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
+        final long currentTimeMs = time1.milliseconds();

Review Comment:
   can we just make up `currentTimeMs` value when constructing the event?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to