Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1122175488
########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ########## @@ -119,43 +130,62 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } - public static class Builder { - OffsetCommitResponseData data = new OffsetCommitResponseData(); - HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>(); + public short version() { + return version; + } - private OffsetCommitResponseTopic getOrCreateTopic( - String topicName - ) { - OffsetCommitResponseTopic topic = byTopicName.get(topicName); - if (topic == null) { - topic = new OffsetCommitResponseTopic().setName(topicName); - data.topics().add(topic); - byTopicName.put(topicName, topic); - } - return topic; + public static Builder<?> newBuilder(TopicResolver topicResolver, short version) { + if (version >= 9) { + return new Builder<>(topicResolver, new ByTopicId(), version); + } else { + return new Builder<>(topicResolver, new ByTopicName(), version); } + } - public Builder addPartition( - String topicName, - int partitionIndex, - Errors error - ) { - final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + public static final class Builder<T> { + private final TopicResolver topicResolver; + private final TopicClassifier<T> topicClassifier; + private final short version; - topicResponse.partitions().add(new OffsetCommitResponsePartition() - .setPartitionIndex(partitionIndex) - .setErrorCode(error.code())); + private OffsetCommitResponseData data = new OffsetCommitResponseData(); + private final Map<T, OffsetCommitResponseTopic> topics = new HashMap<>(); + + protected Builder(TopicResolver topicResolver, TopicClassifier<T> topicClassifier, short version) { + this.topicResolver = topicResolver; + this.topicClassifier = topicClassifier; + this.version = version; + } + + public Builder<T> addPartition(String topicName, Uuid topicId, int partitionIndex, Errors error) { + Uuid resolvedTopicId = maybeResolveTopicId(topicName, 
topicId); + + if (version >= 9 && Uuid.ZERO_UUID.equals(resolvedTopicId)) { + Errors reported = error != Errors.NONE ? error : Errors.UNKNOWN_TOPIC_ID; Review Comment: This case shouldn't be reachable: by the time the response is being constructed via `addPartition`, all topic IDs are supposed to have been resolved successfully. If it is reached anyway, we add the topic to the response with the error code `UNKNOWN_TOPIC_ID` when no error is already set; any existing error is not overwritten. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org