skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664798862



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -78,26 +85,107 @@ boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+            boolean requireStable,
+            boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
+
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                String groupName = entry.getKey();
+                List<TopicPartition> tpList = entry.getValue();
+                final List<OffsetFetchRequestTopics> topics;
+                if (tpList != null) {
+                    Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : tpList) {
+                        String topicName = topicPartition.topic();
+                        OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault(
+                            topicName, new OffsetFetchRequestTopics().setName(topicName));
+                        topic.partitionIndexes().add(topicPartition.partition());
+                        offsetFetchRequestTopicMap.put(topicName, topic);
+                    }
+                    topics = new ArrayList<>(offsetFetchRequestTopicMap.values());
+                } else {
+                    topics = ALL_TOPIC_PARTITIONS_BATCH;
+                }
+                groups.add(new OffsetFetchRequestGroup()
+                    .setGroupId(groupName)
+                    .setTopics(topics));
+            }
+            this.data = new OffsetFetchRequestData()
+                .setGroupIds(groups)
+                .setRequireStable(requireStable);
+            this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
+        }
+
         @Override
         public OffsetFetchRequest build(short version) {
             if (isAllTopicPartitions() && version < 2) {
                 throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
                     "v" + version + ", but we need v2 or newer to request all topic partitions.");
             }
-
+            if (data.groupIds().size() > 1 && version < 8) {
+                throw new NoBatchedOffsetFetchRequestException("Broker does not support"
+                    + " batching groups for fetch offset request on version " + version);
+            }
             if (data.requireStable() && version < 7) {
                 if (throwOnFetchStableOffsetsUnsupported) {
                     throw new UnsupportedVersionException("Broker unexpectedly " +
                         "doesn't support requireStable flag on version " + version);
                 } else {
                     log.trace("Fallback the requireStable flag to false as broker " +
-                                  "only supports OffsetFetchRequest version {}. Need " +
-                                  "v7 or newer to enable this feature", version);
+                        "only supports OffsetFetchRequest version {}. Need " +
+                        "v7 or newer to enable this feature", version);
 
                     return new OffsetFetchRequest(data.setRequireStable(false), version);
                 }
             }
-
+            if (version < 8) {
+                OffsetFetchRequestData oldDataFormat = null;
+                if (!data.groupIds().isEmpty()) {
+                    OffsetFetchRequestGroup group = data.groupIds().get(0);
+                    String groupName = group.groupId();
+                    List<OffsetFetchRequestTopics> topics = group.topics();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = null;
+                    if (topics != null) {
+                        oldFormatTopics = topics
+                            .stream()
+                            .map(t ->
+                                new OffsetFetchRequestTopic()
+                                    .setName(t.name())
+                                    .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    oldDataFormat = new OffsetFetchRequestData()
+                        .setGroupId(groupName)
+                        .setTopics(oldFormatTopics)
+                        .setRequireStable(data.requireStable());
+                }
+                return new OffsetFetchRequest(oldDataFormat == null ? data : oldDataFormat, version);
+            }
+            // version 8 but have used old format of request, convert to version 8 of request

Review comment:
       Ah okay, yes that makes sense, will make the change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to