vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1559256180


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##########
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+

Review Comment:
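   Without these 2 lines, the tests are flaky. With the 2 lines added, I ran all the tests in `WorkerCoordinatorTest` 30 times and they all passed. The lines are needed because the test sometimes loses its connection to the coordinator due to a session timeout (the JoinGroup request in the logs uses `sessionTimeoutMs=30`). The heartbeat thread then tries to rediscover the coordinator, but its FindCoordinator request is answered with a `HeartbeatResponse` that was prepared for another request, and a `ClassCastException` is thrown. Adding logs for reference: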
   
   ```
   [2024-04-10 16:35:06,023] INFO Cluster ID: kafka-cluster (org.apache.kafka.clients.Metadata:349)
   [2024-04-10 16:35:06,028] DEBUG Sending FindCoordinator request to broker localhost:1969 (id: 0 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904)
   [2024-04-10 16:35:06,029] DEBUG Received FindCoordinator response ClientResponse(receivedTimeMs=1712747106023, latencyMs=0, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=mockClientId, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test-group', nodeId=0, host='localhost', port=1969, errorCode=0, errorMessage='NONE')])) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917)
   [2024-04-10 16:35:06,029] INFO Discovered group coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:936)
   [2024-04-10 16:35:06,030] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
   [2024-04-10 16:35:06,030] DEBUG Heartbeat thread started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1481)
   [2024-04-10 16:35:06,030] DEBUG Cooperative rebalance triggered. Keeping assignment null until it's explicitly revoked. (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:250)
   [2024-04-10 16:35:06,030] INFO (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
   [2024-04-10 16:35:06,031] DEBUG Sending JoinGroup (JoinGroupRequestData(groupId='test-group', sessionTimeoutMs=30, rebalanceTimeoutMs=60, memberId='', groupInstanceId=null, protocolType='connect', protocols=[JoinGroupRequestProtocol(name='compatible', metadata=[0, 1, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='default', metadata=[0, 0, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4])], reason='')) to coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:617)
   [2024-04-10 16:35:06,031] DEBUG Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='default', leader='leader', skipAssignment=false, memberId='member', members=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:645)
   [2024-04-10 16:35:06,031] DEBUG Enabling heartbeat thread (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1449)
   [2024-04-10 16:35:06,031] INFO Successfully joined group with generation Generation{generationId=1, memberId='member', protocol='default'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
   [2024-04-10 16:35:06,031] DEBUG Sending follower SyncGroup to coordinator localhost:1969 (id: 2147483647 rack: null): SyncGroupRequestData(groupId='test-group', generationId=1, memberId='member', groupInstanceId=null, protocolType='connect', protocolName='default', assignments=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:759)
   [2024-04-10 16:35:06,032] DEBUG Received successful SyncGroup response: SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType=null, protocolName=null, assignment=[0, 0, 0, 0, 0, 6, 108, 101, 97, 100, 101, 114, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 1, 0, 10, 99, 111, 110, 110, 101, 99, 116, 111, 114, 49, 0, 0, 0, 1, 0, 0, 0, 0]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:826)
   [2024-04-10 16:35:06,032] INFO Successfully synced group in generation Generation{generationId=1, memberId='member', protocol='default'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
   [2024-04-10 16:35:06,032] DEBUG Deserialized new assignment: Assignment{error=0, leader='leader', leaderUrl='leaderUrl:8083', offset=4, connectorIds=[], taskIds=[connector1-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:200)
   [2024-04-10 16:35:06,137] INFO Group coordinator localhost:1969 (id: 2147483647 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:999)
   [2024-04-10 16:35:06,137] INFO Requesting disconnect from last known coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1012)
   [2024-04-10 16:35:06,138] DEBUG Sending FindCoordinator request to broker localhost:1969 (id: 0 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904)
   [2024-04-10 16:35:06,242] DEBUG Received FindCoordinator response ClientResponse(receivedTimeMs=1712747106110, latencyMs=29, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=mockClientId, correlationId=3, headerVersion=2), responseBody=HeartbeatResponseData(throttleTimeMs=0, errorCode=0)) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917)
   [2024-04-10 16:35:06,245] ERROR Heartbeat thread failed due to unexpected error (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1572)
   java.lang.ClassCastException: org.apache.kafka.common.requests.HeartbeatResponse cannot be cast to org.apache.kafka.common.requests.FindCoordinatorResponse
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:919)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:913)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1500)
   [2024-04-10 16:35:06,249] DEBUG Heartbeat thread has closed (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1578)
   [2024-04-10 16:35:21,119] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:694)
   [2024-04-10 16:35:21,124] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:704)
   ```
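   The failure mode in these logs can be sketched in plain Java. This is a minimal sketch that assumes the mock client answers each request with the next prepared response in queue order, without checking the request type, which is what the log suggests (a `FIND_COORDINATOR` request received `HeartbeatResponseData`). `FifoMockDemo`, `FifoMockClient`, and `rediscoverCoordinator` are hypothetical, simplified stand-ins, not Kafka's `MockClient` or `AbstractCoordinator`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical, simplified stand-ins for Kafka's request/response classes.
   // This is NOT Kafka's MockClient; it only mimics the assumed FIFO behaviour
   // that makes the test sensitive to whether rediscovery happens.
   public class FifoMockDemo {

       interface Response { }

       static final class FindCoordinatorResponse implements Response {
           final String host;
           FindCoordinatorResponse(String host) { this.host = host; }
       }

       static final class HeartbeatResponse implements Response { }

       /** Hands out prepared responses strictly in queue order, without
        *  checking which kind of request is being answered. */
       static final class FifoMockClient {
           private final Deque<Response> prepared = new ArrayDeque<>();

           void prepareResponse(Response response) {
               prepared.addLast(response);
           }

           Response send(String requestType) {
               Response next = prepared.pollFirst();
               if (next == null) {
                   throw new IllegalStateException("No prepared response for " + requestType);
               }
               return next;
           }
       }

       /** Simulates coordinator rediscovery: the caller casts the reply to the
        *  type it expects, as a FindCoordinator response handler would. */
       static String rediscoverCoordinator(FifoMockClient client) {
           Response r = client.send("FIND_COORDINATOR");
           return ((FindCoordinatorResponse) r).host; // ClassCastException if the queue is out of step
       }

       public static void main(String[] args) {
           // No spare FindCoordinator response: rediscovery consumes the
           // heartbeat response queued for a later step of the test.
           FifoMockClient flaky = new FifoMockClient();
           flaky.prepareResponse(new HeartbeatResponse());
           try {
               rediscoverCoordinator(flaky);
           } catch (ClassCastException e) {
               System.out.println("flaky: " + e.getClass().getSimpleName()); // prints "flaky: ClassCastException"
           }

           // A spare FindCoordinator response queued first: rediscovery gets the
           // type it expects and the heartbeat response stays queued.
           FifoMockClient fixed = new FifoMockClient();
           fixed.prepareResponse(new FindCoordinatorResponse("localhost:1969"));
           fixed.prepareResponse(new HeartbeatResponse());
           System.out.println("fixed: " + rediscoverCoordinator(fixed)); // prints "fixed: localhost:1969"
       }
   }
   ```

   The sketch only models runs in which the session times out and rediscovery actually happens; it does not show what the spare responses do in runs where the session never times out.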
   


