showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577219150


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##########
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+        // We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+        // than session timeout. This might not happen in the real world but 
it makes testing
+        // easier and the test not flaky.
+        int smallRebalanceTimeout = 20;
+        this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+            smallRebalanceTimeout,
+            heartbeatIntervalMs,
+            groupId,
+            Optional.empty(),
+            retryBackoffMs,
+            retryBackoffMaxMs,
+            true);
+        this.coordinator = new WorkerCoordinator(rebalanceConfig,
+            logContext,
+            consumerClient,
+            new Metrics(time),
+            "consumer" + groupId,
+            time,
+            LEADER_URL,
+            configStorage,
+            rebalanceListener,
+            compatibility,
+            0);
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+        
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> null);
+
+            // The heartbeat thread is running and keeps sending heartbeat 
requests.
+            TestUtils.waitForCondition(() -> {
+                // Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+                coordinator.sendHeartbeatRequest();
+                client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+                time.sleep(1);

Review Comment:
   If you set `RebalanceTimeout < sessionTimeout`, why do we need the 
heartBeatRequest? You should be able to sleep (RebalanceTimeout + 1) directly 
to get the log. Ex:
   
   ```
   try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
               coordinator.ensureActiveGroup();
               coordinator.poll(0, () -> null);
   
                time.sleep(smallRebalanceTimeout + 1);
   
               // The heartbeat thread is running and keeps sending heartbeat 
requests.
               TestUtils.waitForCondition(() -> 
logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("WARN")) &&
                       logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getMessage().startsWith("worker poll timeout has expired"), ...)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to