squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2928956092
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1448,6 +1448,83 @@ class KafkaApisTest extends Logging {
assertEquals(expectedOffsetCommitResponse, response.data)
}
+ @Test
+ def testHandleOffsetCommitRequestWithZeroUuidResolvesTopicId(): Unit = {
+ val topicName = "foo"
+ val topicId = Uuid.randomUuid()
+ addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 2)
+
+ for (version <- ApiKeys.OFFSET_COMMIT.oldestVersion() to ApiKeys.OFFSET_COMMIT.latestVersion()) {
+ // Version >= 10 requires topic IDs, skip.
+ if (version >= 10) return
+ reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+ // Request sends ZERO_UUID with topic name
+ val offsetCommitRequest = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setTopics(util.List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setTopicId(Uuid.ZERO_UUID)
+ .setName(topicName)
+ .setPartitions(util.List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(10),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(20)))))
+
+ // Expected request should have resolved topic ID
+ val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setTopics(util.List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setTopicId(topicId)
+ .setName(topicName)
+ .setPartitions(util.List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(10),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(20)))))
+
+ val requestChannelRequest =
+ buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version.toShort))
+
+ val future = new CompletableFuture[OffsetCommitResponseData]()
+ when(groupCoordinator.commitOffsets(
+ requestChannelRequest.context,
+ expectedOffsetCommitRequest,
+ RequestLocal.noCaching.bufferSupplier
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(
+ requestChannelRequest,
+ RequestLocal.noCaching
+ )
+
+ val offsetCommitResponse = new OffsetCommitResponseData()
+ .setTopics(util.List.of(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setTopicId(Uuid.ZERO_UUID)
Review Comment:
That's not what I was getting at.
If we change this line of code
```
- .setTopicId(Uuid.ZERO_UUID)
+ .setTopicId(topicId)
```
does the test continue to pass or does it fail because we serialize and
deserialize the response at version < 10 and the topicId field gets dropped?
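
To make the question concrete, here is a toy model of a version-gated field. It is only a sketch: `TopicResponse`, `serialize`, `deserialize`, `ZeroId` and the `Map`-based wire format are hypothetical stand-ins, not Kafka's generated message classes, and it assumes the response is round-tripped at the request version before the assertion runs.

```scala
object VersionGatedFieldSketch {
  // Hypothetical stand-in for a response topic entry.
  final case class TopicResponse(name: String, topicId: String)

  val ZeroId = "<zero>"             // stand-in for Uuid.ZERO_UUID
  val FirstVersionWithTopicId = 10  // assumption taken from the "version >= 10" comment in the test

  // Writes only the fields that exist at the given version.
  def serialize(t: TopicResponse, version: Int): Map[String, String] =
    if (version >= FirstVersionWithTopicId) Map("topicId" -> t.topicId)
    else Map("name" -> t.name)

  // Fields missing from the wire fall back to their defaults.
  def deserialize(wire: Map[String, String]): TopicResponse =
    TopicResponse(wire.getOrElse("name", ""), wire.getOrElse("topicId", ZeroId))

  def roundTrip(t: TopicResponse, version: Int): TopicResponse =
    deserialize(serialize(t, version))

  def main(args: Array[String]): Unit = {
    // The broker-side response carries a resolved (non-zero) topic ID.
    val sent = TopicResponse("foo", "resolved-id")
    val received = roundTrip(sent, 9)

    // Below version 10 the ID never reaches the wire, so an expectation
    // built with the zero ID matches and one built with the resolved ID does not.
    assert(received == TopicResponse("foo", ZeroId))
    assert(received != TopicResponse("foo", "resolved-id"))
  }
}
```

If the real test helper round-trips the response this way, changing the expectation to `topicId` would make the assertion fail for every version below 10. If it compares the in-memory object instead, the outcome could differ.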
---
> For version<10, if topicId is set incorrectly instead of ZERO_UUID
> somehow, the wrong topicId will be preserved without throwing an exception for
> the testHandleOffsetCommitRequest, which is not desired.

I don't think we need to consider this case. It's impossible unless we have
a bug in the broker.