jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714134701



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1424,6 +1428,21 @@ class ReplicaManager(val config: KafkaConfig,
           val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
           updateLeaderAndFollowerMetrics(followerTopicSet)
 
+          if (topicIdUpdateFollowerPartitions.nonEmpty)
+            updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)
+
+          leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            /*
+           * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+           * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
+           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+           * we need to map this topic-partition to OfflinePartition instead.
+           */
+            if (localLog(topicPartition).isEmpty)
+              markPartitionOffline(topicPartition)
+          }

Review comment:
       If it is no longer in trunk, I will remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

