jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743351017
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -1361,102 +1542,113 @@ class FetchSessionTest { val resp4 = context2.updateAndGenerateResponseData(respData) assertEquals(Errors.NONE, resp4.error) assertEquals(resp1.sessionId, resp4.sessionId) - assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, request2.version).keySet) + assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp4.responseData(topicNames, request2.version).keySet) } @Test def testDeprioritizesPartitionsWithRecordsOnly(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val tp1 = new TopicPartition("foo", 1) - val tp2 = new TopicPartition("bar", 2) - val tp3 = new TopicPartition("zar", 3) val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), "zar" -> Uuid.randomUuid()).asJava val topicNames = topicIds.asScala.map(_.swap).asJava + val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1)) + val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 2)) + val tp3 = new TopicIdPartition(topicIds.get("zar"), new TopicPartition("zar", 3)) - val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) - reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) - reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) + val reqData = new util.LinkedHashMap[TopicIdPartition, FetchRequest.PartitionData] + reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 0, 1000, Optional.of(5), Optional.of(4))) + reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) + reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) // Full fetch context returns all partitions in the response val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), JFetchMetadata.INITIAL, false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] respData1.put(tp1, new FetchResponseData.PartitionData() - .setPartitionIndex(tp1.partition) + .setPartitionIndex(tp1.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp2, new FetchResponseData.PartitionData() - .setPartitionIndex(tp2.partition) + .setPartitionIndex(tp2.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp3, new FetchResponseData.PartitionData() - .setPartitionIndex(tp3.partition) + .setPartitionIndex(tp3.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) val resp1 = context1.updateAndGenerateResponseData(respData1) assertEquals(Errors.NONE, resp1.error) assertNotEquals(INVALID_SESSION_ID, resp1.sessionId) - assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) + assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, tp3.topicPartition), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) // Incremental fetch context returns partitions with changes but only deprioritizes // the partitions with records val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new JFetchMetadata(resp1.sessionId, 1), false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[IncrementalFetchContext], context2.getClass) // Partitions are ordered in the session as per last response assertPartitionsOrder(context2, Seq(tp1, tp2, tp3)) // Response is empty - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] val resp2 = context2.updateAndGenerateResponseData(respData2) assertEquals(Errors.NONE, resp2.error) assertEquals(resp1.sessionId, resp2.sessionId) assertEquals(Collections.emptySet(), resp2.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet) // All partitions with changes should be returned. - val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData3 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] respData3.put(tp1, new FetchResponseData.PartitionData() - .setPartitionIndex(tp1.partition) + .setPartitionIndex(tp1.topicPartition.partition) .setHighWatermark(60) .setLastStableOffset(50) .setLogStartOffset(0)) respData3.put(tp2, new FetchResponseData.PartitionData() - .setPartitionIndex(tp2.partition) + .setPartitionIndex(tp2.topicPartition.partition) .setHighWatermark(60) .setLastStableOffset(50) .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, null)))) respData3.put(tp3, new FetchResponseData.PartitionData() - .setPartitionIndex(tp3.partition) + .setPartitionIndex(tp3.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) val resp3 = context2.updateAndGenerateResponseData(respData3) assertEquals(Errors.NONE, resp3.error) assertEquals(resp1.sessionId, resp3.sessionId) - assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet) + assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet) // Only the partitions whose returned records in the last response // were deprioritized assertPartitionsOrder(context2, Seq(tp1, tp3, tp2)) } - private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicPartition]): Unit = { - val partitionsInContext = ArrayBuffer.empty[TopicPartition] - context.foreachPartition { (tp, _, _) => + private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicIdPartition]): Unit = { + val partitionsInContext = ArrayBuffer.empty[TopicIdPartition] + context.foreachPartition { (tp, _) => partitionsInContext += tp } assertEquals(partitions, partitionsInContext.toSeq) } } + +object FetchSessionTest { + def idUsageCombinations: java.util.stream.Stream[Arguments] = { + Seq( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ).asJava.stream() + } +} Review comment: I can add some for the equals and hash methods in CachedPartition. What classes were you thinking of for others? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org