dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714549045



##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
     assertEquals(0, fetcherManager.deadThreadCount)
     EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val fetcher: AbstractFetcherThread = 
EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val topicId = Some(Uuid.randomUuid())
+
+    // Start out with no topic ID.
+    val initialFetchState1 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    // Include a partition on a different leader
+    val initialFetchState2 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(1, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> 
initialFetchState2.leader.id)
+    val topicIds = (_: String) => topicId
+
+    // Simulate calls to different fetchers due to different leaders
+    EasyMock.expect(fetcher.start())
+    EasyMock.expect(fetcher.start())
+
+    EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1)))
+      .andReturn(Set(tp1))
+    EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2)))
+      .andReturn(Set(tp2))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds))
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.replay(fetcher)
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 
-> initialFetchState2))
+    verifyFetchState(fetcher.fetchState(tp1), None)
+    verifyFetchState(fetcher.fetchState(tp2), None)
+
+    fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds)
+    verifyFetchState(fetcher.fetchState(tp1), topicId)
+    verifyFetchState(fetcher.fetchState(tp2), topicId)
+
+    EasyMock.verify(fetcher)

Review comment:
       Right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to