dajac commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2933320945
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -692,21 +715,35 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
final TxnOffsetCommitResponseTopic topicResponse = new TxnOffsetCommitResponseTopic().setName(topic.name());
response.topics().add(topicResponse);
+ // Resolve topic ID if it's ZERO_UUID
+ final Uuid resolvedTopicId = metadataImage
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+
topic.partitions().forEach(partition -> {
if (isMetadataInvalid(partition.committedMetadata())) {
topicResponse.partitions().add(new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
- // Validate commit per-partition
- try {
- validator.validate(
- topic.name(),
- org.apache.kafka.common.Uuid.ZERO_UUID,
- partition.partitionIndex()
- );
- } catch (StaleMemberEpochException ex) {
- throw Errors.ILLEGAL_GENERATION.exception();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
Review Comment:
This check should go directly after `resolvedTopicId` is resolved, since it
does not need to be repeated for every partition. Moreover, we should ensure
that `ILLEGAL_GENERATION` is returned.
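
A minimal, self-contained sketch of the suggested shape, not the PR's actual code. It uses `java.util.UUID` as a stand-in for Kafka's `Uuid`, and the `Validator` interface, the two exception classes, the `validatePartitions` method, and the `"UNRESOLVED_TOPIC"` / `"OK"` outcome strings are all hypothetical names introduced for illustration. What the unresolved branch should really do is not visible in the hunk, so the sketch only records a placeholder outcome. It shows the zero-ID check running once per topic, and a stale member epoch still surfacing as an illegal-generation error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

class HoistedTopicIdCheck {
    // Stand-in for Uuid.ZERO_UUID (assumption: an all-zero ID means "unresolved").
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical stand-ins for the exceptions referenced in the diff.
    static class StaleMemberEpochException extends RuntimeException { }
    static class IllegalGenerationException extends RuntimeException { }

    // Hypothetical per-partition validator, mirroring validator.validate(...) in the diff.
    interface Validator {
        void validate(String topicName, UUID topicId, int partitionIndex);
    }

    static List<String> validatePartitions(
        Map<String, UUID> topicIdsByName,
        String topicName,
        List<Integer> partitions,
        Validator validator
    ) {
        // Resolve the topic ID once per topic, falling back to the zero ID.
        final UUID resolvedTopicId =
            Optional.ofNullable(topicIdsByName.get(topicName)).orElse(ZERO_UUID);

        final List<String> outcomes = new ArrayList<>();

        // Hoisted check: evaluated once, directly after resolution,
        // instead of once per partition inside the loop.
        if (resolvedTopicId.equals(ZERO_UUID)) {
            // Placeholder outcome; the real handling is not shown in the hunk.
            partitions.forEach(p -> outcomes.add("UNRESOLVED_TOPIC"));
            return outcomes;
        }

        for (int partitionIndex : partitions) {
            try {
                validator.validate(topicName, resolvedTopicId, partitionIndex);
            } catch (StaleMemberEpochException ex) {
                // Keep translating a stale epoch into an illegal-generation error.
                throw new IllegalGenerationException();
            }
            outcomes.add("OK");
        }
        return outcomes;
    }

    public static void main(String[] args) {
        Map<String, UUID> ids = Map.of("known-topic", new UUID(1L, 2L));
        Validator alwaysValid = (topic, id, partition) -> { };
        System.out.println(validatePartitions(ids, "known-topic", List.of(0, 1), alwaysValid));
        System.out.println(validatePartitions(ids, "missing-topic", List.of(0, 1), alwaysValid));
    }
}
```

Hoisting the check keeps the per-partition loop focused on validation, and the `catch` block preserves the error translation that the removed lines performed.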
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -619,8 +626,16 @@ public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffs
final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
request.topics().forEach(topic -> {
+ // Resolve topic ID if it's ZERO_UUID
+ final Uuid resolvedTopicId = topic.topicId().equals(Uuid.ZERO_UUID)
Review Comment:
@lucliu1108 I think the consensus is to resolve topic IDs in `KafkaApis` for
`commitOffset`, and to keep the resolution in `OffsetMetadataManager` for
`commitTransactionalOffset` until we augment the RPC to support topic IDs.