This is an automated email from the ASF dual-hosted git repository. soarez pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new cf67774d8cd KAFKA-16886: Detect replica demotion in AssignmentsManager (#16232) cf67774d8cd is described below commit cf67774d8cd3f223e9a680fb567773559bbb3181 Author: Igor Soarez <i...@soarez.me> AuthorDate: Sat Jun 8 16:10:23 2024 +0300 KAFKA-16886: Detect replica demotion in AssignmentsManager (#16232) JBOD Brokers keep the Controller up to date with replica-to-directory placement via AssignReplicasToDirsRequest. These requests are queued, compacted and sent by AssignmentsManager. The Controller returns the error NOT_LEADER_OR_FOLLOWER when handling an AssignReplicasToDirsRequest from a broker that is not a replica. A partition reassignment can take place, removing the Broker as a replica before the AssignReplicasToDirsRequest successfully reaches the Controller. AssignmentsManager retries failed requests, and will continuously try to propagate this assignment, until the Broker either shuts down, or is added back as a replica. When encountering a NOT_LEADER_OR_FOLLOWER error, AssignmentsManager should assume that the broker is no longer a replica, and stop trying to propagate the directory assignment for that partition. 
Reviewers: Luke Chen <show...@gmail.com> --- .../apache/kafka/server/AssignmentsManager.java | 8 ++- .../kafka/server/AssignmentsManagerTest.java | 58 +++++++++++++++++++++- 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java index 17d0d56178d..237cb8a25bf 100644 --- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java +++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java @@ -120,6 +120,10 @@ public class AssignmentsManager { } } + public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) { + onAssignment(topicPartition, dirId, null); + } + public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, Runnable callback) { if (callback == null) { callback = () -> { }; @@ -420,7 +424,9 @@ public class AssignmentsManager { } else { acknowledged.add(topicPartition); Errors error = Errors.forCode(partition.errorCode()); - if (error != Errors.NONE) { + if (error == Errors.NOT_LEADER_OR_FOLLOWER) { + log.info("Dropping late directory assignment for partition {} into directory {} because this broker is no longer a replica", partition, event.dirId); + } else if (error != Errors.NONE) { log.error("Controller returned error {} for assignment of partition {} into directory {}", error.name(), partition, event.dirId); failures.add(event); diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java index 25d58f4339d..f402d6b9435 100644 --- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java @@ -40,14 +40,17 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.mockito.ArgumentCaptor; +import java.util.ArrayList; import java.util.Arrays; 
import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData; @@ -302,12 +305,21 @@ public class AssignmentsManagerTest { } private static ClientResponse buildSuccessfulResponse(AssignReplicasToDirsRequestData request) { + return buildResponse(request, topicIdPartition -> Errors.NONE); + } + + private static ClientResponse buildResponse(AssignReplicasToDirsRequestData request, + Function<TopicIdPartition, Errors> perPartitionError) { Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>(); for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) { for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) { for (AssignReplicasToDirsRequestData.PartitionData partition : topic.partitions()) { TopicIdPartition topicIdPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex()); - errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, Errors.NONE); + Errors error = perPartitionError.apply(topicIdPartition); + if (error == null) { + error = Errors.NONE; + } + errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, error); } } } @@ -395,4 +407,48 @@ public class AssignmentsManagerTest { } TestUtils.retryOnExceptionWithTimeout(5_000, () -> assertEquals(8, queuedReplicaToDirAssignments.value())); } + + // AssignmentsManager retries to propagate assignments (via AssignReplicasToDirsRequest) after failures. 
+ // When an assignment fails to propagate with NOT_LEADER_OR_FOLLOWER, AssignmentsManager should conclude + // that the broker has been removed as a replica for the partition, and stop trying to propagate it. + @Test + void testDropsOldAssignments() throws InterruptedException { + TopicIdPartition tp1 = new TopicIdPartition(TOPIC_1, 1), tp2 = new TopicIdPartition(TOPIC_1, 2); + List<AssignReplicasToDirsRequestData> requests = new ArrayList<>(); + CountDownLatch readyToAssert = new CountDownLatch(2); + doAnswer(invocation -> { + AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data(); + ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class); + if (readyToAssert.getCount() == 2) { + // First request, reply with a partition-level NOT_LEADER_OR_FOLLOWER error and queue a different assignment + completionHandler.onComplete(buildResponse(request, topicIdPartition -> Errors.NOT_LEADER_OR_FOLLOWER)); + manager.onAssignment(tp2, DIR_1); + } + if (readyToAssert.getCount() == 1) { + // Second request, reply with success + completionHandler.onComplete(buildSuccessfulResponse(request)); + } + requests.add(request); + readyToAssert.countDown(); + return null; + }).when(channelManager).sendRequest(any(), any()); + + manager.onAssignment(tp1, DIR_1); + TestUtils.waitForCondition(() -> { + time.sleep(TimeUnit.SECONDS.toMillis(1)); + manager.wakeup(); + return readyToAssert.await(1, TimeUnit.MILLISECONDS); + }, "Timed out waiting for AssignReplicasToDirsRequest to be sent."); + + assertEquals(Arrays.asList( + buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{ + put(tp1, DIR_1); + }}), + // Even though the controller replied with NOT_LEADER_OR_FOLLOWER, the second request does not include + // partition 1, meaning AssignmentManager dropped (no longer retries) the assignment. 
+ buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{ + put(tp2, DIR_1); + }}) + ), requests); + } }