This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new f65cf7e1d9d KAFKA-14532: Correctly handle failed fetch when partitions 
unassigned (#13023)
f65cf7e1d9d is described below

commit f65cf7e1d9d885cdca0b85cbe7d2bf51a33ac239
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 21 09:17:11 2022 +0100

    KAFKA-14532: Correctly handle failed fetch when partitions unassigned 
(#13023)
    
    The failure handling code for fetches could run into an 
IllegalStateException if a fetch response came back with a failure after the 
corresponding topic partition has already been removed from the assignment.
    
    Reviewers: David Jacot <[email protected]>
---
 .../consumer/internals/SubscriptionState.java      |  7 ++++-
 .../clients/consumer/internals/FetcherTest.java    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 03b664e1392..b41122f0e50 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -620,7 +620,12 @@ public class SubscriptionState {
      * @return the removed preferred read replica if set, None otherwise.
      */
     public synchronized Optional<Integer> 
clearPreferredReadReplica(TopicPartition tp) {
-        return assignedState(tp).clearPreferredReadReplica();
+        final TopicPartitionState topicPartitionState = 
assignedStateOrNull(tp);
+        if (topicPartitionState == null) {
+            return Optional.empty();
+        } else {
+            return topicPartitionState.clearPreferredReadReplica();
+        }
     }
 
     public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0e14355ecc8..595f6404d63 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -137,6 +137,7 @@ import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -4762,6 +4763,41 @@ public class FetcherTest {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void 
testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, 
Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, 
Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, 
Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+        assertEquals(-1, selected.id());
+    }
+
     @Test
     public void testFetchErrorShouldClearPreferredReadReplica() {
         buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(), new BytesDeserializer(),

Reply via email to