This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a89b6489933 KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java (#14916)
a89b6489933 is described below

commit a89b6489933fc0d492bc7f1e2a8584c4727709a0
Author: Mayank Shekhar Narula <42991652+msn-t...@users.noreply.github.com>
AuthorDate: Wed Dec 6 14:52:36 2023 +0000

    KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java (#14916)
    
    Copying over the tests added to FetcherTest.java in the PR to FetchRequestManagerTest.java, so that the KIP-951 changes have coverage with the new consumer being worked on, as described here:
    https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+project+overview
    
    Reviewers: Kirk True <kt...@confluent.io>, Walker Carlson <wcarl...@apache.org>, Andrew Schofield <aschofi...@confluent.io>
---
 .../internals/FetchRequestManagerTest.java         | 208 ++++++++++++++++++++-
 .../clients/consumer/internals/FetcherTest.java    |  79 ++++----
 2 files changed, 246 insertions(+), 41 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index e98ec1a3aad..e080f63c1a8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -94,6 +94,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
@@ -3126,6 +3127,205 @@ public class FetchRequestManagerTest {
         assertThrows(KafkaException.class, this::fetchRecords);
     }
 
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating leadership change for the partition, but it
+     * does not contain new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
+    public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) {
+        // The test runs with 2 partitions where 1 partition is fetched without errors, and
+        // 2nd partition faces errors due to leadership changes.
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
+            new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+            Duration.ofMinutes(5).toMillis());
+
+        // Setup so that tp0 & tp1 are subscribed and will be fetched from.
+        // Also, setup client's metadata for tp0 & tp1.
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=1 by doing a fetch for both partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+        // Validate setup of preferred read replica for tp0 & tp1 is done correctly.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Send next fetch request.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Verify that metadata-update isn't requested as metadata is considered upto-date.
+        assertFalse(metadata.updateRequested());
+
+        // TEST that next fetch returns an error(due to leadership change) but new leader info is not returned
+        // in the FetchResponse. This is the behaviour prior to KIP-951, should keep on working.
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
+        partitions.put(tidp0,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp0.topicPartition().partition())
+                .setErrorCode(error.code()));
+        partitions.put(tidp1,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp1.topicPartition().partition())
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100L)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+        partitionRecords = fetchRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Validate metadata is unchanged, as FetchResponse didn't have new leader information.
+        assertEquals(startingClusterMetadata, metadata.fetch());
+
+        // Validate metadata-update is requested due to the leadership-error on tp0.
+        assertTrue(metadata.updateRequested());
+
+        // Validate preferred-read-replica is cleared for tp0 due to the error.
+        assertEquals(Optional.empty(),
+            subscriptions.preferredReadReplica(tp0, time.milliseconds()));
+        // Validate preferred-read-replica is still set for tp1 as previous fetch for it was ok.
+        assertEquals(Optional.of(nodeId0.id()),
+            subscriptions.preferredReadReplica(tp1, time.milliseconds()));
+
+        // Validate subscription is still valid & fetch-able for both tp0 & tp1. And tp0 points to original leader.
+        assertTrue(subscriptions.isFetchable(tp0));
+        Metadata.LeaderAndEpoch currentLeader = subscriptions.position(tp0).currentLeader;
+        assertEquals(tp0Leader.id(), currentLeader.leader.get().id());
+        assertEquals(validLeaderEpoch, currentLeader.epoch.get());
+        assertTrue(subscriptions.isFetchable(tp1));
+    }
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating leadership change for the partition, along with
+     * new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
+    public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInformation(Errors error) {
+        // The test runs with 2 partitions where 1 partition is fetched without errors, and
+        // 2nd partition faces errors due to leadership changes.
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
+            new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+            Duration.ofMinutes(5).toMillis());
+
+        // Setup so that tp0 & tp1 are subscribed and will be fetched from.
+        // Also, setup client's metadata for tp0 & tp1.
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=1 by doing a fetch for both partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+        // Validate setup of preferred read replica for tp0 & tp1 is done correctly.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Send next fetch request.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Validate metadata-update isn't requested as no errors seen yet.
+        assertFalse(metadata.updateRequested());
+
+        // Test that next fetch returns an error(due to leadership change) and new leader info is returned, as introduced
+        // in KIP-951. The new leader is a new node, id = 999. For tp1 fetch returns with no error.
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
+        Node newNode = new Node(999, "newnode", 999, "newrack");
+        FetchResponseData.PartitionData tp0Data = new FetchResponseData.PartitionData()
+            .setPartitionIndex(tidp0.topicPartition().partition())
+            .setErrorCode(error.code());
+        tp0Data.currentLeader().setLeaderId(newNode.id());
+        int tp0NewLeaderEpoch = validLeaderEpoch + 100;
+        tp0Data.currentLeader().setLeaderEpoch(tp0NewLeaderEpoch);
+        partitions.put(tidp0, tp0Data);
+        partitions.put(tidp1,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp1.topicPartition().partition())
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100L)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions, singletonList(newNode)), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+        partitionRecords = fetchRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Validate metadata is changed, as previous FetchResponse had new leader info for tp0.
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+        // Validate new-node(id=999) is part of the metadata
+        assertEquals(newNode, metadata.fetch().nodeById(999));
+        // Validate metadata returns the new leader info for tp0.
+        Metadata.LeaderAndEpoch currentLeaderTp0 = metadata.currentLeader(tp0);
+        assertEquals(Optional.of(newNode), currentLeaderTp0.leader);
+        assertEquals(Optional.of(tp0NewLeaderEpoch), currentLeaderTp0.epoch);
+
+        // Validate metadata-update is requested due to the leadership-error for tp0.
+        assertTrue(metadata.updateRequested());
+
+        // Validate preferred-read-replica is cleared for tp0 due to the error.
+        assertEquals(Optional.empty(),
+            subscriptions.preferredReadReplica(tp0, time.milliseconds()));
+        // Validate preferred-read-replica is still set for tp1 as previous fetch is ok.
+        assertEquals(Optional.of(nodeId0.id()),
+            subscriptions.preferredReadReplica(tp1, time.milliseconds()));
+
+        // Validate subscription is valid & fetch-able, and points to the new leader.
+        assertTrue(subscriptions.isFetchable(tp0));
+        Metadata.LeaderAndEpoch currentLeader = subscriptions.position(tp0).currentLeader;
+        assertEquals(newNode.id(), currentLeader.leader.get().id());
+        assertEquals(tp0NewLeaderEpoch, currentLeader.epoch.get());
+
+        // Validate subscription is still valid & fetch-able for tp1.
+        assertTrue(subscriptions.isFetchable(tp1));
+    }
+
     private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
             TopicPartition topicPartition,
             Errors error,
@@ -3328,7 +3528,8 @@ public class FetchRequestManagerTest {
                 new FetchBuffer(logContext),
                 metricsManager,
                 networkClientDelegate,
-                fetchCollector));
+                fetchCollector,
+                apiVersions));
         ConsumerNetworkClient consumerNetworkClient = new ConsumerNetworkClient(
                 logContext,
                 client,
@@ -3386,8 +3587,9 @@ public class FetchRequestManagerTest {
                                            FetchBuffer fetchBuffer,
                                            FetchMetricsManager metricsManager,
                                            NetworkClientDelegate networkClientDelegate,
-                                           FetchCollector<K, V> fetchCollector) {
-            super(logContext, time, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, networkClientDelegate, null);
+                                           FetchCollector<K, V> fetchCollector,
+                                           ApiVersions apiVersions) {
+            super(logContext, time, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, networkClientDelegate, apiVersions);
             this.fetchCollector = fetchCollector;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index e6f5689a001..44d21f7c8d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -144,6 +144,11 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * If you are adding a test here, do evaluate if a similar test needs to be added in
+ * FetchRequestManagerTest.java, which captures the tests for the new consumer as part of the
+ * https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+project+overview
+ */
 public class FetcherTest {
     private static final double EPSILON = 0.0001;
 
@@ -3466,16 +3471,16 @@ public class FetcherTest {
      */
     @ParameterizedTest
     @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
-    public void testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) {
-        // The test runs with 2 partitions where 1 partition is fetched without errors, and 2nd partitions faces errors due to leadership changes.
-
+    public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) {
+        // The test runs with 2 partitions where 1 partition is fetched without errors, and
+        // 2nd partition faces errors due to leadership changes.
         buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
             new BytesDeserializer(),
             Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
             Duration.ofMinutes(5).toMillis());
 
-        // Setup so that fetcher is subscribed to tp0 & tp1, and metadata setup with tp0.
-        // assignFromUser(singleton(tp0));
+        // Setup so that tp0 & tp1 are subscribed and will be fetched from.
+        // Also, setup client's metadata for tp0 & tp1.
         subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
         client.updateMetadata(
             RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
@@ -3496,53 +3501,52 @@ public class FetcherTest {
             FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
-
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
         assertTrue(partitionRecords.containsKey(tp0));
         assertTrue(partitionRecords.containsKey(tp1));
-
-        // Verify that preferred read replica is set for both tp0 & tp1.
+        // Validate setup of preferred read replica for tp0 & tp1 is done correctly.
         Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
         assertEquals(nodeId0.id(), selected.id());
         selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
         assertEquals(nodeId0.id(), selected.id());
 
-        // Next fetch returns an error(due to leadership change) but new leader info is missing, for tp0.
-        // For tp1 fetch returns with no error.
+        // Send next fetch request.
         assertEquals(1, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        // Verify that metadata-update isn't requested
+        // Verify that metadata-update isn't requested as metadata is considered upto-date.
         assertFalse(metadata.updateRequested());
 
+        // TEST that next fetch returns an error(due to leadership change) but new leader info is not returned
+        // in the FetchResponse. This is the behaviour prior to KIP-951, should keep on working.
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
         partitions.put(tidp0,
-                new FetchResponseData.PartitionData()
-                    .setPartitionIndex(tidp0.topicPartition().partition())
-                    .setErrorCode(error.code()));
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp0.topicPartition().partition())
+                .setErrorCode(error.code()));
         partitions.put(tidp1,
-                new FetchResponseData.PartitionData()
-                    .setPartitionIndex(tidp1.topicPartition().partition())
-                    .setErrorCode(Errors.NONE.code())
-                    .setHighWatermark(100L)
-                    .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                    .setLogStartOffset(0)
-                    .setRecords(nextRecords));
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp1.topicPartition().partition())
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100L)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
         client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0);
         consumerClient.poll(time.timer(0));
         partitionRecords = fetchRecords();
         assertFalse(partitionRecords.containsKey(tp0));
         assertTrue(partitionRecords.containsKey(tp1));
 
-        // Validate metadata is unchanged, as response has missing new leader info.
+        // Validate metadata is unchanged, as FetchResponse didn't have new leader information.
         assertEquals(startingClusterMetadata, metadata.fetch());
 
-        // Validate metadata-update is requested due to the error for tp0.
+        // Validate metadata-update is requested due to the leadership-error on tp0.
         assertTrue(metadata.updateRequested());
 
         // Validate preferred-read-replica is cleared for tp0 due to the error.
         assertEquals(Optional.empty(),
             subscriptions.preferredReadReplica(tp0, time.milliseconds()));
-        // Validate preferred-read-replica is still set for tp1
+        // Validate preferred-read-replica is still set for tp1 as previous fetch for it was ok.
         assertEquals(Optional.of(nodeId0.id()),
             subscriptions.preferredReadReplica(tp1, time.milliseconds()));
 
@@ -3560,16 +3564,16 @@ public class FetcherTest {
      */
     @ParameterizedTest
     @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
-    public void testWhenFetchResponseReturnsWithALeaderShipChangeErrorAndNewLeaderInformation(Errors error) {
-        // The test runs with 2 partitions where 1 partition is fetched without errors, and 2nd partitions faces errors due to leadership changes.
-
+    public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInformation(Errors error) {
+        // The test runs with 2 partitions where 1 partition is fetched without errors, and
+        // 2nd partition faces errors due to leadership changes.
         buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
             new BytesDeserializer(),
             Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
             Duration.ofMinutes(5).toMillis());
 
-        // Setup so that fetcher is subscribed to tp0 & tp1, and metadata setup with tp0.
-        // assignFromUser(singleton(tp0));
+        // Setup so that tp0 & tp1 are subscribed and will be fetched from.
+        // Also, setup client's metadata for tp0 & tp1.
         subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
         client.updateMetadata(
             RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
@@ -3590,24 +3594,23 @@ public class FetcherTest {
             FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
-
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
         assertTrue(partitionRecords.containsKey(tp0));
         assertTrue(partitionRecords.containsKey(tp1));
-
-        // Verify that preferred read replica is set for both tp0 & tp1.
+        // Validate setup of preferred read replica for tp0 & tp1 is done correctly.
         Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
         assertEquals(nodeId0.id(), selected.id());
         selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
         assertEquals(nodeId0.id(), selected.id());
 
-        // Next fetch returns an error(due to leadership change) and new leader info is returned. The new leader is a new node, id = 999.
-        // For tp1 fetch returns with no error.
+        // Send next fetch request.
         assertEquals(1, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        // Verify that metadata-update isn't requested
+        // Validate metadata-update isn't requested as no errors seen yet.
         assertFalse(metadata.updateRequested());
 
+        // Test that next fetch returns an error(due to leadership change) and new leader info is returned, as introduced
+        // in KIP-951. The new leader is a new node, id = 999. For tp1 fetch returns with no error.
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
         Node newNode = new Node(999, "newnode", 999, "newrack");
         FetchResponseData.PartitionData tp0Data = new FetchResponseData.PartitionData()
@@ -3640,13 +3643,13 @@ public class FetcherTest {
         assertEquals(Optional.of(newNode), currentLeaderTp0.leader);
         assertEquals(Optional.of(tp0NewLeaderEpoch), currentLeaderTp0.epoch);
 
-        // Validate metadata-update is requested due to the error for tp0.
+        // Validate metadata-update is requested due to the leadership-error for tp0.
         assertTrue(metadata.updateRequested());
 
-        // Validate preferred-read-replica is cleared for tp0.
+        // Validate preferred-read-replica is cleared for tp0 due to the error.
         assertEquals(Optional.empty(),
             subscriptions.preferredReadReplica(tp0, time.milliseconds()));
-        // Validate preferred-read-replica is still set for tp1
+        // Validate preferred-read-replica is still set for tp1 as previous fetch is ok.
         assertEquals(Optional.of(nodeId0.id()),
             subscriptions.preferredReadReplica(tp1, time.milliseconds()));
 
