jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714901210



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+                                                          tp: TopicPartition,
+                                                          expectedTopicId: 
Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+    val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+    val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
       I had this and scala told me it was a recursive definition. Maybe I can 
use something like this.topicId in the definition to clarify the different 
topic ID usages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to