skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663417840



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##########
@@ -68,36 +74,117 @@ public Builder(String groupId,
             }
 
             this.data = new OffsetFetchRequestData()
-                            .setGroupId(groupId)
-                            .setRequireStable(requireStable)
-                            .setTopics(topics);
+                .setGroupId(groupId)
+                .setRequireStable(requireStable)
+                .setTopics(topics);
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }
 
         boolean isAllTopicPartitions() {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
-        @Override
-        public OffsetFetchRequest build(short version) {
-            if (isAllTopicPartitions() && version < 2) {
-                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
-                    "v" + version + ", but we need v2 or newer to request all topic partitions.");
-            }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+            boolean requireStable,
+            boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
 
-            if (data.requireStable() && version < 7) {
-                if (throwOnFetchStableOffsetsUnsupported) {
-                    throw new UnsupportedVersionException("Broker unexpectedly " +
-                        "doesn't support requireStable flag on version " + version);
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                final List<OffsetFetchRequestTopics> topics;
+                if (groupIdToTopicPartitionMap.get(entry.getKey()) != null) {

Review comment:
       I'm using `entry.getKey()` here because `groupIdToTopicPartitionMap.get(...)` looks up the list of topic partitions for a specific group id, and the map is keyed by the group id `String`. `entry.getValue()` would give us the `List<TopicPartition>` itself, whereas `get` needs the `String` key, which is what `entry.getKey()` returns.
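
       To make the types concrete, here is a minimal, self-contained sketch of the two ways to read the value inside an `entrySet()` loop. It is not the PR's code: the class and method names are hypothetical, and plain `String`s stand in for `TopicPartition` so it runs without Kafka on the classpath. It only illustrates standard `java.util.Map` behaviour, namely that `map.get(entry.getKey())` performs a second lookup by the `String` key, while `entry.getValue()` hands back the list already held by the entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; String stands in for TopicPartition.
public class EntryLookupSketch {

    // Mirrors the check in the diff: look the value up again by its String key.
    static List<String> groupsWithPartitionsViaGet(Map<String, List<String>> groupToPartitions) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groupToPartitions.entrySet()) {
            if (groupToPartitions.get(entry.getKey()) != null) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Reads the list directly from the entry, without a second lookup.
    static List<String> groupsWithPartitionsViaValue(Map<String, List<String>> groupToPartitions) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groupToPartitions.entrySet()) {
            if (entry.getValue() != null) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order and permits null values.
        Map<String, List<String>> groupToPartitions = new LinkedHashMap<>();
        groupToPartitions.put("group-a", Arrays.asList("topic-0", "topic-1"));
        groupToPartitions.put("group-b", null); // group with no explicit partition list

        System.out.println(groupsWithPartitionsViaGet(groupToPartitions));
        System.out.println(groupsWithPartitionsViaValue(groupToPartitions));
    }
}
```

       Both methods select the same groups for any map, since within the loop the entry's value is the same object that `get` returns for the entry's key.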



