Daniel Urban created KAFKA-14667:
------------------------------------

Summary: Delayed leader election operation gets stuck in purgatory
Key: KAFKA-14667
URL: https://issues.apache.org/jira/browse/KAFKA-14667
Project: Kafka
Issue Type: Bug
Affects Versions: 3.1.1
Reporter: Daniel Urban
This was observed with Kafka 3.1.1, but I believe that the latest versions are also affected.

In the Cruise Control project, there is an integration test: com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails roughly every 20th run with a timeout: the triggered preferred leader election never completes.

After some investigation, it turns out that:
# The admin client never gets a response from the broker.
# The leadership change is executed successfully.
# The ElectLeader purgatory never gets an update for the relevant topic partition.

A few relevant lines from a failed run (this test uses an embedded cluster, so the logs are mixed):

CC successfully sends a preferred election request to the controller (broker 0); topic1-0 needs a leadership change from broker 0 to broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG kafka.controller.KafkaController - [Controller id=0] Waiting for any successful result for election type (PREFERRED) by AdminClientTriggered for partitions: Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
{code}

The delayed operation for the leader election is triggered twice in quick succession (yes, with the same millisecond timestamp in both lines):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
{code}

Shortly after (a few ms later, based on the logs), broker 0 receives an UpdateMetadataRequest from the controller (itself) and processes it:
{code:java}
2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19) and timeout 30000 to node 0: UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='topic1', topicId=gkFP8VnkSGyEf_LBBZSowQ, partitionStates=[UpdateMetadataPartitionState(topicName='topic1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null)])
2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Received UPDATE_METADATA response from node 0 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19): UpdateMetadataResponseData(errorCode=0)
2023-02-01 01:20:26.035 [data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
{code}

The UpdateMetadataRequest should trigger an update of the ElectLeader purgatory, and we should see a log line like "Request key X unblocked Y ElectLeader." In the failed run, this line never appears; in successful runs, it does.

I believe that kafka.server.KafkaApis#handleUpdateMetadataRequest, kafka.server.ReplicaManager#hasDelayedElectionOperations and kafka.server.DelayedOperationPurgatory#tryCompleteElseWatch have a concurrency issue.
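One possible reading of the two identical tryComplete() lines: if tryCompleteElseWatch tries to complete the operation, registers it under its watch keys, and then tries once more while holding the operation's lock, a single call logs tryComplete() twice within the same millisecond. The sketch below models that "try, watch, try again" pattern in plain Java. It is a hypothetical simplification, assuming this is how the purgatory registers operations; DelayedOp, Purgatory, tryCompleteOrElse and the attempts counter are illustrative names, not Kafka's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of a "try, watch, try again" purgatory.
// Class and method names are illustrative only; they are not Kafka's API.
public class TryWatchRetrySketch {
    static class DelayedOp {
        final ReentrantLock lock = new ReentrantLock();
        final BooleanSupplier condition;
        boolean completed;
        int attempts; // how many times tryComplete() ran (one "log line" each)

        DelayedOp(BooleanSupplier condition) { this.condition = condition; }

        boolean tryComplete() {
            attempts++;
            if (!completed && condition.getAsBoolean()) completed = true;
            return completed;
        }

        // Try once; on failure, run the registration step and try again,
        // all while holding the operation's lock.
        boolean tryCompleteOrElse(Runnable register) {
            lock.lock();
            try {
                if (tryComplete()) return true;
                register.run();
                return tryComplete();
            } finally {
                lock.unlock();
            }
        }
    }

    static class Purgatory {
        final Map<String, List<DelayedOp>> watchers = new HashMap<>();

        // Completes the operation right away if possible, otherwise watches it under the key.
        boolean tryCompleteElseWatch(DelayedOp op, String key) {
            return op.tryCompleteOrElse(() ->
                watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op));
        }
    }

    public static void main(String[] args) {
        int[] leader = {0}; // the new leader (broker 1) is not visible yet
        Purgatory p = new Purgatory();
        DelayedOp op = new DelayedOp(() -> leader[0] == 1);
        boolean done = p.tryCompleteElseWatch(op, "topic1-0");
        System.out.println("completed=" + done + ", attempts=" + op.attempts
            + ", watched=" + p.watchers.get("topic1-0").size());
    }
}
```

Running main prints `completed=false, attempts=2, watched=1`: one call, two completion attempts, and the operation is left waiting on its watch key.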
handleUpdateMetadataRequest calls hasDelayedElectionOperations, which doesn't lock on the state of the purgatory:
{code:java}
if (replicaManager.hasDelayedElectionOperations) {
  updateMetadataRequest.partitionStates.forEach { partitionState =>
    val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
    replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
  }
}
{code}

Since the "Request key X unblocked Y ElectLeader." log line never appears in the failed run, but the request processing finishes (so it is not a deadlock in the request handler), it is safe to assume that handleUpdateMetadataRequest never enters the then branch.

I don't have an exact scenario for how this can happen (how a concurrent metadata update and a delayed elect leader operation end up not "syncing" up), but this definitely looks like a concurrency problem.
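A minimal sketch of one interleaving that would match these symptoms, under an assumption the logs do not confirm: that the unlocked fast-path check reads a counter which is updated only after the operation has already been registered under its watch keys and has made its last tryComplete() attempt. Everything in it (LostWakeupSketch, Purgatory, hasDelayed, addToTimer, onMetadataUpdate) is hypothetical and only mirrors the shape of the Kafka methods named above. The two threads are emulated deterministically by running their steps on one thread in a fixed order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical model of the suspected lost wakeup; names are illustrative, not Kafka's API.
public class LostWakeupSketch {
    static class DelayedOp {
        final BooleanSupplier condition;
        boolean completed;

        DelayedOp(BooleanSupplier condition) { this.condition = condition; }

        synchronized boolean tryComplete() {
            if (!completed && condition.getAsBoolean()) completed = true;
            return completed;
        }
    }

    static class Purgatory {
        final Map<String, List<DelayedOp>> watchers = new HashMap<>();
        // ASSUMPTION: this counter is bumped only after the operation is watched,
        // so there is a window where an operation is watched but not yet counted.
        final AtomicInteger numDelayed = new AtomicInteger();

        synchronized void watch(String key, DelayedOp op) {
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
        }

        // Last registration step (stands in for adding the op to an expiration timer).
        void addToTimer(DelayedOp op) { numDelayed.incrementAndGet(); }

        // Unlocked fast-path check, analogous to hasDelayedElectionOperations.
        boolean hasDelayed() { return numDelayed.get() != 0; }

        // Retries every operation watched under the key; returns how many are now complete.
        synchronized int checkAndComplete(String key) {
            int unblocked = 0;
            for (DelayedOp op : watchers.getOrDefault(key, List.of())) {
                if (op.tryComplete()) unblocked++;
            }
            return unblocked;
        }
    }

    // Metadata-update handler: consults the watchers only if the fast check passes.
    static int onMetadataUpdate(Purgatory p, String key) {
        return p.hasDelayed() ? p.checkAndComplete(key) : 0;
    }

    public static void main(String[] args) {
        int[] leader = {0};
        Purgatory p = new Purgatory();
        DelayedOp op = new DelayedOp(() -> leader[0] == 1);

        // "Election thread": try, watch, try again; both attempts fail (leader is still 0).
        op.tryComplete();
        p.watch("topic1-0", op);
        op.tryComplete();

        // "Request handler thread" runs before the election thread calls addToTimer.
        leader[0] = 1;
        int unblocked = onMetadataUpdate(p, "topic1-0");

        // The election thread resumes and finishes registering the operation.
        p.addToTimer(op);

        System.out.println("unblocked=" + unblocked + ", completed=" + op.completed);
    }
}
```

Running main prints `unblocked=0, completed=false`: the handler sees a zero counter, skips the watchers, and the operation stays pending until it times out, even though calling checkAndComplete directly would have completed it. Whether Kafka's check has this exact gap is an assumption of the sketch, not something the logs above prove.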