jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745943714



##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -877,34 +921,182 @@ class FetchSessionTest {
 
     // Create an incremental fetch request as though no topics changed.
     val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
     // Simulate ID changing on server.
     val topicNamesFooChanged =  Map(topicIds.get("bar") -> "bar", 
Uuid.randomUuid() -> "foo").asJava
-    val topicIdsFooChanged = topicNamesFooChanged.asScala.map(_.swap).asJava
     val context2 = fetchManager.newContext(
       request2.version,
       request2.metadata,
       request2.isFromFollower,
       request2.fetchData(topicNamesFooChanged),
       request2.forgottenTopics(topicNamesFooChanged),
-      topicIdsFooChanged
+      topicNames
     )
     assertEquals(classOf[IncrementalFetchContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
     // Likely if the topic ID is different in the broker, it will be different 
in the log. Simulate the log check finding an inconsistent ID.
-    respData2.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    respData2.put(tp0, new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
       .setHighWatermark(-1)
       .setLastStableOffset(-1)
       .setLogStartOffset(-1)
       .setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code))
     val resp2 = context2.updateAndGenerateResponseData(respData2)
 
-    assertEquals(Errors.INCONSISTENT_TOPIC_ID, resp2.error)
+    assertEquals(Errors.NONE, resp2.error)
+    assertTrue(resp2.sessionId > 0)
+    val responseData2 = resp2.responseData(topicNames, request2.version)
+    // We should have the inconsistent topic ID error on the partition
+    assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, 
responseData2.get(tp0.topicPartition).errorCode)
+  }
+
+  def noErrorResponse: FetchResponseData.PartitionData = {
+    new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setHighWatermark(10)
+      .setLastStableOffset(10)
+      .setLogStartOffset(10)
+  }
+
+  def errorResponse(errorCode: Short): FetchResponseData.PartitionData  = {
+    new FetchResponseData.PartitionData()
+      .setPartitionIndex(0)
+      .setHighWatermark(-1)
+      .setLastStableOffset(-1)
+      .setLogStartOffset(-1)
+      .setErrorCode(errorCode)
+  }
+
+  // This test simulates an update to a partition using all possible 
topic ID usage combinations.
+  // The possible change will be found in an update from the partition.
+  @ParameterizedTest
+  @MethodSource(Array("idUsageCombinations"))
+  def testUpdatedPartitionResolvesId(startsWithTopicIds: Boolean, 
endsWithTopicIds: Boolean): Unit = {

Review comment:
       You are correct. I can change it to `startsWithTopicIdsInMetadataCache`, 
etc., if that is not too verbose.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to