[
https://issues.apache.org/jira/browse/KAFKA-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892027#comment-17892027
]
kangning.li commented on KAFKA-17804:
-------------------------------------
Hi [~junrao], I have some thoughts on this issue.
The method {{completeDelayedOperationsWhenNotPartitionLeader}} currently
checks 5 purgatories:
# {{delayedProducePurgatory}}: produce related, so it should be checked for
both "stop replica" and "become follower replica"
# {{delayedFetchPurgatory}}: fetch related
# {{delayedRemoteFetchPurgatory}}: fetch related
# {{delayedRemoteListOffsetsPurgatory}}: should also be checked for both
"stop replica" and "become follower replica"
# {{delayedShareFetchPurgatory}}: fetch related
For the fetch-related purgatories, I think there are 2 cases:
# Fetch from a follower replica: the target replica must be the leader, so the
purgatory should be checked for both "stop replica" and "become follower
replica"
# Fetch from a client: if the client configuration "client.rack" is set, we may
not need to check the purgatory; if it is not set, the purgatory should also be
checked for both "stop replica" and "become follower replica"
If the above is correct, we should not simply modify the method
{{completeDelayedOperationsWhenNotPartitionLeader}} directly, like this:
    private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition,
                                                                topicId: Option[Uuid],
                                                                isStopReplica: Boolean): Unit = {
      val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
      delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
      delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
      // fetch-related purgatories are checked only for "stop replica"
      if (isStopReplica) {
        delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
        delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
        if (topicId.isDefined) delayedShareFetchPurgatory.checkAndComplete(
          new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition()))
      }
    }
Instead, we should modify the delayed fetch operations themselves so that they
check {{FetchParams}}. If the fetch comes from a client and "client.rack" is
set, the operation should be allowed to proceed normally instead of calling
{{forceComplete()}}. So we may need to modify DelayedFetch, DelayedRemoteFetch
and DelayedShareFetch, and add some JUnit tests.
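The check described above could look roughly like the sketch below. It is only
an illustration of the two fetch cases, not Kafka code: {{FetchSource}},
{{fromFollowerReplica}}, {{clientRack}} and {{needsLeader}} are hypothetical
names, and treating an empty "client.rack" string as "not set" is an assumption.

```scala
// Hypothetical, simplified stand-in for the relevant parts of a fetch request.
// This is NOT Kafka's FetchParams; the field names are assumptions.
final case class FetchSource(fromFollowerReplica: Boolean, clientRack: Option[String])

object FetchCompletionSketch {
  // Returns true when the pending fetch can only be served by the leader,
  // i.e. when it should be force-completed once this broker stops being leader.
  def needsLeader(source: FetchSource): Boolean = {
    // Assumption: a blank client.rack is the same as no client.rack at all.
    val rackIsSet = source.clientRack.exists(_.trim.nonEmpty)
    // Case 1: replica fetchers always target the leader.
    // Case 2: clients without client.rack also fetch only from the leader.
    source.fromFollowerReplica || !rackIsSet
  }
}
```

With this shape, a delayed fetch operation would call {{forceComplete()}} only
when {{needsLeader}} returns true, and otherwise keep waiting as usual.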
I'm not sure if my thinking is correct. WDYT?
> optimize ReplicaManager.completeDelayedOperationsWhenNotPartitionLeader
> -----------------------------------------------------------------------
>
> Key: KAFKA-17804
> URL: https://issues.apache.org/jira/browse/KAFKA-17804
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jun Rao
> Assignee: kangning.li
> Priority: Minor
>
> Currently, ReplicaManager.completeDelayedOperationsWhenNotPartitionLeader is
> called when (1) a replica is removed from the broker and (2) a replica
> becomes a follower replica and it checks the completion of multiple
> purgatories. However, not all purgatories need to be checked in both
> situations. For example, the fetch purgatory doesn't need to be checked in
> case (2) since we support fetch from follower now.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)