GEORGE LI created KAFKA-13331:
---------------------------------
Summary: Slow reassignments in 2.8 because of large number of
UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)
Events
Key: KAFKA-13331
URL: https://issues.apache.org/jira/browse/KAFKA-13331
Project: Kafka
Issue Type: Bug
Components: admin
Affects Versions: 3.0.0, 2.8.1
Reporter: GEORGE LI
Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
Reassignments are slow on clusters with many brokers (e.g. 80 brokers).
After investigation, the slowness appears to come from the controller sending
an UpdateMetadataRequest to all brokers for every topic partition affected by
the reassignment (some optimization may be possible here). For example, a
reassignment with a batch size of 50 partitions generates about 10k - 20k
ControllerEventManager.EventQueueSize, and the p99 EventQueueTimeMs reaches
1M. With a batch size of 100 partitions, EventQueueSize is about 40K and the
p99 EventQueueTimeMs is 3M. See the screenshot of the metrics below.
!Screen Shot 2021-09-28 at 12.57.34 PM.png!
It takes about 10-30 minutes to process a batch of 100 reassignments, and
20-30 seconds for a batch of 1 reassignment, even when the topic partition is
almost empty. In version 1.1, reassignment is almost instant.
Looking at the contents of the ControllerEventManager event queue, the
majority (90%+, depending on how many brokers are in the cluster) are
{{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)}}
events, which were introduced in this commit:
{code}
commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
Author: Jason Gustafson <[email protected]>
Date: Thu Nov 21 07:41:29 2019 -0800
MINOR: Controller should log UpdateMetadata response errors (#7717)
Create a controller event for handling UpdateMetadata responses and log a
message when a response contains an error.
Reviewers: Stanislav Kozlovski <[email protected]>, Ismael
Juma <[email protected]>
{code}
Looking at how the ControllerEventManager processes {{UpdateMetadata response}}
events, the handler only checks whether the response contains an error and,
if so, logs it. However, each event takes about 3ms - 60ms to dequeue. Because
the queue is FIFO, all other events have to wait behind them.
{code}
private def processUpdateMetadataResponseReceived(updateMetadataResponse: UpdateMetadataResponse,
                                                  brokerId: Int): Unit = {
  if (!isActive) return

  if (updateMetadataResponse.error != Errors.NONE) {
    stateChangeLogger.error(s"Received error ${updateMetadataResponse.error} in UpdateMetadata " +
      s"response $updateMetadataResponse from broker $brokerId")
  }
}
{code}
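As a rough sanity check on these numbers, the sketch below estimates how long
a single-threaded FIFO queue takes to drain, using the ~40K queue size and the
3ms - 60ms per-event dequeue time reported above. The object and function
names are hypothetical, and the model assumes a constant per-event cost, which
is a simplification.

```scala
object QueueDrainEstimate {
  // Seconds needed to drain `queuedEvents` events from a single-threaded FIFO
  // queue, assuming every event costs `perEventMs` milliseconds (simplified model).
  def drainTimeSeconds(queuedEvents: Long, perEventMs: Long): Double =
    queuedEvents * perEventMs / 1000.0

  def main(args: Array[String]): Unit = {
    val queued = 40000L // approximate EventQueueSize reported for a 100-partition batch
    val bestMinutes = drainTimeSeconds(queued, 3) / 60   // 3ms per event
    val worstMinutes = drainTimeSeconds(queued, 60) / 60 // 60ms per event
    println(f"best case: $bestMinutes%.0f min, worst case: $worstMinutes%.0f min")
    // prints "best case: 2 min, worst case: 40 min"
  }
}
```

Under these assumptions the estimate spans 2 to 40 minutes, which brackets the
10-30 minutes observed for a batch of 100 reassignments.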
More sophisticated handling of UpdateMetadata response errors might be added
in the future. For the current version, would it be better to check whether
the response error code is Errors.NONE before putting the event into the
event queue? For example, I added the check below and saw reassignment
performance improve dramatically on the 80-broker cluster.
{code}
val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
  controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
  val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
  // <============ Additional check on the response error code: if there is no
  // error, which is the case almost 99.99% of the time, skip adding the
  // UpdateMetadataResponseReceived event to the event queue.
  if (updateMetadataResponse.error != Errors.NONE) {
    sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker))
  }
})
{code}
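The effect of filtering before enqueueing can be illustrated with a small
self-contained model. This is only a sketch: {{Response}}, {{ResponseReceived}}
and {{onResponse}} are hypothetical stand-ins, not Kafka's actual classes, and
the error code 1 is chosen arbitrarily.

```scala
import java.util.concurrent.LinkedBlockingQueue

object FilterBeforeEnqueue {
  // Hypothetical stand-ins for the controller's response and event types.
  final case class Response(errorCode: Short)
  final case class ResponseReceived(response: Response, brokerId: Int)

  // Response callback: only failed responses are turned into queued events,
  // so successful responses never occupy the FIFO queue.
  def onResponse(queue: LinkedBlockingQueue[ResponseReceived],
                 response: Response,
                 brokerId: Int): Unit =
    if (response.errorCode != 0) queue.put(ResponseReceived(response, brokerId))

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[ResponseReceived]()
    // 80 brokers respond; only broker 7 reports an error.
    (0 until 80).foreach { id =>
      val code: Short = if (id == 7) 1 else 0
      onResponse(queue, Response(code), id)
    }
    println(s"queued events: ${queue.size}") // prints "queued events: 1"
  }
}
```

With the check in the callback, the queue holds one event instead of 80, at
the cost of evaluating the error code on the response-handling thread rather
than on the controller event thread.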