junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1911759105
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3441,72 @@ public void testPollWithRedundantCreateFetchRequests() {
}
+ @Test
+ public void testFetchSessionTestEviction() {
Review Comment:
testFetchSessionTestEviction => testFetchSessionEviction ?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3441,72 @@ public void testPollWithRedundantCreateFetchRequests() {
}
+ @Test
+ public void testFetchSessionTestEviction() {
+ buildFetcher();
+
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions);
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(
+ 2,
+ singletonMap(topicName, 4),
+ tp -> validLeaderEpoch,
+ topicIds,
+ false
+ )
+ );
+
+ Map<Node, Set<TopicPartition>> nodeToPartitionMap = new HashMap<>();
+
+ partitions.forEach(tp -> {
+ subscriptions.seek(tp, 1);
+ Node node = metadata.fetch().leaderFor(tp);
+ nodeToPartitionMap.computeIfAbsent(node, k -> new HashSet<>()).add(tp);
+ });
+
+ assertEquals(nodeToPartitionMap.size(), sendFetches());
+ assertFalse(fetcher.hasCompletedFetches());
+
+ nodeToPartitionMap.keySet().forEach(node -> {
+ Set<TopicPartition> nodePartitions = nodeToPartitionMap.get(node);
+ assertNotNull(nodePartitions);
+ LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
+
+ nodePartitions.forEach(tp -> {
+ TopicIdPartition tidp = new TopicIdPartition(topicId, tp);
+ FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp1.partition())
+ .setHighWatermark(100)
+ .setRecords(records);
+ partitionDataMap.put(tidp, partitionData);
+ });
+
+ client.prepareResponseFrom(
+ FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitionDataMap),
+ node
+ );
+ });
+
+ final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> argument = ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class);
+ networkClientDelegate.poll(time.timer(0));
+
+ verify(networkClientDelegate, times(nodeToPartitionMap.size())).doSend(argument.capture(), any(Long.class));
+
+ for (NetworkClientDelegate.UnsentRequest unsentRequest : argument.getAllValues()) {
+ Optional<Node> nodeOpt = unsentRequest.node();
+ assertTrue(nodeOpt.isPresent());
+ Node node = nodeOpt.get();
+ FetchRequest.Builder builder = (FetchRequest.Builder) unsentRequest.requestBuilder();
+ Set<TopicPartition> nodePartitions = nodeToPartitionMap.get(node);
+ assertNotNull(nodePartitions);
+ assertEquals(nodePartitions.size(), builder.fetchData().size());
+ }
+
+ assertTrue(fetcher.hasCompletedFetches());
+ fetchRecords();
Review Comment:
The final fetchRecords() call doesn't verify anything. Also, should we test that if one partition has buffered data and another doesn't, no fetch request will be sent to that node?
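For example, something along these lines (a sketch only; it assumes fetchRecords() returns the polled records keyed by partition, as in the other tests in this class, and that sendFetches() skips a node while any of its partitions still has buffered data):

```java
// Assert on the records that were actually returned instead of discarding them.
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
assertEquals(partitions, fetchedRecords.keySet());
partitions.forEach(tp -> assertFalse(fetchedRecords.get(tp).isEmpty()));

// Buffered-data case (probably a separate test): leave one partition's data
// buffered (e.g. by limiting max.poll.records so only part of it is consumed),
// then verify that no new fetch request is sent to that partition's leader:
//   assertEquals(0, sendFetches());
```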
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -1823,7 +1828,7 @@ public void testSeekBeforeException() {
buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(),
new ByteArrayDeserializer(), 2,
IsolationLevel.READ_UNCOMMITTED);
- assignFromUser(Set.of(tp0));
+ assignFromUser(Set.of(tp0), 2); // Use multiple nodes so partitions have different leaders
Review Comment:
There is only 1 partition. Do we need 2 nodes?
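If a single node is enough here, the original call could presumably be kept (assuming the one-argument overload of assignFromUser() still sets up a single-node cluster):

```java
assignFromUser(Set.of(tp0));
```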
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -1152,16 +1156,17 @@ public void testFetchMaxPollRecordsUnaligned() {
Set<TopicPartition> tps = new HashSet<>();
tps.add(tp0);
tps.add(tp1);
- assignFromUser(tps);
+ assignFromUser(tps, 2); // Use multiple nodes so partitions have different leaders
subscriptions.seek(tp0, 1);
subscriptions.seek(tp1, 6);
- client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
Review Comment:
fetchResponse2 is no longer used.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3441,72 @@ public void testPollWithRedundantCreateFetchRequests() {
}
+ @Test
+ public void testFetchSessionTestEviction() {
+ buildFetcher();
+
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions);
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(
+ 2,
+ singletonMap(topicName, 4),
+ tp -> validLeaderEpoch,
+ topicIds,
+ false
+ )
+ );
+
+ Map<Node, Set<TopicPartition>> nodeToPartitionMap = new HashMap<>();
+
+ partitions.forEach(tp -> {
+ subscriptions.seek(tp, 1);
+ Node node = metadata.fetch().leaderFor(tp);
+ nodeToPartitionMap.computeIfAbsent(node, k -> new HashSet<>()).add(tp);
+ });
+
+ assertEquals(nodeToPartitionMap.size(), sendFetches());
+ assertFalse(fetcher.hasCompletedFetches());
+
+ nodeToPartitionMap.keySet().forEach(node -> {
+ Set<TopicPartition> nodePartitions = nodeToPartitionMap.get(node);
+ assertNotNull(nodePartitions);
+ LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
+
+ nodePartitions.forEach(tp -> {
+ TopicIdPartition tidp = new TopicIdPartition(topicId, tp);
+ FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp1.partition())
Review Comment:
Should we use tp instead of tp1?
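i.e., presumably each partition's own index should be used when building its response data:

```java
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
    .setPartitionIndex(tp.partition())
    .setHighWatermark(100)
    .setRecords(records);
```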