vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1559256180
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##########
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+

Review Comment:
If I don't have these 2 lines, the test becomes flaky. With the 2 lines added, I ran all the tests in WorkerCoordinatorTest 30 times and all of them passed. The lines are needed because the connection to the coordinator is sometimes lost during the test due to a session timeout. The heartbeat thread then sends a new FindCoordinator request, and without a prepared FindCoordinator response it receives a HeartbeatResponse instead, which causes a ClassCastException.
Adding logs for reference:

```
[2024-04-10 16:35:06,023] INFO Cluster ID: kafka-cluster (org.apache.kafka.clients.Metadata:349)
[2024-04-10 16:35:06,028] DEBUG Sending FindCoordinator request to broker localhost:1969 (id: 0 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904)
[2024-04-10 16:35:06,029] DEBUG Received FindCoordinator response ClientResponse(receivedTimeMs=1712747106023, latencyMs=0, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=mockClientId, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test-group', nodeId=0, host='localhost', port=1969, errorCode=0, errorMessage='NONE')])) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917)
[2024-04-10 16:35:06,029] INFO Discovered group coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:936)
[2024-04-10 16:35:06,030] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-04-10 16:35:06,030] DEBUG Heartbeat thread started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1481)
[2024-04-10 16:35:06,030] DEBUG Cooperative rebalance triggered. Keeping assignment null until it's explicitly revoked. (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:250)
[2024-04-10 16:35:06,030] INFO (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-04-10 16:35:06,031] DEBUG Sending JoinGroup (JoinGroupRequestData(groupId='test-group', sessionTimeoutMs=30, rebalanceTimeoutMs=60, memberId='', groupInstanceId=null, protocolType='connect', protocols=[JoinGroupRequestProtocol(name='compatible', metadata=[0, 1, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='default', metadata=[0, 0, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4])], reason='')) to coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:617)
[2024-04-10 16:35:06,031] DEBUG Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='default', leader='leader', skipAssignment=false, memberId='member', members=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:645)
[2024-04-10 16:35:06,031] DEBUG Enabling heartbeat thread (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1449)
[2024-04-10 16:35:06,031] INFO Successfully joined group with generation Generation{generationId=1, memberId='member', protocol='default'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-04-10 16:35:06,031] DEBUG Sending follower SyncGroup to coordinator localhost:1969 (id: 2147483647 rack: null): SyncGroupRequestData(groupId='test-group', generationId=1, memberId='member', groupInstanceId=null, protocolType='connect', protocolName='default', assignments=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:759)
[2024-04-10 16:35:06,032] DEBUG Received successful SyncGroup response: SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType=null, protocolName=null, assignment=[0, 0, 0, 0, 0, 6, 108, 101, 97, 100, 101, 114, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 1, 0, 10, 99, 111, 110, 110, 101, 99, 116, 111, 114, 49, 0, 0, 0, 1, 0, 0, 0, 0]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:826)
[2024-04-10 16:35:06,032] INFO Successfully synced group in generation Generation{generationId=1, memberId='member', protocol='default'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
[2024-04-10 16:35:06,032] DEBUG Deserialized new assignment: Assignment{error=0, leader='leader', leaderUrl='leaderUrl:8083', offset=4, connectorIds=[], taskIds=[connector1-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:200)
[2024-04-10 16:35:06,137] INFO Group coordinator localhost:1969 (id: 2147483647 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:999)
[2024-04-10 16:35:06,137] INFO Requesting disconnect from last known coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1012)
[2024-04-10 16:35:06,138] DEBUG Sending FindCoordinator request to broker localhost:1969 (id: 0 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904)
[2024-04-10 16:35:06,242] DEBUG Received FindCoordinator response ClientResponse(receivedTimeMs=1712747106110, latencyMs=29, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=mockClientId, correlationId=3, headerVersion=2), responseBody=HeartbeatResponseData(throttleTimeMs=0, errorCode=0)) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917)
[2024-04-10 16:35:06,245] ERROR Heartbeat thread failed due to unexpected error (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1572)
java.lang.ClassCastException: org.apache.kafka.common.requests.HeartbeatResponse cannot be cast to org.apache.kafka.common.requests.FindCoordinatorResponse
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:919)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:913)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1500)
[2024-04-10 16:35:06,249] DEBUG Heartbeat thread has closed (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1578)
[2024-04-10 16:35:21,119] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:694)
[2024-04-10 16:35:21,124] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:704)
```
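
To illustrate why the extra prepared responses matter, here is a minimal, self-contained sketch of the failure mode the log suggests. It assumes the mock client hands out prepared responses in FIFO order without checking which request they answer. All names in it (`CannedClient`, `FindCoordinatorReply`, `HeartbeatReply`, `rediscover`) are hypothetical stand-ins, not Kafka classes, and the sketch is not the actual `MockClient` implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class MockResponseQueueSketch {

    // Hypothetical stand-ins for the real response types; these are not Kafka classes.
    interface Response {}
    static final class FindCoordinatorReply implements Response {}
    static final class HeartbeatReply implements Response {}

    // Assumption: prepared responses are returned in FIFO order,
    // regardless of which request is being sent.
    static final class CannedClient {
        private final Queue<Response> prepared = new ArrayDeque<>();

        void prepareResponse(Response response) {
            prepared.add(response);
        }

        Response send(String requestName) {
            Response response = prepared.poll();
            if (response == null) {
                throw new IllegalStateException("No prepared response for " + requestName);
            }
            return response;
        }
    }

    // Simulates the rediscovery that follows a session timeout: the caller
    // expects a FindCoordinator reply and casts whatever the queue returns.
    static String rediscover(boolean extraFindCoordinatorPrepared) {
        CannedClient client = new CannedClient();
        if (extraFindCoordinatorPrepared) {
            client.prepareResponse(new FindCoordinatorReply());
        }
        // The response the test originally intended for a later heartbeat.
        client.prepareResponse(new HeartbeatReply());

        try {
            FindCoordinatorReply reply = (FindCoordinatorReply) client.send("FindCoordinator");
            return reply != null ? "coordinator found" : "no reply";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println("without extra response: " + rediscover(false));
        System.out.println("with extra response:    " + rediscover(true));
    }
}
```

Under that assumption, an unplanned FindCoordinator request consumes whatever response is next in the queue, so preparing spare FindCoordinator responses keeps the queue aligned when the session timeout fires at an unpredictable point in the test.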