dajac commented on code in PR #13378:
URL: https://github.com/apache/kafka/pull/13378#discussion_r1134345366


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##########
@@ -284,7 +284,8 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
     }
     override def runWithCallback(member: GroupMember, responseCallback: 
CommitOffsetCallback): Unit = {
       val tp = new TopicPartition("topic", 0)
-      val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", 
Time.SYSTEM.milliseconds()))
+      val topicId = Uuid.randomUuid()

Review Comment:
   nit: Could we combine those two into on `TopicIdPartition`?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -827,6 +831,14 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
   // visible for testing
   private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition): 
Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)
 
+  private[group] def pendingOffsetCommit(topicIdPartition: TopicIdPartition): 
Option[OffsetAndMetadata] = {

Review Comment:
   Are those two methods for testing? If so, could we add a comment for each: 
`Used for testing`.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -2847,7 +2851,10 @@ class GroupCoordinatorTest {
     // Different producers will commit offsets for different partitions.
     // Each partition's offsets should be materialized when the corresponding 
producer's marker is received.
 
-    val partitions = List(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 0))
+    val partitions = List(

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -2917,34 +2925,33 @@ class GroupCoordinatorTest {
 
   @Test
   def testFetchAllOffsets(): Unit = {
-    val tp1 = new TopicPartition("topic", 0)
-    val tp2 = new TopicPartition("topic", 1)
-    val tp3 = new TopicPartition("other-topic", 0)
     val offset1 = offsetAndMetadata(15)
     val offset2 = offsetAndMetadata(16)
     val offset3 = offsetAndMetadata(17)
+    val (topicId, otherTopicId) = (Uuid.randomUuid(), Uuid.randomUuid())
+    val tip1 = new TopicIdPartition(topicId, 0, "topic")
+    val tip2 = new TopicIdPartition(topicId, 1, "topic")
+    val tip3 = new TopicIdPartition(otherTopicId, 0, "other-topic")

Review Comment:
   nit: Could we keep this block first as it was? Moreover, I would define it 
as follow:
   ```
       val tip1 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
       val tip2 = new TopicIdPartition(tip1.topic(), 1, "topic")
       val tip3 = new TopicIdPartition(tip1, 0, "other-topic")
   ```



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -2768,7 +2768,10 @@ class GroupCoordinatorTest {
     // Both group have pending offset commits.
     // Marker for only one partition is received. That commit should be 
materialized while the other should not.
 
-    val partitions = List(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 0))
+    val partitions = List(

Review Comment:
   nit: `topicIdPartitions`?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -1824,9 +1854,13 @@ class GroupMetadataManagerTest {
     val clientId = "clientId"
     val clientHost = "localhost"
     val topic = "foo"
+    val topicId = Uuid.randomUuid()
     val topicPartition1 = new TopicPartition(topic, 0)
     val topicPartition2 = new TopicPartition(topic, 1)
     val topicPartition3 = new TopicPartition(topic, 2)
+    val topicIdPartition1 = new TopicIdPartition(topicId, topicPartition1)
+    val topicIdPartition2 = new TopicIdPartition(topicId, topicPartition2)
+    val topicIdPartition3 = new TopicIdPartition(topicId, topicPartition3)

Review Comment:
   nit: Could we combine them here as well?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -649,26 +650,27 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     }
   }
 
-  def failPendingOffsetWrite(topicPartition: TopicPartition, offset: 
OffsetAndMetadata): Unit = {
+  def failPendingOffsetWrite(topicIdPartition: TopicIdPartition, offset: 
OffsetAndMetadata): Unit = {
+    val topicPartition = topicIdPartition.topicPartition
     pendingOffsetCommits.get(topicPartition) match {
       case Some(pendingOffset) if offset == pendingOffset => 
pendingOffsetCommits.remove(topicPartition)
       case _ =>
     }
   }
 
-  def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]): 
Unit = {
+  def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): 
Unit = {
     receivedConsumerOffsetCommits = true
-    pendingOffsetCommits ++= offsets
+    offsets.foreach { case (k,v) => pendingOffsetCommits += k.topicPartition 
-> v }

Review Comment:
   nit: We could use `forKeyValue` here. Moreover, let's use 
`(topicIdPartition, offsetAndMetadata)` (note the space after the `,`). instead 
of `(k, v)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to