junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1931115871
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,330 @@ public void testPollWithRedundantCreateFetchRequests()
{
}
- private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
- TopicPartition topicPartition,
- Errors error,
- int leaderEpoch,
- long endOffset
- ) {
- OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
- data.topics().add(new OffsetForLeaderTopicResult()
- .setTopic(topicPartition.topic())
- .setPartitions(Collections.singletonList(new EpochEndOffset()
- .setPartition(topicPartition.partition())
- .setErrorCode(error.code())
- .setLeaderEpoch(leaderEpoch)
- .setEndOffset(endOffset))));
- return new OffsetsForLeaderEpochResponse(data);
+ /**
+ * This test makes several calls to {@link #sendFetches()}, and after
each, the buffered partitions are
+ * modified to either cause (or prevent) a fetch from being requested.
+ */
+ @Test
+ public void testFetchRequestWithBufferedPartitions() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #2 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #3 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 0's partitions have all been collected, so validate that and
then reset the list of partitions
+ // from which to fetch data so the next pass should request can fetch
more data.
+ assertTrue(node0Partitions.isEmpty());
+ node0Partitions = partitionsForNode(node0, partitions);
+
+ // sendFetches() call #4 should issue a request to node 0 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 1's partitions have likewise all been collected, so validate
that and reset.
+ assertTrue(node1Partitions.isEmpty());
+ node1Partitions = partitionsForNode(node1, partitions);
+
+ // sendFetches() call #5 should issue a request to node 1 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node1, node1Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Collect all the records and make sure they include all the
partitions, and validate that there is no data
+ // remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #6 should issue a request to nodes 0 and 1 since
its buffered data was collected.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 20);
+ prepareFetchResponses(node1, node1Partitions, 20);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Just for completeness, collect all the records and make sure they
include all the partitions, and validate
+ // that there is no data remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionNotAssigned() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Grab both partitions for node 0. The first partition will be
collected so that it doesn't have anything
+ // in the fetch buffer. The second node will be left in the buffer,
but will be unassigned in a bit.
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Change the set of assigned partitions to exclude the remaining
buffered partition for node 0, which means
+ // that partition is unassigned.
+ Set<TopicPartition> topicsWithoutUnassignedPartition = new HashSet<>(partitions);
+ topicsWithoutUnassignedPartition.remove(node0Partition2);
Review Comment:
Could we just initialize topicsWithoutUnassignedPartition with node0Partition2?
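
For illustration only, a minimal sketch of one way to read the suggestion: build the assignment in a single step that leaves out node0Partition2, rather than copying `partitions` and then removing it. This assumes a `java.util.stream.Collectors` import and reuses the test's local variable names.

```java
// Hypothetical alternative (not the PR's code): derive the assignment directly,
// excluding the partition that should become unassigned.
Set<TopicPartition> topicsWithoutUnassignedPartition = partitions.stream()
        .filter(tp -> !tp.equals(node0Partition2))
        .collect(Collectors.toSet());
subscriptions.assignFromUser(topicsWithoutUnassignedPartition);
```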
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,330 @@ public void testPollWithRedundantCreateFetchRequests()
{
}
- private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
- TopicPartition topicPartition,
- Errors error,
- int leaderEpoch,
- long endOffset
- ) {
- OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
- data.topics().add(new OffsetForLeaderTopicResult()
- .setTopic(topicPartition.topic())
- .setPartitions(Collections.singletonList(new EpochEndOffset()
- .setPartition(topicPartition.partition())
- .setErrorCode(error.code())
- .setLeaderEpoch(leaderEpoch)
- .setEndOffset(endOffset))));
- return new OffsetsForLeaderEpochResponse(data);
+ /**
+ * This test makes several calls to {@link #sendFetches()}, and after
each, the buffered partitions are
+ * modified to either cause (or prevent) a fetch from being requested.
+ */
+ @Test
+ public void testFetchRequestWithBufferedPartitions() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #2 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #3 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 0's partitions have all been collected, so validate that and then reset the list of partitions
+ // from which to fetch data so the next pass should request can fetch more data.
Review Comment:
`so the next pass should request can fetch more data.` doesn't read well.
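
Purely as a suggestion, one possible rewording of that in-test comment:

```java
// Node 0's partitions have all been collected, so validate that, then reset the list of
// partitions so that the next fetch request can retrieve more data.
```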
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,318 @@ public void testPollWithRedundantCreateFetchRequests()
{
}
- private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
- TopicPartition topicPartition,
- Errors error,
- int leaderEpoch,
- long endOffset
- ) {
- OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
- data.topics().add(new OffsetForLeaderTopicResult()
- .setTopic(topicPartition.topic())
- .setPartitions(Collections.singletonList(new EpochEndOffset()
- .setPartition(topicPartition.partition())
- .setErrorCode(error.code())
- .setLeaderEpoch(leaderEpoch)
- .setEndOffset(endOffset))));
- return new OffsetsForLeaderEpochResponse(data);
+ /**
+ * This test makes several calls to {@link #sendFetches()}, and after
each, the buffered partitions are
+ * modified to either cause (or prevent) a fetch from being requested.
+ */
+ @Test
+ public void testFetchRequestWithBufferedPartitions() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #2 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #3 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 0's partitions have all been collected, so validate that and
then reset the list of partitions
+ // from which to fetch data so the next pass should request can fetch
more data.
+ assertTrue(node0Partitions.isEmpty());
+ node0Partitions = partitionsForNode(node0, partitions);
+
+ // sendFetches() call #4 should issue a request to node 0 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 1's partitions have likewise all been collected, so validate
that and reset.
+ assertTrue(node1Partitions.isEmpty());
+ node1Partitions = partitionsForNode(node1, partitions);
+
+ // sendFetches() call #5 should issue a request to node 1 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node1, node1Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Collect all the records and make sure they include all the
partitions, and validate that there is no data
+ // remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #6 should issue a request to nodes 0 and 1 since
its buffered data was collected.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 20);
+ prepareFetchResponses(node1, node1Partitions, 20);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Just for completeness, collect all the records and make sure they
include all the partitions, and validate
+ // that there is no data remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionNotAssigned() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Grab both partitions for node 0. The first partition will be
collected so that it doesn't have anything
+ // in the fetch buffer. The second node will be left in the buffer,
but will be unassigned in a bit.
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Change the set of assigned partitions to exclude the remaining
buffered partition for node 0, which means
+ // that partition is unassigned.
+ Set<TopicPartition> topicsWithoutUnassignedPartition = new
HashSet<>(partitions);
+ topicsWithoutUnassignedPartition.remove(node0Partition2);
+ subscriptions.assignFromUser(topicsWithoutUnassignedPartition);
+
+ // The collected partition should have a retrievable position, but the
unassigned position should throw
+ // an error when attempting to retrieve its position.
+ assertDoesNotThrow(() -> subscriptions.position(node0Partition1));
+ assertThrows(IllegalStateException.class, () ->
subscriptions.position(node0Partition2));
+
+ // sendFetches() call #2 should issue a request to node 0 because the
first partition in node 0 was collected
+ // (and its buffer removed) and the second partition for node 0 was
unassigned. As a result, there are now no
+ // *assigned* partitions for node 0 that are buffered.
+ assertEquals(1, sendFetches());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionMissingLeader() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Grab both partitions for node 0. The first partition will be
collected so that it doesn't have anything
+ // in the fetch buffer. The second node will be left in the buffer,
but will clear out its position's leader
+ // node.
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Overwrite the position with an empty leader to trigger the test
case.
+ SubscriptionState.FetchPosition leaderlessPosition = new
SubscriptionState.FetchPosition(
+ 0,
+ Optional.empty(),
+ Metadata.LeaderAndEpoch.noLeaderOrEpoch()
+ );
+ subscriptions.position(node0Partition2, leaderlessPosition);
+
+ // Both the collected partition and the position without a partition
leader should have a retrievable position.
+ // Confirm that position() doesn't throw an exception and that the
leader for the second partition is missing.
+ assertNotEquals(Optional.empty(),
subscriptions.position(node0Partition1).currentLeader.leader);
+ assertEquals(Optional.empty(),
subscriptions.position(node0Partition2).currentLeader.leader);
+
+ // sendFetches() call #2 should issue a fetch request to node 0
because its first partition was collected
+ // (and its buffer removed) and the second partition had its leader
node cleared. As a result, there are now
+ // effectively no topic partitions for node 0.
+ assertEquals(1, sendFetches());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionMissingPosition() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Grab both partitions for node 0. The first partition will be
collected so that it doesn't have anything
+ // in the fetch buffer. The second node will be left in the buffer,
but will clear out its position's leader
+ // node.
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Overwrite the position with an empty leader to trigger the test
case.
+ subscriptions.position(node0Partition2, null);
+
+ // Both the collected partition and the position without a partition
leader should have a retrievable position.
+ // Confirm that position() doesn't throw an exception and that the
leader for the second partition is missing.
+ assertNotEquals(Optional.empty(),
subscriptions.position(node0Partition1).currentLeader.leader);
+ assertNull(subscriptions.position(node0Partition2));
+
+ // sendFetches() call #2 will now fail to send any requests as we have
an invalid position in the assignment.
+ // The Consumer.poll() API will throw an IllegalStateException to the
user.
+ Future<Void> future = fetcher.createFetchRequests();
+ assertEquals(0, sendFetches());
+ assertFutureThrows(future, IllegalStateException.class);
Review Comment:
Thanks for the explanation. Sounds good.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,330 @@ public void testPollWithRedundantCreateFetchRequests()
{
}
- private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
- TopicPartition topicPartition,
- Errors error,
- int leaderEpoch,
- long endOffset
- ) {
- OffsetForLeaderEpochResponseData data = new
OffsetForLeaderEpochResponseData();
- data.topics().add(new OffsetForLeaderTopicResult()
- .setTopic(topicPartition.topic())
- .setPartitions(Collections.singletonList(new EpochEndOffset()
- .setPartition(topicPartition.partition())
- .setErrorCode(error.code())
- .setLeaderEpoch(leaderEpoch)
- .setEndOffset(endOffset))));
- return new OffsetsForLeaderEpochResponse(data);
+ /**
+ * This test makes several calls to {@link #sendFetches()}, and after
each, the buffered partitions are
+ * modified to either cause (or prevent) a fetch from being requested.
+ */
+ @Test
+ public void testFetchRequestWithBufferedPartitions() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #2 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #3 shouldn't issue requests to either node 0 or
node 1 since they both have buffered data.
+ assertEquals(0, sendFetches());
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node0Partitions.remove(0), partitions);
+ assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 0's partitions have all been collected, so validate that and
then reset the list of partitions
+ // from which to fetch data so the next pass should request can fetch
more data.
+ assertTrue(node0Partitions.isEmpty());
+ node0Partitions = partitionsForNode(node0, partitions);
+
+ // sendFetches() call #4 should issue a request to node 0 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ collectSelectedPartition(node1Partitions.remove(0), partitions);
+ assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Node 1's partitions have likewise all been collected, so validate
that and reset.
+ assertTrue(node1Partitions.isEmpty());
+ node1Partitions = partitionsForNode(node1, partitions);
+
+ // sendFetches() call #5 should issue a request to node 1 since its
buffered data was collected.
+ assertEquals(1, sendFetches());
+ prepareFetchResponses(node1, node1Partitions, 10);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Collect all the records and make sure they include all the
partitions, and validate that there is no data
+ // remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // sendFetches() call #6 should issue a request to nodes 0 and 1 since
its buffered data was collected.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 20);
+ prepareFetchResponses(node1, node1Partitions, 20);
+ networkClientDelegate.poll(time.timer(0));
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Just for completeness, collect all the records and make sure they
include all the partitions, and validate
+ // that there is no data remaining in the fetch buffer.
+ assertEquals(partitions, fetchRecords().keySet());
+ assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionNotAssigned() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Grab both partitions for node 0. The first partition will be
collected so that it doesn't have anything
+ // in the fetch buffer. The second node will be left in the buffer,
but will be unassigned in a bit.
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Change the set of assigned partitions to exclude the remaining
buffered partition for node 0, which means
+ // that partition is unassigned.
+ Set<TopicPartition> topicsWithoutUnassignedPartition = new
HashSet<>(partitions);
+ topicsWithoutUnassignedPartition.remove(node0Partition2);
+ subscriptions.assignFromUser(topicsWithoutUnassignedPartition);
+
+ // The collected partition should have a retrievable position, but the
unassigned position should throw
+ // an error when attempting to retrieve its position.
+ assertDoesNotThrow(() -> subscriptions.position(node0Partition1));
+ assertThrows(IllegalStateException.class, () ->
subscriptions.position(node0Partition2));
+
+ // sendFetches() call #2 should issue a request to node 0 because the
first partition in node 0 was collected
+ // (and its buffer removed) and the second partition for node 0 was
unassigned. As a result, there are now no
+ // *assigned* partitions for node 0 that are buffered.
+ assertEquals(1, sendFetches());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionMissingLeader() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then pull out the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 and node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ // Per the fetch response, data for both of node 0's partitions are in
the fetch buffer.
+
assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(node0Partition1));
+
assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(node0Partition2));
+
+ // Collect node 0's first partition (node0Partition1) which will
remove it from the fetch buffer.
+ collectSelectedPartition(node0Partition1, partitions);
+
+ // Node 0's first partition (node0Partition1) was collected so it's
not in the fetch buffer, but node 0's
+ // second partition (node0Partition2) remains in the fetch buffer.
+
assertFalse(fetcher.fetchBuffer.bufferedPartitions().contains(node0Partition1));
+
assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(node0Partition2));
+
+ // Overwrite node0Partition2's position with an empty leader, but
verify that it is still buffered. Having
+ // a leaderless, buffered partition is key to triggering the test case.
+ subscriptions.position(node0Partition2, new
SubscriptionState.FetchPosition(
+ 0,
+ Optional.empty(),
+ Metadata.LeaderAndEpoch.noLeaderOrEpoch()
+ ));
+
assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(node0Partition2));
+
+ // Validate the state of the collected partition (node0Partition1) and
leaderless partition (node0Partition2)
+ // before sending the fetch requests.
+ Optional<Node> node0Partition1Leader =
subscriptions.position(node0Partition1).currentLeader.leader;
+ Optional<Node> node0Partition2Leader =
subscriptions.position(node0Partition2).currentLeader.leader;
+ assertTrue(node0Partition1Leader.isPresent());
+ assertEquals(node0, node0Partition1Leader.get());
+ assertFalse(node0Partition2Leader.isPresent());
+
+ // sendFetches() call #2 should issue a fetch request to node 0
because it has no buffered partitions:
+ //
+ // - node0Partition1 was collected and thus not in the fetch buffer
+ // - node0Partition2, while still in the fetch buffer, is leaderless
+ //
+ // As a result, there are now effectively no buffered partitions for
which node 0 is the leader.
+ assertEquals(1, sendFetches());
+ }
+
+ @Test
+ public void testFetchRequestWithBufferedPartitionMissingPosition() {
+ buildFetcher();
+
+ // The test requires that there are multiple nodes as the fetch
request logic is based in part off of the
+ // partition-to-node relationship.
+ int numNodes = 2;
+ Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+ assignFromUser(partitions, numNodes);
+
+ // Get all the nodes serving as the leader for these partitions, and
then split them into the separate
+ // nodes and set of partitions to make things easier to keep track of
later.
+ List<Node> nodes = nodesForPartitionLeaders(partitions);
+ Node node0 = nodes.get(0);
+ Node node1 = nodes.get(1);
+ List<TopicPartition> node0Partitions = partitionsForNode(node0,
partitions);
+ List<TopicPartition> node1Partitions = partitionsForNode(node1,
partitions);
+
+ // Seek each partition so that it becomes eligible to fetch.
+ partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+ // sendFetches() call #1 should issue requests to node 0 or node 1
since neither has buffered data.
+ assertEquals(2, sendFetches());
+ prepareFetchResponses(node0, node0Partitions, 0);
+ prepareFetchResponses(node1, node1Partitions, 0);
+ networkClientDelegate.poll(time.timer(0));
+
+ // Grab both partitions for node 0. The first partition will be
collected so that it doesn't have anything
+ // in the fetch buffer. The second node will be left in the buffer,
but will clear out its position's leader
+ // node.
+ TopicPartition node0Partition1 = node0Partitions.remove(0);
+ TopicPartition node0Partition2 = node0Partitions.remove(0);
+
+ assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+ collectSelectedPartition(node0Partition1, partitions);
+ assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+ // Overwrite the position with an empty leader to trigger the test
case.
+ subscriptions.position(node0Partition2, null);
+
+ // Both the collected partition and the position without a partition leader should have a retrievable position.
Review Comment:
This still doesn't read well. Also, the second partition's position is not available.
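
Purely as a suggestion, a rewording of that in-test comment that also reflects that the second partition's position is no longer available:

```java
// The collected partition (node0Partition1) should still have a retrievable position with a
// leader, while node0Partition2's position was overwritten with null and is no longer available.
```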