brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631524427


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() {
     }
 
     @Test
-    public void testValidatePositionsEventIsProcessed() {
-        ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-        applicationEventsQueue.add(e);
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-        assertTrue(applicationEventsQueue.isEmpty());
-    }
-
-    @Test
-    public void testAssignmentChangeEvent() {
-        HashMap<TopicPartition, OffsetAndMetadata> offset = 
mockTopicPartitionOffset();
-
-        final long currentTimeMs = time.milliseconds();
-        ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-        applicationEventsQueue.add(e);
-
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-        verify(networkClient, times(1)).poll(anyLong(), anyLong());
-        verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-        // Assignment change should generate an async commit (not retried).
-        verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-    }
+    public void testPollResultTimer() {
+        NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(
+                time,
+                config,
+                logContext,
+                client
+        );
 
-    @Test
-    void testFetchTopicMetadata() {
-        applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
+        NetworkClientDelegate.UnsentRequest req = new 
NetworkClientDelegate.UnsentRequest(
+                new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
+                                
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
+                                .setKey("foobar")),
+                Optional.empty());
+        req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS);
+
+        // purposely setting a non-MAX time to ensure it is returning 
Long.MAX_VALUE upon success
+        NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
+                10,
+                Collections.singletonList(req));
+        assertEquals(10, networkClientDelegate.addAll(success));
+
+        NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
+                10,
+                new ArrayList<>());
+        assertEquals(10, networkClientDelegate.addAll(failure));

Review Comment:
   It sems some of my changes were overwritten when I merged, I am fixing that 
up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to