This is an automated email from the ASF dual-hosted git repository. wcarlson pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a89b6489933 KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java (#14916) a89b6489933 is described below commit a89b6489933fc0d492bc7f1e2a8584c4727709a0 Author: Mayank Shekhar Narula <42991652+msn-t...@users.noreply.github.com> AuthorDate: Wed Dec 6 14:52:36 2023 +0000 KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java (#14916) Copy the tests that an earlier PR added to FetcherTest.java over to FetchRequestManagerTest.java, so that the KIP-951 changes are also covered for the new consumer, which is being developed as described here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+project+overview Reviewers: Kirk True <kt...@confluent.io>, Walker Carlson <wcarl...@apache.org>, Andrew Schofield <aschofi...@confluent.io> --- .../internals/FetchRequestManagerTest.java | 208 ++++++++++++++++++++- .../clients/consumer/internals/FetcherTest.java | 79 ++++---- 2 files changed, 246 insertions(+), 41 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index e98ec1a3aad..e080f63c1a8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -94,6 +94,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.slf4j.Logger; @@ -3126,6 +3127,205 @@ public class FetchRequestManagerTest { assertThrows(KafkaException.class, this::fetchRecords); } + + /** + * Test 
the scenario that FetchResponse returns with an error indicating leadership change for the partition, but it + * does not contain new leader info (defined in KIP-951). + */ + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"}) + public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) { + // The test runs with 2 partitions where 1 partition is fetched without errors, and + // 2nd partition faces errors due to leadership changes. + buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), + new BytesDeserializer(), + Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, + Duration.ofMinutes(5).toMillis()); + + // Setup so that tp0 & tp1 are subscribed and will be fetched from. + // Also, setup client's metadata for tp0 & tp1. + subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), + tp -> validLeaderEpoch, topicIds, false)); + Node tp0Leader = metadata.fetch().leaderFor(tp0); + Node tp1Leader = metadata.fetch().leaderFor(tp1); + Node nodeId0 = metadata.fetch().nodeById(0); + Cluster startingClusterMetadata = metadata.fetch(); + subscriptions.seek(tp0, 0); + subscriptions.seek(tp1, 0); + + // Set up node 0 as the preferred read replica by doing a fetch for both partitions. 
+ assertEquals(2, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp0Leader); + client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, Errors.NONE, 100L, + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader); + networkClientDelegate.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + // Validate setup of preferred read replica for tp0 & tp1 is done correctly. + Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); + assertEquals(nodeId0.id(), selected.id()); + selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds()); + assertEquals(nodeId0.id(), selected.id()); + + // Send next fetch request. + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + // Verify that metadata-update isn't requested as metadata is considered up-to-date. + assertFalse(metadata.updateRequested()); + + // Test that the next fetch returns an error (due to leadership change) but new leader info is not returned + // in the FetchResponse. This is the behaviour prior to KIP-951, which should keep working. 
+ LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>(); + partitions.put(tidp0, + new FetchResponseData.PartitionData() + .setPartitionIndex(tidp0.topicPartition().partition()) + .setErrorCode(error.code())); + partitions.put(tidp1, + new FetchResponseData.PartitionData() + .setPartitionIndex(tidp1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setHighWatermark(100L) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(0) + .setRecords(nextRecords)); + client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0); + networkClientDelegate.poll(time.timer(0)); + partitionRecords = fetchRecords(); + assertFalse(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + // Validate metadata is unchanged, as FetchResponse didn't have new leader information. + assertEquals(startingClusterMetadata, metadata.fetch()); + + // Validate metadata-update is requested due to the leadership-error on tp0. + assertTrue(metadata.updateRequested()); + + // Validate preferred-read-replica is cleared for tp0 due to the error. + assertEquals(Optional.empty(), + subscriptions.preferredReadReplica(tp0, time.milliseconds())); + // Validate preferred-read-replica is still set for tp1 as previous fetch for it was ok. + assertEquals(Optional.of(nodeId0.id()), + subscriptions.preferredReadReplica(tp1, time.milliseconds())); + + // Validate subscription is still valid & fetch-able for both tp0 & tp1. And tp0 points to original leader. 
+ assertTrue(subscriptions.isFetchable(tp0)); + Metadata.LeaderAndEpoch currentLeader = subscriptions.position(tp0).currentLeader; + assertEquals(tp0Leader.id(), currentLeader.leader.get().id()); + assertEquals(validLeaderEpoch, currentLeader.epoch.get()); + assertTrue(subscriptions.isFetchable(tp1)); + } + + /** + * Test the scenario that FetchResponse returns with an error indicating leadership change for the partition, along with + * new leader info (defined in KIP-951). + */ + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"}) + public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInformation(Errors error) { + // The test runs with 2 partitions where 1 partition is fetched without errors, and + // 2nd partition faces errors due to leadership changes. + buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), + new BytesDeserializer(), + Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, + Duration.ofMinutes(5).toMillis()); + + // Setup so that tp0 & tp1 are subscribed and will be fetched from. + // Also, setup client's metadata for tp0 & tp1. + subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), + tp -> validLeaderEpoch, topicIds, false)); + Node tp0Leader = metadata.fetch().leaderFor(tp0); + Node tp1Leader = metadata.fetch().leaderFor(tp1); + Node nodeId0 = metadata.fetch().nodeById(0); + Cluster startingClusterMetadata = metadata.fetch(); + subscriptions.seek(tp0, 0); + subscriptions.seek(tp1, 0); + + // Set up node 0 as the preferred read replica by doing a fetch for both partitions. 
+ assertEquals(2, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp0Leader); + client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, Errors.NONE, 100L, + FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader); + networkClientDelegate.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + // Validate setup of preferred read replica for tp0 & tp1 is done correctly. + Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); + assertEquals(nodeId0.id(), selected.id()); + selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds()); + assertEquals(nodeId0.id(), selected.id()); + + // Send next fetch request. + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + // Validate metadata-update isn't requested as no errors seen yet. + assertFalse(metadata.updateRequested()); + + // Test that next fetch returns an error(due to leadership change) and new leader info is returned, as introduced + // in KIP-951. The new leader is a new node, id = 999. For tp1 fetch returns with no error. 
+ LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>(); + Node newNode = new Node(999, "newnode", 999, "newrack"); + FetchResponseData.PartitionData tp0Data = new FetchResponseData.PartitionData() + .setPartitionIndex(tidp0.topicPartition().partition()) + .setErrorCode(error.code()); + tp0Data.currentLeader().setLeaderId(newNode.id()); + int tp0NewLeaderEpoch = validLeaderEpoch + 100; + tp0Data.currentLeader().setLeaderEpoch(tp0NewLeaderEpoch); + partitions.put(tidp0, tp0Data); + partitions.put(tidp1, + new FetchResponseData.PartitionData() + .setPartitionIndex(tidp1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setHighWatermark(100L) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(0) + .setRecords(nextRecords)); + client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions, singletonList(newNode)), nodeId0); + networkClientDelegate.poll(time.timer(0)); + partitionRecords = fetchRecords(); + assertFalse(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + // Validate metadata is changed, as previous FetchResponse had new leader info for tp0. + assertNotEquals(startingClusterMetadata, metadata.fetch()); + // Validate new-node(id=999) is part of the metadata + assertEquals(newNode, metadata.fetch().nodeById(999)); + // Validate metadata returns the new leader info for tp0. + Metadata.LeaderAndEpoch currentLeaderTp0 = metadata.currentLeader(tp0); + assertEquals(Optional.of(newNode), currentLeaderTp0.leader); + assertEquals(Optional.of(tp0NewLeaderEpoch), currentLeaderTp0.epoch); + + // Validate metadata-update is requested due to the leadership-error for tp0. + assertTrue(metadata.updateRequested()); + + // Validate preferred-read-replica is cleared for tp0 due to the error. 
+ assertEquals(Optional.empty(), + subscriptions.preferredReadReplica(tp0, time.milliseconds())); + // Validate preferred-read-replica is still set for tp1 as previous fetch is ok. + assertEquals(Optional.of(nodeId0.id()), + subscriptions.preferredReadReplica(tp1, time.milliseconds())); + + // Validate subscription is valid & fetch-able, and points to the new leader. + assertTrue(subscriptions.isFetchable(tp0)); + Metadata.LeaderAndEpoch currentLeader = subscriptions.position(tp0).currentLeader; + assertEquals(newNode.id(), currentLeader.leader.get().id()); + assertEquals(tp0NewLeaderEpoch, currentLeader.epoch.get()); + + // Validate subscription is still valid & fetch-able for tp1. + assertTrue(subscriptions.isFetchable(tp1)); + } + private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( TopicPartition topicPartition, Errors error, @@ -3328,7 +3528,8 @@ public class FetchRequestManagerTest { new FetchBuffer(logContext), metricsManager, networkClientDelegate, - fetchCollector)); + fetchCollector, + apiVersions)); ConsumerNetworkClient consumerNetworkClient = new ConsumerNetworkClient( logContext, client, @@ -3386,8 +3587,9 @@ public class FetchRequestManagerTest { FetchBuffer fetchBuffer, FetchMetricsManager metricsManager, NetworkClientDelegate networkClientDelegate, - FetchCollector<K, V> fetchCollector) { - super(logContext, time, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, networkClientDelegate, null); + FetchCollector<K, V> fetchCollector, + ApiVersions apiVersions) { + super(logContext, time, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, networkClientDelegate, apiVersions); this.fetchCollector = fetchCollector; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index e6f5689a001..44d21f7c8d1 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -144,6 +144,11 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +/** + * If you are adding a test here, do evaluate if a similar test needs to be added in + * FetchRequestManagerTest.java, which captures the tests for the new consumer as part of the + * https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+project+overview + */ public class FetcherTest { private static final double EPSILON = 0.0001; @@ -3466,16 +3471,16 @@ public class FetcherTest { */ @ParameterizedTest @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"}) - public void testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) { - // The test runs with 2 partitions where 1 partition is fetched without errors, and 2nd partitions faces errors due to leadership changes. - + public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) { + // The test runs with 2 partitions where 1 partition is fetched without errors, and + // 2nd partition faces errors due to leadership changes. buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Duration.ofMinutes(5).toMillis()); - // Setup so that fetcher is subscribed to tp0 & tp1, and metadata setup with tp0. - // assignFromUser(singleton(tp0)); + // Setup so that tp0 & tp1 are subscribed and will be fetched from. + // Also, setup client's metadata for tp0 & tp1. 
subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), @@ -3496,53 +3501,52 @@ public class FetcherTest { FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); - Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); assertTrue(partitionRecords.containsKey(tp1)); - - // Verify that preferred read replica is set for both tp0 & tp1. + // Validate setup of preferred read replica for tp0 & tp1 is done correctly. Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); assertEquals(nodeId0.id(), selected.id()); selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds()); assertEquals(nodeId0.id(), selected.id()); - // Next fetch returns an error(due to leadership change) but new leader info is missing, for tp0. - // For tp1 fetch returns with no error. + // Send next fetch request. assertEquals(1, sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - // Verify that metadata-update isn't requested + // Verify that metadata-update isn't requested as metadata is considered up-to-date. assertFalse(metadata.updateRequested()); + // Test that the next fetch returns an error (due to leadership change) but new leader info is not returned + // in the FetchResponse. This is the behaviour prior to KIP-951, which should keep working. 
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>(); partitions.put(tidp0, - new FetchResponseData.PartitionData() - .setPartitionIndex(tidp0.topicPartition().partition()) - .setErrorCode(error.code())); + new FetchResponseData.PartitionData() + .setPartitionIndex(tidp0.topicPartition().partition()) + .setErrorCode(error.code())); partitions.put(tidp1, - new FetchResponseData.PartitionData() - .setPartitionIndex(tidp1.topicPartition().partition()) - .setErrorCode(Errors.NONE.code()) - .setHighWatermark(100L) - .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) - .setLogStartOffset(0) - .setRecords(nextRecords)); + new FetchResponseData.PartitionData() + .setPartitionIndex(tidp1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setHighWatermark(100L) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(0) + .setRecords(nextRecords)); client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0); consumerClient.poll(time.timer(0)); partitionRecords = fetchRecords(); assertFalse(partitionRecords.containsKey(tp0)); assertTrue(partitionRecords.containsKey(tp1)); - // Validate metadata is unchanged, as response has missing new leader info. + // Validate metadata is unchanged, as FetchResponse didn't have new leader information. assertEquals(startingClusterMetadata, metadata.fetch()); - // Validate metadata-update is requested due to the error for tp0. + // Validate metadata-update is requested due to the leadership-error on tp0. assertTrue(metadata.updateRequested()); // Validate preferred-read-replica is cleared for tp0 due to the error. assertEquals(Optional.empty(), subscriptions.preferredReadReplica(tp0, time.milliseconds())); - // Validate preferred-read-replica is still set for tp1 + // Validate preferred-read-replica is still set for tp1 as previous fetch for it was ok. 
assertEquals(Optional.of(nodeId0.id()), subscriptions.preferredReadReplica(tp1, time.milliseconds())); @@ -3560,16 +3564,16 @@ public class FetcherTest { */ @ParameterizedTest @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"}) - public void testWhenFetchResponseReturnsWithALeaderShipChangeErrorAndNewLeaderInformation(Errors error) { - // The test runs with 2 partitions where 1 partition is fetched without errors, and 2nd partitions faces errors due to leadership changes. - + public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInformation(Errors error) { + // The test runs with 2 partitions where 1 partition is fetched without errors, and + // 2nd partition faces errors due to leadership changes. buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Duration.ofMinutes(5).toMillis()); - // Setup so that fetcher is subscribed to tp0 & tp1, and metadata setup with tp0. - // assignFromUser(singleton(tp0)); + // Setup so that tp0 & tp1 are subscribed and will be fetched from. + // Also, setup client's metadata for tp0 & tp1. subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), @@ -3590,24 +3594,23 @@ public class FetcherTest { FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); - Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); assertTrue(partitionRecords.containsKey(tp1)); - - // Verify that preferred read replica is set for both tp0 & tp1. + // Validate setup of preferred read replica for tp0 & tp1 is done correctly. 
Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); assertEquals(nodeId0.id(), selected.id()); selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds()); assertEquals(nodeId0.id(), selected.id()); - // Next fetch returns an error(due to leadership change) and new leader info is returned. The new leader is a new node, id = 999. - // For tp1 fetch returns with no error. + // Send next fetch request. assertEquals(1, sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - // Verify that metadata-update isn't requested + // Validate metadata-update isn't requested as no errors seen yet. assertFalse(metadata.updateRequested()); + // Test that next fetch returns an error(due to leadership change) and new leader info is returned, as introduced + // in KIP-951. The new leader is a new node, id = 999. For tp1 fetch returns with no error. LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>(); Node newNode = new Node(999, "newnode", 999, "newrack"); FetchResponseData.PartitionData tp0Data = new FetchResponseData.PartitionData() @@ -3640,13 +3643,13 @@ public class FetcherTest { assertEquals(Optional.of(newNode), currentLeaderTp0.leader); assertEquals(Optional.of(tp0NewLeaderEpoch), currentLeaderTp0.epoch); - // Validate metadata-update is requested due to the error for tp0. + // Validate metadata-update is requested due to the leadership-error for tp0. assertTrue(metadata.updateRequested()); - // Validate preferred-read-replica is cleared for tp0. + // Validate preferred-read-replica is cleared for tp0 due to the error. assertEquals(Optional.empty(), subscriptions.preferredReadReplica(tp0, time.milliseconds())); - // Validate preferred-read-replica is still set for tp1 + // Validate preferred-read-replica is still set for tp1 as previous fetch is ok. assertEquals(Optional.of(nodeId0.id()), subscriptions.preferredReadReplica(tp1, time.milliseconds()));