Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122175488


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -119,43 +130,62 @@ public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
 
-    public static class Builder {
-        OffsetCommitResponseData data = new OffsetCommitResponseData();
-        HashMap<String, OffsetCommitResponseTopic> byTopicName = new 
HashMap<>();
+    public short version() {
+        return version;
+    }
 
-        private OffsetCommitResponseTopic getOrCreateTopic(
-            String topicName
-        ) {
-            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
-            if (topic == null) {
-                topic = new OffsetCommitResponseTopic().setName(topicName);
-                data.topics().add(topic);
-                byTopicName.put(topicName, topic);
-            }
-            return topic;
+    public static Builder<?> newBuilder(TopicResolver topicResolver, short 
version) {
+        if (version >= 9) {
+            return new Builder<>(topicResolver, new ByTopicId(), version);
+        } else {
+            return new Builder<>(topicResolver, new ByTopicName(), version);
         }
+    }
 
-        public Builder addPartition(
-            String topicName,
-            int partitionIndex,
-            Errors error
-        ) {
-            final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+    public static final class Builder<T> {
+        private final TopicResolver topicResolver;
+        private final TopicClassifier<T> topicClassifier;
+        private final short version;
 
-            topicResponse.partitions().add(new OffsetCommitResponsePartition()
-                .setPartitionIndex(partitionIndex)
-                .setErrorCode(error.code()));
+        private OffsetCommitResponseData data = new OffsetCommitResponseData();
+        private final Map<T, OffsetCommitResponseTopic> topics = new 
HashMap<>();
+
+        protected Builder(TopicResolver topicResolver, TopicClassifier<T> 
topicClassifier, short version) {
+            this.topicResolver = topicResolver;
+            this.topicClassifier = topicClassifier;
+            this.version = version;
+        }
+
+        public Builder<T> addPartition(String topicName, Uuid topicId, int 
partitionIndex, Errors error) {
+            Uuid resolvedTopicId = maybeResolveTopicId(topicName, topicId);
+
+            if (version >= 9 && Uuid.ZERO_UUID.equals(resolvedTopicId)) {
+                Errors reported = error != Errors.NONE ? error : 
Errors.UNKNOWN_TOPIC_ID;

Review Comment:
   This case shouldn't be reachable because once we have proceeded with 
constructing the response via `addPartition` all topic ids are supposed to have 
been resolved successfully. Here, we choose to add the topic to the response 
with the error code `UNKNOWN_TOPIC_ID` if no error is already set. Any existing 
error is not overwritten.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to