squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2928518045


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1448,6 +1448,83 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedOffsetCommitResponse, response.data)
   }
 
+  @Test
+  def testHandleOffsetCommitRequestWithZeroUuidResolvesTopicId(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 2)
+
+    for (version <- ApiKeys.OFFSET_COMMIT.oldestVersion() to 
ApiKeys.OFFSET_COMMIT.latestVersion()) {
+      // Version >= 10 requires topic IDs, skip.
+      if (version >= 10) return
+      reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+      // Request sends ZERO_UUID with topic name
+      val offsetCommitRequest = new OffsetCommitRequestData()
+        .setGroupId("group")
+        .setMemberId("member")
+        .setTopics(util.List.of(
+          new OffsetCommitRequestData.OffsetCommitRequestTopic()
+            .setTopicId(Uuid.ZERO_UUID)
+            .setName(topicName)
+            .setPartitions(util.List.of(
+              new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(10),
+              new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                .setPartitionIndex(1)
+                .setCommittedOffset(20)))))
+
+      // Expected request should have resolved topic ID
+      val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+        .setGroupId("group")
+        .setMemberId("member")
+        .setTopics(util.List.of(
+          new OffsetCommitRequestData.OffsetCommitRequestTopic()
+            .setTopicId(topicId)
+            .setName(topicName)
+            .setPartitions(util.List.of(
+              new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(10),
+              new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                .setPartitionIndex(1)
+                .setCommittedOffset(20)))))
+
+      val requestChannelRequest =
+        
buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version.toShort))
+
+      val future = new CompletableFuture[OffsetCommitResponseData]()
+      when(groupCoordinator.commitOffsets(
+        requestChannelRequest.context,
+        expectedOffsetCommitRequest,
+        RequestLocal.noCaching.bufferSupplier
+      )).thenReturn(future)
+      kafkaApis = createKafkaApis()
+      kafkaApis.handle(
+        requestChannelRequest,
+        RequestLocal.noCaching
+      )
+
+      val offsetCommitResponse = new OffsetCommitResponseData()
+        .setTopics(util.List.of(
+          new OffsetCommitResponseData.OffsetCommitResponseTopic()
+            .setTopicId(Uuid.ZERO_UUID)

Review Comment:
   The group coordinator would return a response with a `topicId`. Would the 
assert below would fail when a topic id is set because there is no topic id 
field in the response until version 10?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -283,11 +283,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val useTopicIds = 
OffsetCommitResponse.useTopicIds(request.header.apiVersion)
 
-      if (useTopicIds) {
-        offsetCommitRequest.data.topics.forEach { topic =>
-          if (topic.topicId != Uuid.ZERO_UUID) {
-            metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
-          }
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (topic.topicId == Uuid.ZERO_UUID) {

Review Comment:
   Could we swap the order of these conditions?
   
   > 1. If `topic.topicId == Uuid.ZERO_UUID`, then it couldn't of an 
OffsetCommit.API_VERSION>=10
   > 2. If OffsetCommit.API_VERSION>=10, then topicId could't be ZERO_UUID 
(enforced by `OffsetCommitRequest#build()` method).
   
   That might not be true. We can't guarantee that the client is well-behaved 
or that we are talking to the Java client. However, it's certainly safe to 
assume that below version 10, the topic id is `ZERO_UUID` and above version 10, 
the topic name is absent based on the request schema.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -1272,9 +1273,43 @@ public void testOnLoaded() {
             any(), eq(image)
         );
 
+        verify(offsetMetadataManager, times(1)).onMetadataUpdate(
+            any(), eq(image)
+        );
+
         verify(groupMetadataManager, times(1)).onLoaded();
     }
 
+    @Test
+    public void testOnMetadataUpdate() {
+        CoordinatorMetadataImage image = CoordinatorMetadataImage.EMPTY;
+        CoordinatorMetadataDelta delta = mock(CoordinatorMetadataDelta.class);

Review Comment:
   We could use `CoordinatorMetadataDelta.EMPTY` here instead of a mock.
   ```suggestion
           CoordinatorMetadataDelta delta = CoordinatorMetadataDelta.EMPTY;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to