dajac commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2933320945


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -692,21 +715,35 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
             final TxnOffsetCommitResponseTopic topicResponse = new TxnOffsetCommitResponseTopic().setName(topic.name());
             response.topics().add(topicResponse);
 
+            // Resolve topic ID if it's ZERO_UUID
+            final Uuid resolvedTopicId = metadataImage
+                .topicMetadata(topic.name())
+                .map(CoordinatorMetadataImage.TopicMetadata::id)
+                .orElse(Uuid.ZERO_UUID);
+
             topic.partitions().forEach(partition -> {
                 if (isMetadataInvalid(partition.committedMetadata())) {
                     topicResponse.partitions().add(new TxnOffsetCommitResponsePartition()
                         .setPartitionIndex(partition.partitionIndex())
                         .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
-                    // Validate commit per-partition
-                    try {
-                        validator.validate(
-                            topic.name(),
-                            org.apache.kafka.common.Uuid.ZERO_UUID,
-                            partition.partitionIndex()
-                        );
-                    } catch (StaleMemberEpochException ex) {
-                        throw Errors.ILLEGAL_GENERATION.exception();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {

Review Comment:
   This check should go directly after `resolvedTopicId` is resolved, since it
does not need to be repeated for every partition. Moreover, we should ensure
that `ILLEGAL_GENERATION` is returned.
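
   A minimal, self-contained sketch of the suggested shape, not the actual
patch. It uses stand-in types instead of the Kafka classes: `java.util.UUID`
replaces `Uuid`, and `Validator`, `Result`, `commitTopic` and both exception
classes are hypothetical names. Reporting `UNKNOWN_TOPIC` for an unresolved
topic is an assumption; the comment above does not say which error applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class HoistedTopicIdCheck {
    // Stand-in for Kafka's Uuid.ZERO_UUID (assumption: plain java.util.UUID).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical stand-ins for the coordinator's exceptions.
    static class StaleMemberEpochException extends RuntimeException { }
    static class IllegalGenerationException extends RuntimeException { }

    // Hypothetical per-partition validator, mirroring validator.validate(...).
    interface Validator {
        void validate(String topicName, UUID topicId, int partitionIndex);
    }

    // Hypothetical per-partition outcome; UNKNOWN_TOPIC is an assumed placeholder.
    enum Result { OK, UNKNOWN_TOPIC }

    static List<Result> commitTopic(
        String topicName,
        List<Integer> partitions,
        Map<String, UUID> topicIdsByName,
        Validator validator
    ) {
        // Resolve the topic id once per topic.
        UUID resolvedTopicId = Optional
            .ofNullable(topicIdsByName.get(topicName))
            .orElse(ZERO_UUID);

        List<Result> results = new ArrayList<>();

        // The unresolved check sits right after the resolution,
        // outside the partition loop, so it runs once per topic.
        if (resolvedTopicId.equals(ZERO_UUID)) {
            partitions.forEach(p -> results.add(Result.UNKNOWN_TOPIC));
            return results;
        }

        for (int partitionIndex : partitions) {
            try {
                validator.validate(topicName, resolvedTopicId, partitionIndex);
            } catch (StaleMemberEpochException ex) {
                // Keep translating a stale member epoch into ILLEGAL_GENERATION.
                throw new IllegalGenerationException();
            }
            results.add(Result.OK);
        }
        return results;
    }
}
```

   With this layout the validator is never invoked for a topic whose id cannot
be resolved, and a stale member epoch still surfaces as `ILLEGAL_GENERATION`.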



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -619,8 +626,16 @@ public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffs
         final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
 
         request.topics().forEach(topic -> {
+            // Resolve topic ID if it's ZERO_UUID
+            final Uuid resolvedTopicId = topic.topicId().equals(Uuid.ZERO_UUID)

Review Comment:
   @lucliu1108 I think that the consensus is to resolve topic ids in KafkaApis 
for commitOffset and to keep it in the OffsetMetadataManager for 
commitTransactionalOffset until we augment the RPC to support topic ids.


