dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r710837733



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -163,6 +163,16 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for 
partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, 
Int)], topicIds: String => Option[Uuid]): Unit = {

Review comment:
       1) Should we called this one `maybeUpdateTopicIds`? Using `add` feels a 
bit awkward for the topic id field. Also, the `ToFetcherThread` is not 
necessary in my opinion because we already know that the fetcher manager is all 
about managing fetcher threads.
   
   2) Could we use a `Map` instead of `Set`? `Map` feels a bit more natural for 
a key-value pair mapping.
   
   3) Could we add a small scaladoc to explain the parameters?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -20,7 +20,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.BrokerEndPoint
 import kafka.metrics.KafkaYammerMetrics
 import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}

Review comment:
       Should we add few unit tests for the newly added methods in this suite 
and in `AbstractFetcherThreadTest` ?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -163,6 +163,16 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for 
partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, 
Int)], topicIds: String => Option[Uuid]): Unit = {
+    lock synchronized {
+      val partitionsPerFetcher = 
partitionsToUpdate.groupMap(partitionsToUpdate => 
BrokerIdAndFetcherId(partitionsToUpdate._2, 
getFetcherId(partitionsToUpdate._1)))(_._1)

Review comment:
       We usually avoid using `._1` or `._2` and would rather prefer to 
deconstruct the tuple in order to name the argument: `groupMap { case (tp, 
leader) => ....`.
   
   Does `groupMap` work with all the versions of Scala that we use?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {

Review comment:
       If we rename `addTopicIdsToFetcherThread`, we should also rename this 
one to follow the same naming.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -281,14 +281,14 @@ class ReplicaAlterLogDirsThread(name: String,
     val fetchRequestOpt = if (requestMap.isEmpty) {
       None
     } else {
-      val version: Short = if (topicIds.containsKey(tp.topic()))
+      val version: Short = if (fetchState.topicId.isEmpty)
         12
       else
         ApiKeys.FETCH.latestVersion
       // Set maxWait and minBytes to 0 because the response should return 
immediately if
       // the future log has caught up with the current log of the partition
       val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 
0, 0, requestMap,
-        topicIds).setMaxBytes(maxBytes)
+        Collections.singletonMap(tp.topic(), 
fetchState.topicId.getOrElse(Uuid.ZERO_UUID))).setMaxBytes(maxBytes)

Review comment:
       nit: `topic()` -> `topic`.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
       I think that we should use `update` here instead of 
`updateAndMoveToEnd`. `updateAndMoveToEnd` will basically deprioritize the 
partition and I don't think that we want to do this when we only update its 
topic id.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2810,4 +2811,132 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: util.Map[String, Uuid]): 
LeaderAndIsrRequest =

Review comment:
       You use this method in the two tests. Should we pull it out into an 
helper method? Also, it might be worth checking if we already have such helper 
in the class.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2810,4 +2811,132 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: util.Map[String, Uuid]): 
LeaderAndIsrRequest =
+        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 
0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(topic)
+            .setPartitionIndex(0)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(epoch)
+            .setIsr(replicas)
+            .setZkVersion(0)
+            .setReplicas(replicas)
+            .setIsNew(true)).asJava,
+          topicIds,
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(0, Collections.emptyMap())
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())

Review comment:
       nit: As `leaderAndIsrRequest1` is only used one, I would be tempted to 
inline it directly. You can break the line if it becomes too long.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2810,4 +2811,132 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: util.Map[String, Uuid]): 
LeaderAndIsrRequest =
+        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 
0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(topic)
+            .setPartitionIndex(0)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(epoch)
+            .setIsr(replicas)
+            .setZkVersion(0)
+            .setReplicas(replicas)
+            .setIsNew(true)).asJava,
+          topicIds,
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(0, Collections.emptyMap())
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      val fetchState = 
replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => 
fetcher.fetchState(tp))
+      assertTrue(fetchState.isDefined)
+      assertEquals(None, fetchState.get.topicId)

Review comment:
       nit: This assertion is used multiple times in the newly added tests. How 
about extracting it into a small helper method? e.g. 
`assertFetcherHasTopicId(manager, tp, expectedTopicId)` or something like this.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1755,6 +1762,40 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]): 
Unit = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    try {
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionStates.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the 
become-follower state " +
+              s"change with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} since it is shutting 
down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionStates.map { 
partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => 
metadataCache.
+            getAliveBrokerNode(leaderId, 
config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leaderId = leaderNode.id

Review comment:
       So, you take the leader id from the partition, then you lookup the node 
from the metadata cache, finally you get the leader id from the node. I guess 
you could use the leader id directly. :)
   
   Initially, I thought that you would put the 
`metadataCache.hasAliveBroker(partitionState.leader)` check here instead of 
doing it at L1407. It does not matter much but doing it here would be a bit 
more aligned with the logic in the `makeFollowers` method.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)
+        }
+      }
+

Review comment:
       nit: We could remove this empty line.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2810,4 +2811,132 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: util.Map[String, Uuid]): 
LeaderAndIsrRequest =
+        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 
0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(topic)
+            .setPartitionIndex(0)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(epoch)
+            .setIsr(replicas)
+            .setZkVersion(0)
+            .setReplicas(replicas)
+            .setIsNew(true)).asJava,
+          topicIds,
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(0, Collections.emptyMap())
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      val fetchState = 
replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => 
fetcher.fetchState(tp))
+      assertTrue(fetchState.isDefined)
+      assertEquals(None, fetchState.get.topicId)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(0, topicIds.asJava)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      val fetchState2 = 
replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => 
fetcher.fetchState(tp))
+      assertTrue(fetchState2.isDefined)
+      assertEquals(Some(topicId), fetchState2.get.topicId)
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {

Review comment:
       Does this test work? I have the impression that we never update the 
`ReplicaAlterLogDirsManager`.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,8 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
 
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = request.data().topics().asScala.map(topic => (topic.topic, 
topic.topicId)).toMap
+    val topicNames = topicIds.map(_.swap)

Review comment:
       1) Is it fine to rely on the request to construct the topic to topicId 
mapping? I suppose that it is OK here because the request was populated with 
both, right? If this is correct, is it worth adding a small comment here?
   
   2) nit: We could omit few parenthesis here (e.g. `data`, `topics`).
   
   3) Is it worth using two mutable Maps which would be populated while 
iterating over the `topics`? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to