AndrewJSchofield commented on code in PR #14916:
URL: https://github.com/apache/kafka/pull/14916#discussion_r1414431120


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3126,6 +3127,207 @@ public void testCorruptMessageError() {
         assertThrows(KafkaException.class, this::fetchRecords);
     }
 
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating 
leadership change for the partition, but it
+     * does not contain new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", 
"NOT_LEADER_OR_FOLLOWER"})
+    public void 
testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors
 error) {
+        // The test runs with 2 partitions where 1 partition is fetched 
without errors, and 2nd partitions faces errors due to leadership changes.
+
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(),
+            new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+            Duration.ofMinutes(5).toMillis());
+
+        // Setup so that fetcher is subscribed to tp0 & tp1, and metadata 
setup with tp0.
+        // assignFromUser(singleton(tp0));
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 
4),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=1 by doing a fetch for both 
partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Verify that preferred read replica is set for both tp0 & tp1.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), 
time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Next fetch returns an error(due to leadership change) but new 
leader info is missing, for tp0.
+        // For tp1 fetch returns with no error.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Verify that metadata-update isn't requested
+        assertFalse(metadata.updateRequested());
+
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitions = new LinkedHashMap<>();
+        partitions.put(tidp0,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp0.topicPartition().partition())
+                .setErrorCode(error.code()));
+        partitions.put(tidp1,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp1.topicPartition().partition())
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100L)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+        partitionRecords = fetchRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Validate metadata is unchanged, as response has missing new leader 
info.
+        assertEquals(startingClusterMetadata, metadata.fetch());
+
+        // Validate metadata-update is requested due to the error for tp0.
+        assertTrue(metadata.updateRequested());
+
+        // Validate preferred-read-replica is cleared for tp0 due to the error.
+        assertEquals(Optional.empty(),
+            subscriptions.preferredReadReplica(tp0, time.milliseconds()));
+        // Validate preferred-read-replica is still set for tp1
+        assertEquals(Optional.of(nodeId0.id()),
+            subscriptions.preferredReadReplica(tp1, time.milliseconds()));
+
+        // Validate subscription is still valid & fetch-able for both tp0 & 
tp1. And tp0 points to original leader.
+        assertTrue(subscriptions.isFetchable(tp0));
+        Metadata.LeaderAndEpoch currentLeader = 
subscriptions.position(tp0).currentLeader;
+        assertEquals(tp0Leader.id(), currentLeader.leader.get().id());
+        assertEquals(validLeaderEpoch, currentLeader.epoch.get());
+        assertTrue(subscriptions.isFetchable(tp1));
+    }
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating 
leadership change for the partition, along with
+     * new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", 
"NOT_LEADER_OR_FOLLOWER"})
+    public void 
testWhenFetchResponseReturnsWithALeaderShipChangeErrorAndNewLeaderInformation(Errors
 error) {
+        // The test runs with 2 partitions where 1 partition is fetched 
without errors, and 2nd partitions faces errors due to leadership changes.
+
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(),
+            new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+            Duration.ofMinutes(5).toMillis());
+
+        // Setup so that fetcher is subscribed to tp0 & tp1, and metadata 
setup with tp0.
+        // assignFromUser(singleton(tp0));
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 
4),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=1 by doing a fetch for both 
partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Verify that preferred read replica is set for both tp0 & tp1.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), 
time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Next fetch returns an error(due to leadership change) and new 
leader info is returned. The new leader is a new node, id = 999.

Review Comment:
   This comment seems quite out of place. The vital line of code is 3283.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3126,6 +3127,207 @@ public void testCorruptMessageError() {
         assertThrows(KafkaException.class, this::fetchRecords);
     }
 
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating 
leadership change for the partition, but it
+     * does not contain new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", 
"NOT_LEADER_OR_FOLLOWER"})
+    public void 
testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors
 error) {
+        // The test runs with 2 partitions where 1 partition is fetched 
without errors, and 2nd partitions faces errors due to leadership changes.
+
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(),
+            new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+            Duration.ofMinutes(5).toMillis());
+
+        // Setup so that fetcher is subscribed to tp0 & tp1, and metadata 
setup with tp0.
+        // assignFromUser(singleton(tp0));
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 
4),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=1 by doing a fetch for both 
partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Verify that preferred read replica is set for both tp0 & tp1.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), 
time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Next fetch returns an error(due to leadership change) but new 
leader info is missing, for tp0.
+        // For tp1 fetch returns with no error.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Verify that metadata-update isn't requested
+        assertFalse(metadata.updateRequested());
+
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitions = new LinkedHashMap<>();
+        partitions.put(tidp0,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp0.topicPartition().partition())
+                .setErrorCode(error.code()));
+        partitions.put(tidp1,
+            new FetchResponseData.PartitionData()
+                .setPartitionIndex(tidp1.topicPartition().partition())
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100L)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(0)
+                .setRecords(nextRecords));
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+        partitionRecords = fetchRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Validate metadata is unchanged, as response has missing new leader 
info.

Review Comment:
   It's not really missing new leader info, which sounds like a problem. It's 
just that KIP-951 provides a way to attach the new leader info to the response, 
and this first test shows the case where that is not being done.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3126,6 +3127,207 @@ public void testCorruptMessageError() {
         assertThrows(KafkaException.class, this::fetchRecords);
     }
 
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating 
leadership change for the partition, but it
+     * does not contain new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", 
"NOT_LEADER_OR_FOLLOWER"})
+    public void 
testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors
 error) {

Review Comment:
   `...ALeadershipChangeError...` please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to