jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743351017



##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -1361,102 +1542,113 @@ class FetchSessionTest {
     val resp4 = context2.updateAndGenerateResponseData(respData)
     assertEquals(Errors.NONE, resp4.error)
     assertEquals(resp1.sessionId, resp4.sessionId)
-    assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, request2.version).keySet)
+    assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp4.responseData(topicNames, request2.version).keySet)
   }
 
   @Test
   def testDeprioritizesPartitionsWithRecordsOnly(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val tp1 = new TopicPartition("foo", 1)
-    val tp2 = new TopicPartition("bar", 2)
-    val tp3 = new TopicPartition("zar", 3)
     val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), "zar" -> Uuid.randomUuid()).asJava
     val topicNames = topicIds.asScala.map(_.swap).asJava
+    val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1))
+    val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 2))
+    val tp3 = new TopicIdPartition(topicIds.get("zar"), new TopicPartition("zar", 3))
 
-    val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
-    reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
-    reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
-    reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
+    val reqData = new util.LinkedHashMap[TopicIdPartition, FetchRequest.PartitionData]
+    reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 0, 1000, Optional.of(5), Optional.of(4)))
+    reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 0, 1000, Optional.of(5), Optional.of(4)))
+    reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 0, 1000, Optional.of(5), Optional.of(4)))
 
     // Full fetch context returns all partitions in the response
     val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), JFetchMetadata.INITIAL, false,
-     reqData, Collections.emptyList(), topicIds)
+     reqData, Collections.emptyList(), topicNames)
     assertEquals(classOf[FullFetchContext], context1.getClass)
 
-    val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
     respData1.put(tp1, new FetchResponseData.PartitionData()
-      .setPartitionIndex(tp1.partition)
+      .setPartitionIndex(tp1.topicPartition.partition)
       .setHighWatermark(50)
       .setLastStableOffset(50)
       .setLogStartOffset(0))
     respData1.put(tp2, new FetchResponseData.PartitionData()
-      .setPartitionIndex(tp2.partition)
+      .setPartitionIndex(tp2.topicPartition.partition)
       .setHighWatermark(50)
       .setLastStableOffset(50)
       .setLogStartOffset(0))
     respData1.put(tp3, new FetchResponseData.PartitionData()
-      .setPartitionIndex(tp3.partition)
+      .setPartitionIndex(tp3.topicPartition.partition)
       .setHighWatermark(50)
       .setLastStableOffset(50)
       .setLogStartOffset(0))
 
     val resp1 = context1.updateAndGenerateResponseData(respData1)
     assertEquals(Errors.NONE, resp1.error)
     assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
-    assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet())
+    assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, tp3.topicPartition), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet())
 
     // Incremental fetch context returns partitions with changes but only deprioritizes
     // the partitions with records
     val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new JFetchMetadata(resp1.sessionId, 1), false,
-      reqData, Collections.emptyList(), topicIds)
+      reqData, Collections.emptyList(), topicNames)
     assertEquals(classOf[IncrementalFetchContext], context2.getClass)
 
     // Partitions are ordered in the session as per last response
     assertPartitionsOrder(context2, Seq(tp1, tp2, tp3))
 
     // Response is empty
-    val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
     val resp2 = context2.updateAndGenerateResponseData(respData2)
     assertEquals(Errors.NONE, resp2.error)
     assertEquals(resp1.sessionId, resp2.sessionId)
     assertEquals(Collections.emptySet(), resp2.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet)
 
     // All partitions with changes should be returned.
-    val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+    val respData3 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
     respData3.put(tp1, new FetchResponseData.PartitionData()
-      .setPartitionIndex(tp1.partition)
+      .setPartitionIndex(tp1.topicPartition.partition)
       .setHighWatermark(60)
       .setLastStableOffset(50)
       .setLogStartOffset(0))
     respData3.put(tp2, new FetchResponseData.PartitionData()
-      .setPartitionIndex(tp2.partition)
+      .setPartitionIndex(tp2.topicPartition.partition)
       .setHighWatermark(60)
       .setLastStableOffset(50)
       .setLogStartOffset(0)
       .setRecords(MemoryRecords.withRecords(CompressionType.NONE,
         new SimpleRecord(100, null))))
     respData3.put(tp3, new FetchResponseData.PartitionData()
-      .setPartitionIndex(tp3.partition)
+      .setPartitionIndex(tp3.topicPartition.partition)
       .setHighWatermark(50)
       .setLastStableOffset(50)
       .setLogStartOffset(0))
     val resp3 = context2.updateAndGenerateResponseData(respData3)
     assertEquals(Errors.NONE, resp3.error)
     assertEquals(resp1.sessionId, resp3.sessionId)
-    assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet)
+    assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet)
 
     // Only the partitions whose returned records in the last response
     // were deprioritized
     assertPartitionsOrder(context2, Seq(tp1, tp3, tp2))
   }
 
-  private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicPartition]): Unit = {
-    val partitionsInContext = ArrayBuffer.empty[TopicPartition]
-    context.foreachPartition { (tp, _, _) =>
+  private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicIdPartition]): Unit = {
+    val partitionsInContext = ArrayBuffer.empty[TopicIdPartition]
+    context.foreachPartition { (tp, _) =>
       partitionsInContext += tp
     }
     assertEquals(partitions, partitionsInContext.toSeq)
   }
 }
+
+object FetchSessionTest {
+  def idUsageCombinations: java.util.stream.Stream[Arguments] = {
+    Seq(
+      Arguments.of(true, true),
+      Arguments.of(true, false),
+      Arguments.of(false, true),
+      Arguments.of(false, false)
+    ).asJava.stream()
+  }
+}

Review comment:
       I can add some for the equals and hash methods in CachedPartition. What 
classes were you thinking of for others?
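
Something along these lines is what I had in mind (sketch only; the three-argument `CachedPartition` constructor is illustrative and may not match the actual signature in FetchSession.scala):

```scala
// Sketch of an equals/hashCode test for CachedPartition (would live in FetchSessionTest).
// The constructor arguments shown here are illustrative; the real class may take more fields.
@Test
def testCachedPartitionEqualsAndHashCode(): Unit = {
  val topicId = Uuid.randomUuid()
  val partition = new CachedPartition("foo", topicId, 0)
  val samePartition = new CachedPartition("foo", topicId, 0)
  val otherPartition = new CachedPartition("foo", topicId, 1)

  // Equal partitions must hash equally so they collapse to a single entry
  // in the session's cached-partition set.
  assertEquals(partition, samePartition)
  assertEquals(partition.hashCode, samePartition.hashCode)
  assertNotEquals(partition, otherPartition)
}
```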



