junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1932750879
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,338 @@ public void testPollWithRedundantCreateFetchRequests()
{
}
- private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
- TopicPartition topicPartition,
- Errors error,
- int leaderEpoch,
- long endOffset
- ) {
- OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
- data.topics().add(new OffsetForLeaderTopicResult()
- .setTopic(topicPartition.topic())
- .setPartitions(Collections.singletonList(new EpochEndOffset()
- .setPartition(topicPartition.partition())
- .setErrorCode(error.code())
- .setLeaderEpoch(leaderEpoch)
- .setEndOffset(endOffset))));
- return new OffsetsForLeaderEpochResponse(data);
+ /**
+ * This test makes several calls to {@link #sendFetches()}, and after
each, the buffered partitions are
+ * modified to either cause (or prevent) a fetch from being requested.
+ */
+ @Test
+ public void testFetchRequestWithBufferedPartitions() {
+ buildFetcher();
+
+ // The partitions are spread across multiple nodes to ensure the
fetcher's logic correctly handles the
+ // partition-to-node mapping.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // Get all the nodes serving as the leader for these partitions.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+ // Extract the nodes and their respective set of partitions to make
things easier to keep track of later.
+ assertEquals(2, nodes.size());
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+ assertEquals(2, node0Partitions.size());
+ assertEquals(2, node1Partitions.size());
+ TopicPartition node0Partition1 = node0Partitions.get(0);
+ TopicPartition node0Partition2 = node0Partitions.get(1);
+ TopicPartition node1Partition1 = node1Partitions.get(0);
+ TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ node0Partitions.remove(node0Partition1);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #2 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partition1, partitions);
+ node1Partitions.remove(node1Partition1);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #3 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node0Partition2, partitions);
+ node0Partitions.remove(node0Partition2);
+ assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Validate that all of node 0's partitions have all been collected.
+ assertTrue(node0Partitions.isEmpty());
+
+ // Reset the list of partitions for node 0 so the next fetch pass
requests data.
+ node0Partitions = partitionsForNode(node0, partitions);
+
+ // sendFetches() call #4 should issue a request to node 0 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+
+ collectSelectedPartition(node1Partition2, partitions);
+ node1Partitions.remove(node1Partition2);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 1's partitions have likewise all been collected, so validate
that.
+ assertTrue(node1Partitions.isEmpty());
+
+ // Again, reset the list of partitions, this time for node 1, so the
next fetch pass requests data.
+ node1Partitions = partitionsForNode(node1, partitions);
+
+ // sendFetches() call #5 should issue a request to node 1 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node1, node1Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Collect all the records and make sure they include all the
partitions, and validate that there is no data
+ // remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #6 should issue requests to nodes 0 and 1 since
their buffered data was collected.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 20);
+ prepareFetchResponses(node1, node1Partitions, 20);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Just for completeness, collect all the records and make sure they
include all the partitions, and validate
+ // that there is no data remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionNotAssigned() {
+ buildFetcher();
+
+ // The partitions are spread across multiple nodes to ensure the
fetcher's logic correctly handles the
+ // partition-to-node mapping.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // Get all the nodes serving as the leader for these partitions.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+ // Extract the nodes and their respective set of partitions to make
things easier to keep track of later.
+ assertEquals(2, nodes.size());
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+ assertEquals(2, node0Partitions.size());
+ assertEquals(2, node1Partitions.size());
+ TopicPartition node0Partition1 = node0Partitions.get(0);
+ TopicPartition node0Partition2 = node0Partitions.get(1);
+ TopicPartition node1Partition1 = node1Partitions.get(0);
+ TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Collect node0Partition1 so that it doesn't have anything in the
fetch buffer.
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Exclude node0Partition2 (the remaining buffered partition for node
0) when updating the assigned partitions
+ // to cause it to become unassigned.
+ subscriptions.assignFromUser(Set.of(
+ node0Partition1,
+ // node0Partition2, // Intentionally omit this partition
so that it is unassigned
+ node1Partition1,
+ node1Partition2
+ ));
+
+ // node0Partition1 (the collected partition) should have a retrievable
position, but the node0Partition2
Review Comment:
the node0Partition2 => node0Partition2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]