This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new ba665a4a89f KAFKA-16365: AssignmentsManager callback handling issues 
(#15521)
ba665a4a89f is described below

commit ba665a4a89f6c0113bbe2e8891118d2d8aa06874
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Tue Apr 2 04:11:32 2024 +0100

    KAFKA-16365: AssignmentsManager callback handling issues (#15521)
    
    When moving replicas between directories in the same broker, future replica 
promotion hinges on acknowledgment from the controller of a change in the 
directory assignment.
    
    ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion 
notification of the directory assignment change.
    
    In its current form, under certain assignment scheduling, 
AssignmentsManager can either miss completion notifications or trigger them 
prematurely.
    
    Reviewers: Luke Chen <show...@gmail.com>, Omnia Ibrahim 
<o.g.h.ibra...@gmail.com>, Gaurav Narula <gaurav_naru...@apple.com>
---
 .../apache/kafka/server/AssignmentsManager.java    | 31 ++++-----
 .../kafka/server/AssignmentsManagerTest.java       | 76 ++++++++++++++++++----
 2 files changed, 76 insertions(+), 31 deletions(-)

diff --git 
a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java 
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 0885f4e4fbc..17d0d56178d 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -181,9 +181,6 @@ public class AssignmentsManager {
             if (!partition.equals(other.partition)) {
                 throw new IllegalArgumentException("Cannot merge events for 
different partitions");
             }
-            if (!dirId.equals(other.dirId)) {
-                throw new IllegalArgumentException("Cannot merge events for 
different directories");
-            }
             completionHandlers.addAll(other.completionHandlers);
         }
         void onComplete() {
@@ -193,25 +190,29 @@ public class AssignmentsManager {
         }
         @Override
         public void run() throws Exception {
+            log.trace("Received assignment {}", this);
             AssignmentEvent existing = pending.getOrDefault(partition, null);
+            boolean existingIsInFlight = false;
             if (existing == null && inflight != null) {
                 existing = inflight.getOrDefault(partition, null);
+                existingIsInFlight = true;
             }
             if (existing != null) {
                 if (existing.dirId.equals(dirId)) {
                     existing.merge(this);
-                    if (log.isDebugEnabled()) log.debug("Ignoring duplicate 
assignment {}", this);
+                    log.debug("Ignoring duplicate assignment {}", this);
                     return;
                 }
                 if (existing.timestampNs > timestampNs) {
-                    existing.onComplete();
-                    if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than {}", this, existing);
+                    existing.merge(this);
+                    log.debug("Dropping assignment {} because it's older than 
existing {}", this, existing);
                     return;
+                } else if (!existingIsInFlight) {
+                    this.merge(existing);
+                    log.debug("Dropping existing assignment {} because it's 
older than {}", existing, this);
                 }
             }
-            if (log.isDebugEnabled()) {
-                log.debug("Received new assignment {}", this);
-            }
+            log.debug("Queueing new assignment {}", this);
             pending.put(partition, this);
 
             if (inflight == null || inflight.isEmpty()) {
@@ -272,9 +273,7 @@ public class AssignmentsManager {
             }
             Map<TopicIdPartition, Uuid> assignment = 
inflight.entrySet().stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().dirId));
-            if (log.isDebugEnabled()) {
-                log.debug("Dispatching {} assignments:  {}", 
assignment.size(), assignment);
-            }
+            log.debug("Dispatching {} assignments:  {}", assignment.size(), 
assignment);
             channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
                     buildRequestData(brokerId, brokerEpochSupplier.get(), 
assignment)),
                     new AssignReplicasToDirsRequestCompletionHandler());
@@ -331,9 +330,7 @@ public class AssignmentsManager {
         }
         @Override
         public void onComplete(ClientResponse response) {
-            if (log.isDebugEnabled()) {
-                log.debug("Received controller response: {}", response);
-            }
+            log.debug("Received controller response: {}", response);
             appendResponseEvent(response);
         }
         void appendResponseEvent(ClientResponse response) {
@@ -352,9 +349,7 @@ public class AssignmentsManager {
     }
 
     private void scheduleDispatch(long delayNs) {
-        if (log.isDebugEnabled()) {
-            log.debug("Scheduling dispatch in {}ns", delayNs);
-        }
+        log.debug("Scheduling dispatch in {}ns", delayNs);
         eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, 
DispatchEvent.TAG,
                 new EventQueue.LatestDeadlineFunction(time.nanoseconds() + 
delayNs), new DispatchEvent());
     }
diff --git 
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 043afde9386..25d58f4339d 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -45,11 +45,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atMostOnce;
 import static org.mockito.Mockito.doAnswer;
@@ -282,19 +285,7 @@ public class AssignmentsManagerTest {
         doAnswer(invocation -> {
             AssignReplicasToDirsRequestData request = 
invocation.getArgument(0, 
AssignReplicasToDirsRequest.Builder.class).build().data();
             ControllerRequestCompletionHandler completionHandler = 
invocation.getArgument(1, ControllerRequestCompletionHandler.class);
-            Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>();
-            for (AssignReplicasToDirsRequestData.DirectoryData directory : 
request.directories()) {
-                for (AssignReplicasToDirsRequestData.TopicData topic : 
directory.topics()) {
-                    for (AssignReplicasToDirsRequestData.PartitionData 
partition : topic.partitions()) {
-                        TopicIdPartition topicIdPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
-                        errors.computeIfAbsent(directory.id(), d -> new 
HashMap<>()).put(topicIdPartition, Errors.NONE);
-                    }
-                }
-            }
-            AssignReplicasToDirsResponseData responseData = 
AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors);
-            completionHandler.onComplete(new ClientResponse(null, null, null,
-                    0L, 0L, false, false, null, null,
-                            new AssignReplicasToDirsResponse(responseData)));
+            completionHandler.onComplete(buildSuccessfulResponse(request));
 
             return null;
         
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
@@ -310,6 +301,65 @@ public class AssignmentsManagerTest {
         }
     }
 
+    private static ClientResponse 
buildSuccessfulResponse(AssignReplicasToDirsRequestData request) {
+        Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData directory : 
request.directories()) {
+            for (AssignReplicasToDirsRequestData.TopicData topic : 
directory.topics()) {
+                for (AssignReplicasToDirsRequestData.PartitionData partition : 
topic.partitions()) {
+                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
+                    errors.computeIfAbsent(directory.id(), d -> new 
HashMap<>()).put(topicIdPartition, Errors.NONE);
+                }
+            }
+        }
+        AssignReplicasToDirsResponseData responseData = 
AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors);
+        ClientResponse response = new ClientResponse(null, null, null,
+                0L, 0L, false, false, null, null,
+                new AssignReplicasToDirsResponse(responseData));
+        return response;
+    }
+
+    @Test
+    public void testAssignmentCompaction() throws Exception {
+        // Delay the first controller response to force assignment compaction 
logic
+        CompletableFuture<Runnable> completionFuture = new 
CompletableFuture<>();
+        doAnswer(invocation -> {
+            AssignReplicasToDirsRequestData request = 
invocation.getArgument(0, 
AssignReplicasToDirsRequest.Builder.class).build().data();
+            ControllerRequestCompletionHandler completionHandler = 
invocation.getArgument(1, ControllerRequestCompletionHandler.class);
+            ClientResponse response = buildSuccessfulResponse(request);
+            Runnable completion = () -> completionHandler.onComplete(response);
+            if (completionFuture.isDone()) completion.run();
+            else completionFuture.complete(completion);
+            return null;
+        
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
+                any(ControllerRequestCompletionHandler.class));
+
+        CountDownLatch remainingInvocations = new CountDownLatch(20);
+        Runnable onComplete = () -> {
+            assertTrue(completionFuture.isDone(), "Premature invocation");
+            assertTrue(remainingInvocations.getCount() > 0, "Extra 
invocation");
+            remainingInvocations.countDown();
+        };
+        Uuid[] dirs = {DIR_1, DIR_2, DIR_3};
+        for (int i = 0; i < remainingInvocations.getCount(); i++) {
+            time.sleep(100);
+            manager.onAssignment(new TopicIdPartition(TOPIC_1, 0), dirs[i % 
3], onComplete);
+        }
+        activeWait(completionFuture::isDone);
+        completionFuture.get().run();
+        activeWait(() -> remainingInvocations.getCount() == 0);
+    }
+
+    void activeWait(Supplier<Boolean> predicate) throws InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            boolean conditionSatisfied = predicate.get();
+            if (!conditionSatisfied) {
+                time.sleep(100);
+                manager.wakeup();
+            }
+            return conditionSatisfied;
+        }, TestUtils.DEFAULT_MAX_WAIT_MS, 50, null);
+    }
+
     static Metric findMetric(String name) {
         for (Map.Entry<MetricName, Metric> entry : 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
             MetricName metricName = entry.getKey();

Reply via email to