This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new ba665a4a89f KAFKA-16365: AssignmentsManager callback handling issues (#15521) ba665a4a89f is described below commit ba665a4a89f6c0113bbe2e8891118d2d8aa06874 Author: Igor Soarez <i...@soarez.me> AuthorDate: Tue Apr 2 04:11:32 2024 +0100 KAFKA-16365: AssignmentsManager callback handling issues (#15521) When moving replicas between directories in the same broker, future replica promotion hinges on acknowledgment from the controller of a change in the directory assignment. ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion notification of the directory assignment change. In its current form, under certain assignment scheduling, AssignmentsManager can both miss completion notifications and prematurely trigger them. Reviewers: Luke Chen <show...@gmail.com>, Omnia Ibrahim <o.g.h.ibra...@gmail.com>, Gaurav Narula <gaurav_naru...@apple.com> --- .../apache/kafka/server/AssignmentsManager.java | 31 ++++----- .../kafka/server/AssignmentsManagerTest.java | 76 ++++++++++++++++++---- 2 files changed, 76 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java index 0885f4e4fbc..17d0d56178d 100644 --- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java +++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java @@ -181,9 +181,6 @@ public class AssignmentsManager { if (!partition.equals(other.partition)) { throw new IllegalArgumentException("Cannot merge events for different partitions"); } - if (!dirId.equals(other.dirId)) { - throw new IllegalArgumentException("Cannot merge events for different directories"); - } completionHandlers.addAll(other.completionHandlers); } void onComplete() { @@ -193,25 +190,29 @@ public class AssignmentsManager { } @Override public void run() throws Exception { + log.trace("Received assignment {}", this); 
AssignmentEvent existing = pending.getOrDefault(partition, null); + boolean existingIsInFlight = false; if (existing == null && inflight != null) { existing = inflight.getOrDefault(partition, null); + existingIsInFlight = true; } if (existing != null) { if (existing.dirId.equals(dirId)) { existing.merge(this); - if (log.isDebugEnabled()) log.debug("Ignoring duplicate assignment {}", this); + log.debug("Ignoring duplicate assignment {}", this); return; } if (existing.timestampNs > timestampNs) { - existing.onComplete(); - if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than {}", this, existing); + existing.merge(this); + log.debug("Dropping assignment {} because it's older than existing {}", this, existing); return; + } else if (!existingIsInFlight) { + this.merge(existing); + log.debug("Dropping existing assignment {} because it's older than {}", existing, this); } } - if (log.isDebugEnabled()) { - log.debug("Received new assignment {}", this); - } + log.debug("Queueing new assignment {}", this); pending.put(partition, this); if (inflight == null || inflight.isEmpty()) { @@ -272,9 +273,7 @@ public class AssignmentsManager { } Map<TopicIdPartition, Uuid> assignment = inflight.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().dirId)); - if (log.isDebugEnabled()) { - log.debug("Dispatching {} assignments: {}", assignment.size(), assignment); - } + log.debug("Dispatching {} assignments: {}", assignment.size(), assignment); channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder( buildRequestData(brokerId, brokerEpochSupplier.get(), assignment)), new AssignReplicasToDirsRequestCompletionHandler()); @@ -331,9 +330,7 @@ public class AssignmentsManager { } @Override public void onComplete(ClientResponse response) { - if (log.isDebugEnabled()) { - log.debug("Received controller response: {}", response); - } + log.debug("Received controller response: {}", response); appendResponseEvent(response); } 
void appendResponseEvent(ClientResponse response) { @@ -352,9 +349,7 @@ public class AssignmentsManager { } private void scheduleDispatch(long delayNs) { - if (log.isDebugEnabled()) { - log.debug("Scheduling dispatch in {}ns", delayNs); - } + log.debug("Scheduling dispatch in {}ns", delayNs); eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, DispatchEvent.TAG, new EventQueue.LatestDeadlineFunction(time.nanoseconds() + delayNs), new DispatchEvent()); } diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java index 043afde9386..25d58f4339d 100644 --- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java @@ -45,11 +45,14 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.doAnswer; @@ -282,19 +285,7 @@ public class AssignmentsManagerTest { doAnswer(invocation -> { AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data(); ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class); - Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>(); - for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) { - for (AssignReplicasToDirsRequestData.TopicData topic 
: directory.topics()) { - for (AssignReplicasToDirsRequestData.PartitionData partition : topic.partitions()) { - TopicIdPartition topicIdPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex()); - errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, Errors.NONE); - } - } - } - AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors); - completionHandler.onComplete(new ClientResponse(null, null, null, - 0L, 0L, false, false, null, null, - new AssignReplicasToDirsResponse(responseData))); + completionHandler.onComplete(buildSuccessfulResponse(request)); return null; }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), @@ -310,6 +301,65 @@ public class AssignmentsManagerTest { } } + private static ClientResponse buildSuccessfulResponse(AssignReplicasToDirsRequestData request) { + Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>(); + for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) { + for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) { + for (AssignReplicasToDirsRequestData.PartitionData partition : topic.partitions()) { + TopicIdPartition topicIdPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex()); + errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, Errors.NONE); + } + } + } + AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors); + ClientResponse response = new ClientResponse(null, null, null, + 0L, 0L, false, false, null, null, + new AssignReplicasToDirsResponse(responseData)); + return response; + } + + @Test + public void testAssignmentCompaction() throws Exception { + // Delay the first controller response to force assignment compaction logic + CompletableFuture<Runnable> completionFuture = new CompletableFuture<>(); + 
doAnswer(invocation -> { + AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data(); + ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class); + ClientResponse response = buildSuccessfulResponse(request); + Runnable completion = () -> completionHandler.onComplete(response); + if (completionFuture.isDone()) completion.run(); + else completionFuture.complete(completion); + return null; + }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), + any(ControllerRequestCompletionHandler.class)); + + CountDownLatch remainingInvocations = new CountDownLatch(20); + Runnable onComplete = () -> { + assertTrue(completionFuture.isDone(), "Premature invocation"); + assertTrue(remainingInvocations.getCount() > 0, "Extra invocation"); + remainingInvocations.countDown(); + }; + Uuid[] dirs = {DIR_1, DIR_2, DIR_3}; + for (int i = 0; i < remainingInvocations.getCount(); i++) { + time.sleep(100); + manager.onAssignment(new TopicIdPartition(TOPIC_1, 0), dirs[i % 3], onComplete); + } + activeWait(completionFuture::isDone); + completionFuture.get().run(); + activeWait(() -> remainingInvocations.getCount() == 0); + } + + void activeWait(Supplier<Boolean> predicate) throws InterruptedException { + TestUtils.waitForCondition(() -> { + boolean conditionSatisfied = predicate.get(); + if (!conditionSatisfied) { + time.sleep(100); + manager.wakeup(); + } + return conditionSatisfied; + }, TestUtils.DEFAULT_MAX_WAIT_MS, 50, null); + } + static Metric findMetric(String name) { for (Map.Entry<MetricName, Metric> entry : KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) { MetricName metricName = entry.getKey();