jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r745943714
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -877,34 +921,182 @@ class FetchSessionTest { // Create an incremental fetch request as though no topics changed. val reqData2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, topicIds, EMPTY_PART_LIST, false) + val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, EMPTY_PART_LIST, false) // Simulate ID changing on server. val topicNamesFooChanged = Map(topicIds.get("bar") -> "bar", Uuid.randomUuid() -> "foo").asJava - val topicIdsFooChanged = topicNamesFooChanged.asScala.map(_.swap).asJava val context2 = fetchManager.newContext( request2.version, request2.metadata, request2.isFromFollower, request2.fetchData(topicNamesFooChanged), request2.forgottenTopics(topicNamesFooChanged), - topicIdsFooChanged + topicNames ) assertEquals(classOf[IncrementalFetchContext], context2.getClass) - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] // Likely if the topic ID is different in the broker, it will be different in the log. Simulate the log check finding an inconsistent ID. - respData2.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() + respData2.put(tp0, new FetchResponseData.PartitionData() .setPartitionIndex(0) .setHighWatermark(-1) .setLastStableOffset(-1) .setLogStartOffset(-1) .setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code)) val resp2 = context2.updateAndGenerateResponseData(respData2) - assertEquals(Errors.INCONSISTENT_TOPIC_ID, resp2.error) + assertEquals(Errors.NONE, resp2.error) + assertTrue(resp2.sessionId > 0) + val responseData2 = resp2.responseData(topicNames, request2.version) + // We should have the inconsistent topic ID error on the partition + assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, responseData2.get(tp0.topicPartition).errorCode) + } + + def noErrorResponse: FetchResponseData.PartitionData = { + new FetchResponseData.PartitionData() + .setPartitionIndex(1) + .setHighWatermark(10) + .setLastStableOffset(10) + .setLogStartOffset(10) + } + + def errorResponse(errorCode: Short): FetchResponseData.PartitionData = { + new FetchResponseData.PartitionData() + .setPartitionIndex(0) + .setHighWatermark(-1) + .setLastStableOffset(-1) + .setLogStartOffset(-1) + .setErrorCode(errorCode) + } + + // This test either simulates an update to a partition using all possible topic ID usage combinations. + // The possible change will be found in an update from the partition. + @ParameterizedTest + @MethodSource(Array("idUsageCombinations")) + def testUpdatedPartitionResolvesId(startsWithTopicIds: Boolean, endsWithTopicIds: Boolean): Unit = { Review comment: You are correct. I can change to `startsWithTopicIdsInMetadataCache` etc if that is not too verbose. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org