This is an automated email from the ASF dual-hosted git repository.

soarez pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new cf67774d8cd KAFKA-16886: Detect replica demotion in AssignmentsManager (#16232)
cf67774d8cd is described below

commit cf67774d8cd3f223e9a680fb567773559bbb3181
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Sat Jun 8 16:10:23 2024 +0300

    KAFKA-16886: Detect replica demotion in AssignmentsManager (#16232)
    
    JBOD Brokers keep the Controller up to date with replica-to-directory
    placement via AssignReplicasToDirsRequest. These requests are queued,
    compacted and sent by AssignmentsManager.
    
    The Controller returns the error NOT_LEADER_OR_FOLLOWER when handling
    an AssignReplicasToDirsRequest from a broker that is not a replica.
    
    A partition reassignment can take place, removing the Broker
    as a replica before the AssignReplicasToDirsRequest successfully
    reaches the Controller. AssignmentsManager retries failed
    requests and will keep trying to propagate this assignment
    until the Broker either shuts down or is added back as a replica.
    
    When encountering a NOT_LEADER_OR_FOLLOWER error, AssignmentsManager
    should assume that the broker is no longer a replica, and stop
    trying to propagate the directory assignment for that partition.
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../apache/kafka/server/AssignmentsManager.java    |  8 ++-
 .../kafka/server/AssignmentsManagerTest.java       | 58 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java 
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 17d0d56178d..237cb8a25bf 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -120,6 +120,10 @@ public class AssignmentsManager {
         }
     }
 
+    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+        onAssignment(topicPartition, dirId, null);
+    }
+
     public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
Runnable callback) {
         if (callback == null) {
             callback = () -> { };
@@ -420,7 +424,9 @@ public class AssignmentsManager {
                     } else {
                         acknowledged.add(topicPartition);
                         Errors error = Errors.forCode(partition.errorCode());
-                        if (error != Errors.NONE) {
+                        if (error == Errors.NOT_LEADER_OR_FOLLOWER) {
+                            log.info("Dropping late directory assignment for 
partition {} into directory {} because this broker is no longer a replica", 
partition, event.dirId);
+                        } else if (error != Errors.NONE) {
                             log.error("Controller returned error {} for 
assignment of partition {} into directory {}",
                                     error.name(), partition, event.dirId);
                             failures.add(event);
diff --git 
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 25d58f4339d..f402d6b9435 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -40,14 +40,17 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.ArgumentCaptor;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
@@ -302,12 +305,21 @@ public class AssignmentsManagerTest {
     }
 
     private static ClientResponse 
buildSuccessfulResponse(AssignReplicasToDirsRequestData request) {
+        return buildResponse(request, topicIdPartition -> Errors.NONE);
+    }
+
+    private static ClientResponse 
buildResponse(AssignReplicasToDirsRequestData request,
+                                                Function<TopicIdPartition, 
Errors> perPartitionError) {
         Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>();
         for (AssignReplicasToDirsRequestData.DirectoryData directory : 
request.directories()) {
             for (AssignReplicasToDirsRequestData.TopicData topic : 
directory.topics()) {
                 for (AssignReplicasToDirsRequestData.PartitionData partition : 
topic.partitions()) {
                     TopicIdPartition topicIdPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
-                    errors.computeIfAbsent(directory.id(), d -> new 
HashMap<>()).put(topicIdPartition, Errors.NONE);
+                    Errors error = perPartitionError.apply(topicIdPartition);
+                    if (error == null) {
+                        error = Errors.NONE;
+                    }
+                    errors.computeIfAbsent(directory.id(), d -> new 
HashMap<>()).put(topicIdPartition, error);
                 }
             }
         }
@@ -395,4 +407,48 @@ public class AssignmentsManagerTest {
         }
         TestUtils.retryOnExceptionWithTimeout(5_000, () -> assertEquals(8, 
queuedReplicaToDirAssignments.value()));
     }
+
+    // AssignmentsManager retries to propagate assignments (via 
AssignReplicasToDirsRequest) after failures.
+    // When an assignment fails to propagate with NOT_LEADER_OR_FOLLOWER, 
AssignmentsManager should conclude
+    // that the broker has been removed as a replica for the partition, and 
stop trying to propagate it.
+    @Test
+    void testDropsOldAssignments() throws InterruptedException {
+        TopicIdPartition tp1 = new TopicIdPartition(TOPIC_1, 1), tp2 = new 
TopicIdPartition(TOPIC_1, 2);
+        List<AssignReplicasToDirsRequestData> requests = new ArrayList<>();
+        CountDownLatch readyToAssert = new CountDownLatch(2);
+        doAnswer(invocation -> {
+            AssignReplicasToDirsRequestData request = 
invocation.getArgument(0, 
AssignReplicasToDirsRequest.Builder.class).build().data();
+            ControllerRequestCompletionHandler completionHandler = 
invocation.getArgument(1, ControllerRequestCompletionHandler.class);
+            if (readyToAssert.getCount() == 2) {
+                // First request, reply with a partition-level 
NOT_LEADER_OR_FOLLOWER error and queue a different assignment
+                completionHandler.onComplete(buildResponse(request, 
topicIdPartition -> Errors.NOT_LEADER_OR_FOLLOWER));
+                manager.onAssignment(tp2, DIR_1);
+            }
+            if (readyToAssert.getCount() == 1) {
+                // Second request, reply with success
+                completionHandler.onComplete(buildSuccessfulResponse(request));
+            }
+            requests.add(request);
+            readyToAssert.countDown();
+            return null;
+        }).when(channelManager).sendRequest(any(), any());
+
+        manager.onAssignment(tp1, DIR_1);
+        TestUtils.waitForCondition(() -> {
+            time.sleep(TimeUnit.SECONDS.toMillis(1));
+            manager.wakeup();
+            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
+        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
+
+        assertEquals(Arrays.asList(
+                buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() 
{{
+                        put(tp1, DIR_1);
+                    }}),
+                // Even though the controller replied with 
NOT_LEADER_OR_FOLLOWER, the second request does not include
+                // partition 1, meaning AssignmentManager dropped (no longer 
retries) the assignment.
+                buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() 
{{
+                        put(tp2, DIR_1);
+                    }})
+        ), requests);
+    }
 }

Reply via email to