jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746096126



##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -877,34 +921,182 @@ class FetchSessionTest {
 
     // Create an incremental fetch request as though no topics changed.
     val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
     // Simulate ID changing on server.
     val topicNamesFooChanged =  Map(topicIds.get("bar") -> "bar", 
Uuid.randomUuid() -> "foo").asJava
-    val topicIdsFooChanged = topicNamesFooChanged.asScala.map(_.swap).asJava
     val context2 = fetchManager.newContext(
       request2.version,
       request2.metadata,
       request2.isFromFollower,
       request2.fetchData(topicNamesFooChanged),
       request2.forgottenTopics(topicNamesFooChanged),
-      topicIdsFooChanged
+      topicNames
     )
     assertEquals(classOf[IncrementalFetchContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
     // Likely if the topic ID is different in the broker, it will be different 
in the log. Simulate the log check finding an inconsistent ID.
-    respData2.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    respData2.put(tp0, new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
       .setHighWatermark(-1)
       .setLastStableOffset(-1)
       .setLogStartOffset(-1)
       .setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code))
     val resp2 = context2.updateAndGenerateResponseData(respData2)
 
-    assertEquals(Errors.INCONSISTENT_TOPIC_ID, resp2.error)
+    assertEquals(Errors.NONE, resp2.error)
+    assertTrue(resp2.sessionId > 0)
+    val responseData2 = resp2.responseData(topicNames, request2.version)
+    // We should have the inconsistent topic ID error on the partition
+    assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, 
responseData2.get(tp0.topicPartition).errorCode)
+  }
+
+  def noErrorResponse: FetchResponseData.PartitionData = {
+    new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setHighWatermark(10)
+      .setLastStableOffset(10)
+      .setLogStartOffset(10)
+  }
+
+  def errorResponse(errorCode: Short): FetchResponseData.PartitionData  = {
+    new FetchResponseData.PartitionData()
+      .setPartitionIndex(0)
+      .setHighWatermark(-1)
+      .setLastStableOffset(-1)
+      .setLogStartOffset(-1)
+      .setErrorCode(errorCode)
+  }
+
+  // This test either simulates an update to a partition using all possible 
topic ID usage combinations.
+  // The possible change will be found in an update from the partition.
+  @ParameterizedTest
+  @MethodSource(Array("idUsageCombinations"))
+  def testUpdatedPartitionResolvesId(startsWithTopicIds: Boolean, 
endsWithTopicIds: Boolean): Unit = {
+    val time = new MockTime()
+    val cache = new FetchSessionCache(10, 1000)
+    val fetchManager = new FetchManager(time, cache)
+    val fooId = Uuid.randomUuid()
+    val topicNames = Map(fooId -> "foo").asJava
+    val tp0 = new TopicPartition("foo", 0)
+    val tidp0 = new TopicIdPartition(fooId, tp0)
+    val nullTidp0 = new TopicIdPartition(fooId, new TopicPartition(null, 
tp0.partition))
+
+    // Create a new fetch session with foo-0
+    val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    reqData1.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    val topicNamesForRequest1 = if (startsWithTopicIds) topicNames else 
Map[Uuid, String]().asJava
+    // Start a fetch session. Simulate no error or unknown topic ID depending 
on whether we used topic IDs.
+    val context1 = fetchManager.newContext(
+      request1.version,
+      request1.metadata,
+      request1.isFromFollower,
+      request1.fetchData(topicNamesForRequest1),
+      request1.forgottenTopics(topicNamesForRequest1),
+      topicNames
+    )
+    assertEquals(classOf[FullFetchContext], context1.getClass)
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    val errorCode1 = if (startsWithTopicIds) Errors.NONE.code else 
Errors.UNKNOWN_TOPIC_ID.code
+    val fooResponseTp1 = if (startsWithTopicIds) tidp0 else nullTidp0
+    val fooResponse1 = if (startsWithTopicIds) noErrorResponse else 
errorResponse(errorCode1)
+    respData1.put(fooResponseTp1, fooResponse1)
+    val resp1 = context1.updateAndGenerateResponseData(respData1)
+    assertEquals(Errors.NONE, resp1.error())
+    assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+    val responseData1 = resp1.responseData(topicNames, request1.version)
+    assertEquals(errorCode1, responseData1.get(tp0).errorCode)
+
+    // Create an incremental fetch request with an update to the partition.
+    val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    reqData2.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100,
+      Optional.empty()))
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
+    val topicNamesForRequest2 = if (endsWithTopicIds) Map(fooId -> 
"foo").asJava else Map[Uuid, String]().asJava
+    val context2 = fetchManager.newContext(
+      request2.version,
+      request2.metadata,
+      request2.isFromFollower,
+      request2.fetchData(topicNamesForRequest2),
+      request2.forgottenTopics(topicNamesForRequest2),
+      topicNames
+    )
+    assertEquals(classOf[IncrementalFetchContext], context2.getClass)
+    val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    // Likely if the topic ID is not in the broker, the partition has been 
deleted. In this case return UNKNOWN_TOPIC_OR_PARTITION
+    // If we started with unknown IDs and switched to having them, we won't 
have an error.
+    val errorCode2 = if (endsWithTopicIds) Errors.NONE.code else 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code
+    val fooResponseTp2 = if (endsWithTopicIds) tidp0 else nullTidp0
+    val fooResponse2 = if (endsWithTopicIds) noErrorResponse else 
errorResponse(errorCode2)
+    respData2.put(fooResponseTp2, fooResponse2)
+    val resp2 = context2.updateAndGenerateResponseData(respData2)
+
+    assertEquals(Errors.NONE, resp2.error)
     assertTrue(resp2.sessionId > 0)
     val responseData2 = resp2.responseData(topicNames, request2.version)
-    // We should have no partition responses with this top level error.
-    assertEquals(0, responseData2.size())
+    if (startsWithTopicIds && endsWithTopicIds) {
+      // if both requests had topic IDs there was no change so we won't have a 
response
+      assertEquals(0, responseData2.size())
+    } else {
+      assertEquals(errorCode2, responseData2.get(tp0).errorCode)
+    }
+  }
+
+  // This test simulates trying to forget a topic partition with all possible 
topic ID usages for both requests.
+  @ParameterizedTest
+  @MethodSource(Array("idUsageCombinations"))
+  def testToForgetCases(startsWithTopicIds: Boolean, endsWithTopicIds: 
Boolean): Unit = {
+    val time = new MockTime()
+    val cache = new FetchSessionCache(10, 1000)
+    val fetchManager = new FetchManager(time, cache)
+    val fooId = Uuid.randomUuid()
+    val topicNames = Map(fooId -> "foo").asJava
+    val tp0 = new TopicPartition("foo", 0)
+    val tidp0 = new TopicIdPartition(fooId, tp0)
+    val nullTidp0 = new TopicIdPartition(fooId, new TopicPartition(null, 
tp0.partition))
+
+    // Create a new fetch session with foo-0
+    val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    reqData1.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    val topicNamesForRequest1 = if (startsWithTopicIds) topicNames else 
Map[Uuid, String]().asJava
+    // Start a fetch session. Start a fetch session. Simulate no error or 
unknown topic ID depending on whether we used topic IDs.
+    val context1 = fetchManager.newContext(
+      request1.version,
+      request1.metadata,
+      request1.isFromFollower,
+      request1.fetchData(topicNamesForRequest1),
+      request1.forgottenTopics(topicNamesForRequest1),
+      topicNames
+    )
+    assertEquals(classOf[FullFetchContext], context1.getClass)
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    val errorCode1 = if (startsWithTopicIds) Errors.NONE.code else 
Errors.UNKNOWN_TOPIC_ID.code
+    val fooResponseTp1 = if (startsWithTopicIds) tidp0 else nullTidp0
+    val fooResponse1 = if (startsWithTopicIds) noErrorResponse else 
errorResponse(errorCode1)
+    respData1.put(fooResponseTp1, fooResponse1)
+    val resp1 = context1.updateAndGenerateResponseData(respData1)
+    assertEquals(Errors.NONE, resp1.error())
+    assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+    val responseData1 = resp1.responseData(topicNames, request1.version)
+    assertEquals(errorCode1, responseData1.get(tp0).errorCode)
+
+    // Create an incremental fetch request forgetting the partition.
+    val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, Collections.singletonList(tidp0), false)
+    val topicNamesForRequest2 = if (endsWithTopicIds) Map(fooId -> 
"foo").asJava else Map[Uuid, String]().asJava
+    val context2 = fetchManager.newContext(
+      request2.version,
+      request2.metadata,
+      request2.isFromFollower,
+      request2.fetchData(topicNamesForRequest2),
+      request2.forgottenTopics(topicNamesForRequest2),
+      topicNames
+    )
+    // If we forget the last partition, we will have a sessionless context.
+    assertEquals(classOf[SessionlessFetchContext], context2.getClass)

Review comment:
       I think the issue with that approach is it doesn't quite cover the four 
cases, right? I could keep as is, but have a second partition that just uses 
IDs and resolve that one on the second round.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to