dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709122840



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => 
Option[Uuid]) = {

Review comment:
       nit: Should we prefix this method with `maybe` to indicate that it would 
set the topic id only if there is a state for the topic partition?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => 
Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)

Review comment:
       Should we ensure that there is actually a state? It must be there but it 
might be better to be safe.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String,
    */
   private def partitionFetchState(tp: TopicPartition, initialFetchState: 
InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
     if (currentState != null && currentState.currentLeaderEpoch == 
initialFetchState.currentLeaderEpoch) {
-      currentState
+      if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) 
{
+        currentState.updateTopicId(initialFetchState.topicId)
+      } else {
+        currentState
+      }

Review comment:
       Is this change still necessary? 

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+          if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+            // Only change partition state when the leader is available
+            partitionsToUpdateFollower += partition
+          } else {
+            // The leader broker should always be present in the metadata 
cache.
+            // If not, we should record the error message and abort the 
transition process for this partition
+            stateChangeLogger.error(s"Received LeaderAndIsrRequest with 
correlation id $correlationId from " +
+              s"controller $controllerId epoch $controllerEpoch for partition 
${partition.topicPartition} " +
+              s"(last update controller epoch 
${partitionState.controllerEpoch}) " +
+              s"but cannot become follower since the new leader 
$newLeaderBrokerId is unavailable.")
+          }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the 
become-follower state " +
+              s"change with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader 
${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = 
partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => 
metadataCache.
+            getAliveBrokerNode(leaderId, 
config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), 
leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, 
replicaFetcherManager.getFetcherId(partition.topicPartition)))

Review comment:
       It looks like `addTopicIdsToFetcherThread` only needs the `leader`, 
the `topic-partition` (to compute the fetcher id), and the `topic id`. How about 
passing just those? I would also let the fetcher manager compute the fetcher id.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+          if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+            // Only change partition state when the leader is available
+            partitionsToUpdateFollower += partition
+          } else {
+            // The leader broker should always be present in the metadata 
cache.
+            // If not, we should record the error message and abort the 
transition process for this partition
+            stateChangeLogger.error(s"Received LeaderAndIsrRequest with 
correlation id $correlationId from " +
+              s"controller $controllerId epoch $controllerEpoch for partition 
${partition.topicPartition} " +
+              s"(last update controller epoch 
${partitionState.controllerEpoch}) " +
+              s"but cannot become follower since the new leader 
$newLeaderBrokerId is unavailable.")
+          }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the 
become-follower state " +
+              s"change with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader 
${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = 
partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => 
metadataCache.
+            getAliveBrokerNode(leaderId, 
config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), 
leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, 
replicaFetcherManager.getFetcherId(partition.topicPartition)))
+        }
+        
replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader,
 topicIds)

Review comment:
       I have the impression that we could almost remove everything in this 
method but this block of code. All the rest seems redundant to me and/or 
misleading. We could still keep the `isShuttingDown.get()` check though.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1402,6 +1404,8 @@ class ReplicaManager(val config: KafkaConfig,
                     stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
                       s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation " +
                       s"id $correlationId epoch $controllerEpoch")
+                    if (partitionState.leader != localBrokerId)
+                      topicIdUpdatePartitions.put(partition, partitionState)

Review comment:
       nit: Should we rename `topicIdUpdatePartitions` to indicate that it is 
only about the followers?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for 
partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, 
BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = {
+    lock synchronized {
+      val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1)
+
+      for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) {
+        val brokerIdAndFetcherId = 
BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)

Review comment:
       It is a bit weird that you have to recreate `BrokerIdAndFetcherId` from 
`BrokerAndFetcherId` here.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -741,7 +741,8 @@ class ReplicaManager(val config: KafkaConfig,
 
           // throw NotLeaderOrFollowerException if replica does not exist for 
the given partition
           val partition = getPartitionOrException(topicPartition)
-          partition.localLogOrException

Review comment:
       This is weird... Do you know why it was here?

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,8 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
 
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = request.data().topics().asScala.map { topic => 
(topic.topic(), topic.topicId()) }.toMap

Review comment:
       nit: We could remove some parentheses here and we usually use `map` with 
parentheses instead of the curly braces when inline: `map { topic => ... }` -> 
`map(topic => ...)`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to