jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714299871
##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    verifyFetchState(fetcher.fetchState(partition), None)
+
+    // Add topic ID
+    fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))

Review comment:
   I can add a case where the partition doesn't exist. I also thought of a case where the current ID isn't zero, but we don't handle that at this level: we always write the ID regardless of the current value. Should we change this? I'm trying to think of whether we would ever overwrite an existing value. Since this is synchronous, I don't think so.
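
   To make the overwrite question concrete, here is a minimal sketch contrasting the "always write" behavior described above with a "write only if unset" alternative. It is a simplified model under stated assumptions, not the actual AbstractFetcherThread code: `FetchState`, `alwaysWrite`, and `writeIfUnset` are hypothetical names, partitions are keyed by plain strings, and the lookup is keyed by that same string rather than by topic name.

```scala
import java.util.UUID

// Hypothetical stand-in for PartitionFetchState; only the topic ID matters here.
final case class FetchState(topicId: Option[UUID])

object TopicIdUpdateSketch {
  type States = Map[String, FetchState]

  // Models "always write": the looked-up ID replaces whatever is stored.
  // Partitions that are not tracked are skipped, never added.
  def alwaysWrite(states: States, partitions: Set[String], lookup: String => Option[UUID]): States =
    partitions.foldLeft(states) { (acc, p) =>
      acc.get(p) match {
        case Some(state) => acc.updated(p, state.copy(topicId = lookup(p)))
        case None        => acc
      }
    }

  // Hypothetical alternative: only fill in an ID when none is set yet.
  def writeIfUnset(states: States, partitions: Set[String], lookup: String => Option[UUID]): States =
    partitions.foldLeft(states) { (acc, p) =>
      acc.get(p) match {
        case Some(state) if state.topicId.isEmpty =>
          acc.updated(p, state.copy(topicId = lookup(p)))
        case _ => acc
      }
    }
}
```

   In this sketch the two functions differ only when a tracked partition already has an ID: `alwaysWrite` replaces it (and would clear it if the lookup returns `None`), while `writeIfUnset` leaves it alone. Both ignore a partition that is not tracked, which is the "partition doesn't exist" case a test could cover.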