junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1911759105


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3441,72 @@ public void testPollWithRedundantCreateFetchRequests() {
 
     }
 
+    @Test
+    public void testFetchSessionTestEviction() {

Review Comment:
   Rename `testFetchSessionTestEviction` => `testFetchSessionEviction`? The second "Test" in the name looks redundant.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3441,72 @@ public void testPollWithRedundantCreateFetchRequests() {
 
     }
 
+    @Test
+    public void testFetchSessionTestEviction() {
+        buildFetcher();
+
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions);
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(
+                2,
+                singletonMap(topicName, 4),
+                tp -> validLeaderEpoch,
+                topicIds,
+                false
+            )
+        );
+
+        Map<Node, Set<TopicPartition>> nodeToPartitionMap = new HashMap<>();
+
+        partitions.forEach(tp -> {
+            subscriptions.seek(tp, 1);
+            Node node = metadata.fetch().leaderFor(tp);
+            nodeToPartitionMap.computeIfAbsent(node, k -> new 
HashSet<>()).add(tp);
+        });
+
+        assertEquals(nodeToPartitionMap.size(), sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        nodeToPartitionMap.keySet().forEach(node -> {
+            Set<TopicPartition> nodePartitions = nodeToPartitionMap.get(node);
+            assertNotNull(nodePartitions);
+            LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitionDataMap = new LinkedHashMap<>();
+
+            nodePartitions.forEach(tp -> {
+                TopicIdPartition tidp = new TopicIdPartition(topicId, tp);
+                FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                    .setPartitionIndex(tp1.partition())
+                    .setHighWatermark(100)
+                    .setRecords(records);
+                partitionDataMap.put(tidp, partitionData);
+            });
+
+            client.prepareResponseFrom(
+                FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
partitionDataMap),
+                node
+            );
+        });
+
+        final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> argument = 
ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class);
+        networkClientDelegate.poll(time.timer(0));
+
+        verify(networkClientDelegate, 
times(nodeToPartitionMap.size())).doSend(argument.capture(), any(Long.class));
+
+        for (NetworkClientDelegate.UnsentRequest unsentRequest : 
argument.getAllValues()) {
+            Optional<Node> nodeOpt = unsentRequest.node();
+            assertTrue(nodeOpt.isPresent());
+            Node node = nodeOpt.get();
+            FetchRequest.Builder builder = (FetchRequest.Builder) 
unsentRequest.requestBuilder();
+            Set<TopicPartition> nodePartitions = nodeToPartitionMap.get(node);
+            assertNotNull(nodePartitions);
+            assertEquals(nodePartitions.size(), builder.fetchData().size());
+        }
+
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchRecords();

Review Comment:
   This call to `fetchRecords()` doesn't verify anything. Also, should we test
   that if a node has one partition with buffered data and another without, no
   fetch request will be sent to that node?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -1823,7 +1828,7 @@ public void testSeekBeforeException() {
         buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), 2, 
IsolationLevel.READ_UNCOMMITTED);
 
-        assignFromUser(Set.of(tp0));
+        assignFromUser(Set.of(tp0), 2);         // Use multiple nodes so 
partitions have different leaders

Review Comment:
   There is only 1 partition (tp0) assigned here. Do we need 2 nodes?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -1152,16 +1156,17 @@ public void testFetchMaxPollRecordsUnaligned() {
         Set<TopicPartition> tps = new HashSet<>();
         tps.add(tp0);
         tps.add(tp1);
-        assignFromUser(tps);
+        assignFromUser(tps, 2);                 // Use multiple nodes so 
partitions have different leaders
         subscriptions.seek(tp0, 1);
         subscriptions.seek(tp1, 6);
 
-        client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, 
moreRecords, 100L));

Review Comment:
   fetchResponse2 is no longer used.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3441,72 @@ public void testPollWithRedundantCreateFetchRequests() {
 
     }
 
+    @Test
+    public void testFetchSessionTestEviction() {
+        buildFetcher();
+
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions);
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(
+                2,
+                singletonMap(topicName, 4),
+                tp -> validLeaderEpoch,
+                topicIds,
+                false
+            )
+        );
+
+        Map<Node, Set<TopicPartition>> nodeToPartitionMap = new HashMap<>();
+
+        partitions.forEach(tp -> {
+            subscriptions.seek(tp, 1);
+            Node node = metadata.fetch().leaderFor(tp);
+            nodeToPartitionMap.computeIfAbsent(node, k -> new 
HashSet<>()).add(tp);
+        });
+
+        assertEquals(nodeToPartitionMap.size(), sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        nodeToPartitionMap.keySet().forEach(node -> {
+            Set<TopicPartition> nodePartitions = nodeToPartitionMap.get(node);
+            assertNotNull(nodePartitions);
+            LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitionDataMap = new LinkedHashMap<>();
+
+            nodePartitions.forEach(tp -> {
+                TopicIdPartition tidp = new TopicIdPartition(topicId, tp);
+                FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                    .setPartitionIndex(tp1.partition())

Review Comment:
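   Should we use `tp` instead of `tp1` here? As written, every `PartitionData`
   is given `tp1.partition()` as its partition index, regardless of which
   partition `tp` the loop is currently on.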



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to