brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631524427
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test - public void testValidatePositionsEventIsProcessed() { - ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); - applicationEventsQueue.add(e); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); - assertTrue(applicationEventsQueue.isEmpty()); - } - - @Test - public void testAssignmentChangeEvent() { - HashMap<TopicPartition, OffsetAndMetadata> offset = mockTopicPartitionOffset(); - - final long currentTimeMs = time.milliseconds(); - ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); - applicationEventsQueue.add(e); - - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); - verify(networkClient, times(1)).poll(anyLong(), anyLong()); - verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); - // Assignment change should generate an async commit (not retried). - verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); - } + public void testPollResultTimer() { + NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( + time, + config, + logContext, + client + ); - @Test - void testFetchTopicMetadata() { - applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); + NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( + new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) + .setKey("foobar")), + Optional.empty()); + req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS); + + // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success + NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( + 10, + Collections.singletonList(req)); + assertEquals(10, networkClientDelegate.addAll(success)); + + NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( + 10, + new ArrayList<>()); + assertEquals(10, networkClientDelegate.addAll(failure)); Review Comment: It sems some of my changes were overwritten when I merged, I am fixing that up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org