showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ########## @@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } + @Test + public void testPollTimeoutExpiry() throws InterruptedException { + + when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), + Collections.singletonList(taskId1x0), Errors.NONE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + + client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), + Collections.singletonList(taskId1x0), Errors.NONE)); + + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { + coordinator.ensureActiveGroup(); + coordinator.poll(0, () -> { + return null; + }); + + long now = time.milliseconds(); + // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually + // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry. 
+ TestUtils.waitForCondition(() -> { + time.sleep(heartbeatIntervalMs - 1); + return time.milliseconds() > now + rebalanceTimeoutMs; + }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms"); + coordinator.poll(0, () -> { Review Comment: 1. You didn't prepare a `HeartbeatResponse`, so the member's session will time out. 2. `heartbeatIntervalMs` is only the interval at which heartbeats are sent; the deadline that actually matters for heartbeats is the session timeout. So we can sleep `sessionTimeoutMs - 1` per iteration to reach `rebalanceTimeoutMs` faster. 3. The final poll isn't needed, because the poll timeout should already have been triggered by then. Why do we need it? Here is roughly what I would write, for your reference: ``` public void testPollTimeoutExpiry() throws InterruptedException { when(configStorage.snapshot()).thenReturn(configState1); client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), Collections.singletonList(taskId1x0), Errors.NONE)); // Prepare 3 heartbeat responses because 3 heartbeat requests will be sent before rebalanceTimeoutMs elapses, // i.e. (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { coordinator.ensureActiveGroup();
coordinator.poll(0, () -> { return null; }); // We keep the heartbeat thread running in the background and keep advancing the mock time so that eventually // the time goes past the last poll + rebalanceTimeoutMs, which triggers poll timeout expiry. TestUtils.waitForCondition(() -> { // Sleep just under sessionTimeoutMs so that a heartbeat request is sent before the session times out. // This may be flaky in CI if the heartbeat thread doesn't send the heartbeat request in time. time.sleep(sessionTimeoutMs - 1); return logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) && logCaptureAppender.getEvents().stream().anyMatch(e -> e.getMessage().startsWith("worker poll timeout has expired")); }, "Coordinator did not poll for rebalance.timeout.ms"); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org