dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714815080



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -862,6 +877,10 @@ case class PartitionFetchState(fetchOffset: Long,
       s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +

Review comment:
       Could we add the topic ID to the `toString`?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+                                                          tp: TopicPartition,
+                                                          expectedTopicId: 
Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+    val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+    val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
       nit: `topicId` instead of `topicIdRaw`?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
       We need to address this point.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
                     stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
                       s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation " +
                       s"id $correlationId epoch $controllerEpoch")
+                    if (partitionState.leader != localBrokerId && 
metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
       We need to address this point.

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    verifyFetchState(fetcher.fetchState(partition), None)
+
+    // Add topic ID
+    fetcher.maybeUpdateTopicIds(Set(partition), topicName => 
topicIds.get(topicName))

Review comment:
       Right. We should get here if the partition has a zero topic ID, I think.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+                                                          tp: TopicPartition,
+                                                          expectedTopicId: 
Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+    val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+    val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID
+    val topicIdOpt = if (usesTopicIds) Some(topicId) else None
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
+    try {
+      val topicPartition = new TopicPartition(topic, 0)
+      val aliveBrokersIds = Seq(0, 1)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), 
None)
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(0, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(topicIdRaw, tp, 
aliveBrokersIds, leaderAndIsr, version = version)
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      val partition = replicaManager.getPartitionOrException(tp)
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == 
partition.log.get.dir.getParentFile).size)
+
+      // Append a couple of messages.
+      for (i <- 1 to 40) {
+        val records = TestUtils.singletonRecords(s"message $i".getBytes)
+        appendRecords(replicaManager, tp, records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      // find the live and different folder

Review comment:
       nit: Could we update the comments to start with a capital letter and to 
finish with a period, like the comment above? Or update the comment above 
to look like this one. I don't mind which format we use; I only care about 
consistency.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to