dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578252



##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+    val topicIds = topicNames.asScala.map(_.swap).asJava
+    val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+    val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+    val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 0))
+    val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-    // Create a new fetch session with foo-0
+    // Create a new fetch session with foo-0 and foo-1
     val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+    reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
       Optional.empty()))
-    val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-    // Start a fetch session using a request version that does not use topic 
IDs.
+    reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    // Simulate unknown topic ID for foo.
+    val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), 
"bar")
+    // We should not throw error since we have an older request version.
     val context1 = fetchManager.newContext(
       request1.version,
       request1.metadata,
       request1.isFromFollower,
-      request1.fetchData(topicNames),
-      request1.forgottenTopics(topicNames),
-      topicIds
+      request1.fetchData(topicNamesOnlyBar),
+      request1.forgottenTopics(topicNamesOnlyBar),
+      topicNamesOnlyBar
     )
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]

Review comment:
       Yeah, I meant exactly that. How about using `assertPartitionsOrder` 
helper? The assertion would be more complete.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to