showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1570226078
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ########## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } + @Test + public void testPollTimeoutExpiry() throws InterruptedException { + + when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), + Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + Review Comment: What I want to say is, with your current change, this test will work like what you did before, which is not sending heartbeat response at all, and forcing session timeout directly. That works though, it's just not readable and doesn't make sense (i.e. not normal experience). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org