skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664738871



##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
##########
@@ -76,62 +73,169 @@ public void testConstructor() {
         }
 
         for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            OffsetFetchRequest request = builder.build(version);
-            assertFalse(request.isAllPartitions());
-            assertEquals(groupId, request.groupId());
-            assertEquals(partitions, request.partitions());
-
-            OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-            assertEquals(Errors.NONE, response.error());
-            assertFalse(response.hasError());
-            assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
-                "Incorrect error count for version " + version);
-
-            if (version <= 1) {
-                assertEquals(expectedData, response.responseData());
+            if (version < 8) {
+                builder = new OffsetFetchRequest.Builder(
+                    group1,
+                    false,
+                    partitions,
+                    false);
+                assertFalse(builder.isAllTopicPartitions());
+                OffsetFetchRequest request = builder.build(version);
+                assertFalse(request.isAllPartitions());
+                assertEquals(group1, request.groupId());
+                assertEquals(partitions, request.partitions());
+
+                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+                assertEquals(Errors.NONE, response.error());
+                assertFalse(response.hasError());
+                assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
+                    "Incorrect error count for version " + version);
+
+                if (version <= 1) {
+                    assertEquals(expectedData, response.responseDataV0ToV7());
+                }
+
+                if (version >= 3) {
+                    assertEquals(throttleTimeMs, response.throttleTimeMs());
+                } else {
+                    assertEquals(DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs());
+                }
+            } else {
+                builder = new Builder(Collections.singletonMap(group1, 
partitions), false, false);
+                OffsetFetchRequest request = builder.build(version);
+                Map<String, List<TopicPartition>> groupToPartitionMap =
+                    request.groupIdsToPartitions();
+                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+                    request.groupIdsToTopics();
+                assertFalse(request.isAllPartitionsForGroup(group1));
+                assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
+                    group1));
+                assertEquals(partitions, groupToPartitionMap.get(group1));
+                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+                assertEquals(Errors.NONE, response.groupLevelError(group1));
+                assertFalse(response.groupHasError(group1));
+                assertEquals(Collections.singletonMap(Errors.NONE, 1), 
response.errorCounts(),
+                    "Incorrect error count for version " + version);
+                assertEquals(throttleTimeMs, response.throttleTimeMs());
             }
+        }
+    }
+
+    @Test
+    public void testConstructorWithMultipleGroups() {
+        List<TopicPartition> topic1Partitions = Arrays.asList(
+            new TopicPartition(topicOne, partitionOne),
+            new TopicPartition(topicOne, partitionTwo));
+        List<TopicPartition> topic2Partitions = Arrays.asList(
+            new TopicPartition(topicTwo, partitionOne),
+            new TopicPartition(topicTwo, partitionTwo));
+        List<TopicPartition> topic3Partitions = Arrays.asList(
+            new TopicPartition(topicThree, partitionOne),
+            new TopicPartition(topicThree, partitionTwo));
+        Map<String, List<TopicPartition>> groupToTp = new HashMap<>();
+        groupToTp.put(group1, topic1Partitions);
+        groupToTp.put(group2, topic2Partitions);
+        groupToTp.put(group3, topic3Partitions);
+        groupToTp.put(group4, null);
+        groupToTp.put(group5, null);
+        int throttleTimeMs = 10;
 
-            if (version >= 3) {
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                builder = new Builder(groupToTp, false, false);
+                OffsetFetchRequest request = builder.build(version);
+                Map<String, List<TopicPartition>> groupToPartitionMap =
+                    request.groupIdsToPartitions();
+                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+                    request.groupIdsToTopics();
+                assertEquals(groupToTp.keySet(), groupToTopicMap.keySet());
+                assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet());
+                assertFalse(request.isAllPartitionsForGroup(group1));
+                assertFalse(request.isAllPartitionsForGroup(group2));
+                assertFalse(request.isAllPartitionsForGroup(group3));
+                assertTrue(request.isAllPartitionsForGroup(group4));
+                assertTrue(request.isAllPartitionsForGroup(group5));
+                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+                assertEquals(Errors.NONE, response.groupLevelError(group1));
+                assertEquals(Errors.NONE, response.groupLevelError(group2));
+                assertEquals(Errors.NONE, response.groupLevelError(group3));
+                assertEquals(Errors.NONE, response.groupLevelError(group4));
+                assertEquals(Errors.NONE, response.groupLevelError(group5));
+                assertFalse(response.groupHasError(group1));
+                assertFalse(response.groupHasError(group2));
+                assertFalse(response.groupHasError(group3));
+                assertFalse(response.groupHasError(group4));
+                assertFalse(response.groupHasError(group5));
+                assertEquals(Collections.singletonMap(Errors.NONE, 5), 
response.errorCounts(),
+                    "Incorrect error count for version " + version);
                 assertEquals(throttleTimeMs, response.throttleTimeMs());
-            } else {
-                assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
             }
         }
     }
 
     @Test
-    public void testConstructorFailForUnsupportedRequireStable() {
+    public void testBuildThrowForUnsupportedBatchRequest() {
         for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            // The builder needs to be initialized every cycle as the internal 
data `requireStable` flag is flipped.
-            builder = new OffsetFetchRequest.Builder(groupId, true, null, 
false);
-            final short finalVersion = version;
-            if (version < 2) {
-                assertThrows(UnsupportedVersionException.class, () -> 
builder.build(finalVersion));
-            } else {
-                OffsetFetchRequest request = builder.build(finalVersion);
-                assertEquals(groupId, request.groupId());
-                assertNull(request.partitions());
-                assertTrue(request.isAllPartitions());
-                if (version < 7) {
-                    assertFalse(request.requireStable());
-                } else {
-                    assertTrue(request.requireStable());
-                }
+            if (version < 8) {
+                Map<String, List<TopicPartition>> groupPartitionMap = new 
HashMap<>();
+                groupPartitionMap.put(group1, null);
+                groupPartitionMap.put(group2, null);
+                builder = new Builder(groupPartitionMap, true, false);
+                final short finalVersion = version;
+                assertThrows(NoBatchedOffsetFetchRequestException.class, () -> 
builder.build(finalVersion));

Review comment:
       This specific test checks that the correct exception is thrown when we 
try to build a batched request for a version lower than 8. The 
`testConstructor` method covers the single-group request for version 8 and 
above. Are there any other cases you think we should add for `version >= 8`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to