lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631549092


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -72,68 +64,98 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-    private ConsumerTestBuilder testBuilder;
-    private Time time;
-    private ConsumerMetadata metadata;
-    private NetworkClientDelegate networkClient;
-    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
-    private ApplicationEventProcessor applicationEventProcessor;
-    private OffsetsRequestManager offsetsRequestManager;
-    private CommitRequestManager commitRequestManager;
-    private CoordinatorRequestManager coordinatorRequestManager;
-    private ConsumerNetworkThread consumerNetworkThread;
-    private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-    private MockClient client;
-
-    @BeforeEach
-    public void setup() {
-        testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-        time = testBuilder.time;
-        metadata = testBuilder.metadata;
-        networkClient = testBuilder.networkClientDelegate;
-        client = testBuilder.client;
-        applicationEventsQueue = testBuilder.applicationEventQueue;
-        applicationEventProcessor = testBuilder.applicationEventProcessor;
-        commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-        offsetsRequestManager = testBuilder.offsetsRequestManager;
-        coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        consumerNetworkThread = new ConsumerNetworkThread(
-                testBuilder.logContext,
+    static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+
+    private final Time time;
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final OffsetsRequestManager offsetsRequestManager;
+    private final HeartbeatRequestManager heartbeatRequestManager;
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final ConsumerNetworkThread consumerNetworkThread;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final RequestManagers requestManagers;
+    private final CompletableEventReaper applicationEventReaper;
+
+    ConsumerNetworkThreadTest() {
+        this.networkClientDelegate = mock(NetworkClientDelegate.class);
+        this.requestManagers = mock(RequestManagers.class);
+        this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+        this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+        this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+        this.applicationEventReaper = mock(CompletableEventReaper.class);
+        this.time = new MockTime();
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        LogContext logContext = new LogContext();
+
+        this.consumerNetworkThread = new ConsumerNetworkThread(
+                logContext,
                 time,
-                testBuilder.applicationEventQueue,
+                applicationEventsQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
-                () -> testBuilder.networkClientDelegate,
-                () -> testBuilder.requestManagers
+                () -> networkClientDelegate,
+                () -> requestManagers
         );
+    }
+
+    @BeforeEach
+    public void setup() {
         consumerNetworkThread.initializeResources();
     }
 
     @AfterEach
     public void tearDown() {
-        if (testBuilder != null) {
-            testBuilder.close();
-            consumerNetworkThread.close(Duration.ZERO);
-        }
+        if (consumerNetworkThread != null)
+            consumerNetworkThread.close();
+    }
+
+    @Test
+    public void testEnsureCloseStopsRunningThread() {
+        assertTrue(consumerNetworkThread.isRunning(),
+            "ConsumerNetworkThread should start running when created");
+
+        consumerNetworkThread.close();
+        assertFalse(consumerNetworkThread.isRunning(),
+            "close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {100, 4999, 5001})
+    public void testConsumerNetworkThreadPollTimeComputations(long 
exampleTime) {
+        List<Optional<? extends RequestManager>> list = new ArrayList<>();
+        list.add(Optional.of(coordinatorRequestManager));
+        list.add(Optional.of(heartbeatRequestManager));
+
+        when(requestManagers.entries()).thenReturn(list);
+
+        NetworkClientDelegate.PollResult pollResult = new 
NetworkClientDelegate.PollResult(exampleTime);
+        NetworkClientDelegate.PollResult pollResult1 = new 
NetworkClientDelegate.PollResult(exampleTime + 100);
+
+        long t = time.milliseconds();
+        when(coordinatorRequestManager.poll(t)).thenReturn(pollResult);
+        
when(coordinatorRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime);
+        when(heartbeatRequestManager.poll(t)).thenReturn(pollResult1);
+        
when(heartbeatRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime + 
100);
+        
when(networkClientDelegate.addAll(pollResult)).thenReturn(pollResult.timeUntilNextPollMs);
+        
when(networkClientDelegate.addAll(pollResult1)).thenReturn(pollResult1.timeUntilNextPollMs);
+        consumerNetworkThread.runOnce();
+
+        verify(networkClientDelegate).poll(exampleTime < 5001 ? exampleTime : 
5000, time.milliseconds());

Review Comment:
   It took me a bit to understand this test, and the trick is that this 5000 is 
the MAX_POLL_TIMEOUT_MS, so would be clearer to use the constant I believe 
(here and on the param 5001 -> MAX_POLL_TIMEOUT_MS + 1).
   
   If using the constant then here would probably be clearer to have 
`Math.min(exampleTime, MAX_POLL_TIMEOUT_MS)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to