This is an automated email from the ASF dual-hosted git repository.

soarez pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1b2adea07c KAFKA-17190: AssignmentsManager gets stuck retrying on deleted topics (#16672)
e1b2adea07c is described below

commit e1b2adea07cebaac6d1138ec18778c575aef57f4
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Sat Aug 10 04:31:45 2024 -0700

    KAFKA-17190: AssignmentsManager gets stuck retrying on deleted topics (#16672)
    
    In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the
    controller informing it about which directory we have chosen to place each new replica on.
    Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before
    sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in
    rapid succession, we can get stuck including the now-defunct replica in our subsequent
    AssignReplicasToDirsRequests forever.
    
    In order to prevent this problem, the AssignmentsManager should check if a topic still exists (and
    is still present on the broker in question) before sending the RPC. In order to prevent log spam,
    we should not log any error messages until several minutes have gone past without success.
    Finally, rather than creating a new EventQueue event for each assignment request, we should simply
    modify a shared data structure and schedule a deferred event to send the accumulated RPCs. This
    will improve efficiency.
    
    Reviewers: Igor Soarez <i...@soarez.me>, Ron Dagostino <rndg...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-server.xml               |   2 +
 .../kafka/common/utils/ExponentialBackoff.java     |   4 +
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   4 +
 .../java/org/apache/kafka/image/ClusterImage.java  |   8 +
 .../org/apache/kafka/image/ClusterImageTest.java   |  11 +
 .../kafka/server/common/DirectoryEventHandler.java |   1 +
 .../java/org/apache/kafka/server/Assignment.java   | 128 ++++
 .../apache/kafka/server/AssignmentsManager.java    | 675 +++++++++---------
 .../server/AssignmentsManagerDeadlineFunction.java |  96 +++
 .../org/apache/kafka/server/AssignmentTest.java    | 126 ++++
 .../AssignmentsManagerDeadlineFunctionTest.java    |  82 +++
 .../kafka/server/AssignmentsManagerTest.java       | 784 +++++++++++----------
 14 files changed, 1219 insertions(+), 709 deletions(-)

diff --git a/build.gradle b/build.gradle
index 2bec58209d4..41229843eec 100644
--- a/build.gradle
+++ b/build.gradle
@@ -841,6 +841,7 @@ project(':server') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':metadata')
     implementation project(':server-common')
     implementation project(':storage')
     implementation project(':group-coordinator')
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 27b7bcda8df..22ab6a449e6 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -80,6 +80,8 @@
   <allow pkg="org.apache.kafka.raft" />
 
   <subpackage name="server">
+    <allow pkg="org.apache.kafka.server" />
+    <allow pkg="org.apache.kafka.image" />
     <subpackage name="metrics">
       <allow 
class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
       <allow pkg="org.apache.kafka.server.telemetry" />
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
index 64dd9f01318..05994480147 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
@@ -47,6 +47,10 @@ public class ExponentialBackoff {
                 Math.log(maxInterval / (double) Math.max(initialInterval, 1)) 
/ Math.log(multiplier) : 0;
     }
 
+    public long initialInterval() {
+        return initialInterval;
+    }
+
     public long backoff(long attempts) {
         if (expMax == 0) {
             return initialInterval;
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 265d3abe14b..49c0051dd9b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -303,9 +303,9 @@ class BrokerServer(
         time,
         assignmentsChannelManager,
         config.brokerId,
-        () => lifecycleManager.brokerEpoch,
-        (directoryId: Uuid) => logManager.directoryPath(directoryId).asJava,
-        (topicId: Uuid) => 
Optional.ofNullable(metadataCache.topicIdsToNames().get(topicId))
+        () => metadataCache.getImage(),
+        (directoryId: Uuid) => logManager.directoryPath(directoryId).
+          getOrElse("[unknown directory path]")
       )
       val directoryEventHandler = new DirectoryEventHandler {
         override def handleAssignment(partition: TopicIdPartition, 
directoryId: Uuid, reason: String, callback: Runnable): Unit =
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 56a90e4c94c..f8353df8053 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -530,6 +530,10 @@ class KRaftMetadataCache(
     _currentImage = newImage
   }
 
+  def getImage(): MetadataImage = {
+    _currentImage
+  }
+
   override def config(configResource: ConfigResource): Properties =
     _currentImage.configs().configProperties(configResource)
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index 4f42cecf5da..17bb385bad8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -70,6 +70,14 @@ public final class ClusterImage {
         return brokers.containsKey(brokerId);
     }
 
+    public long brokerEpoch(int brokerId) {
+        BrokerRegistration brokerRegistration = broker(brokerId);
+        if (brokerRegistration == null) {
+            return -1L;
+        }
+        return brokerRegistration.epoch();
+    }
+
     public void write(ImageWriter writer, ImageWriterOptions options) {
         for (BrokerRegistration broker : brokers.values()) {
             writer.write(broker.toRecord(options));
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index cc7e04ed1a5..cf79d276b84 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -317,4 +317,15 @@ public class ClusterImageTest {
                 build());
         assertEquals("controller registration data", lossString.get());
     }
+
+    @Test
+    public void testBrokerEpoch() {
+        assertEquals(123L, IMAGE1.brokerEpoch(2));
+    }
+
+    @Test
+    public void testBrokerEpochForNonExistentBroker() {
+        assertEquals(-1L, IMAGE1.brokerEpoch(20));
+    }
+
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
index 274b5b9422a..b7104986738 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -33,6 +33,7 @@ public interface DirectoryEventHandler {
      * Handle the assignment of a topic partition to a directory.
      * @param directoryId  The directory ID
      * @param partition    The topic partition
+     * @param reason       The reason
      * @param callback     Callback to apply when the request is completed.
      */
     void handleAssignment(TopicIdPartition partition, Uuid directoryId, String 
reason, Runnable callback);
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java b/server/src/main/java/org/apache/kafka/server/Assignment.java
new file mode 100644
index 00000000000..582c638cb7e
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.Objects;
+
+final class Assignment {
+    /**
+     * The topic ID and partition index of the replica.
+     */
+    private final TopicIdPartition topicIdPartition;
+
+    /**
+     * The ID of the directory we are placing the replica into.
+     */
+    private final Uuid directoryId;
+
+    /**
+     * The time in monotonic nanosecond when this assignment was created.
+     */
+    private final long submissionTimeNs;
+
+    /**
+     * The callback to invoke on success.
+     */
+    private final Runnable successCallback;
+
+    Assignment(
+        TopicIdPartition topicIdPartition,
+        Uuid directoryId,
+        long submissionTimeNs,
+        Runnable successCallback
+    ) {
+        this.topicIdPartition = topicIdPartition;
+        this.directoryId = directoryId;
+        this.submissionTimeNs = submissionTimeNs;
+        this.successCallback = successCallback;
+    }
+
+    TopicIdPartition topicIdPartition() {
+        return topicIdPartition;
+    }
+
+    Uuid directoryId() {
+        return directoryId;
+    }
+
+    long submissionTimeNs() {
+        return submissionTimeNs;
+    }
+
+    Runnable successCallback() {
+        return successCallback;
+    }
+
+    /**
+     * Check if this Assignment is still valid to be sent.
+     *
+     * @param nodeId    The broker ID.
+     * @param image     The metadata image.
+     *
+     * @return          True only if the Assignment is still valid.
+     */
+    boolean valid(int nodeId, MetadataImage image) {
+        TopicImage topicImage = 
image.topics().getTopic(topicIdPartition.topicId());
+        if (topicImage == null) {
+            return false; // The topic has been deleted.
+        }
+        PartitionRegistration partition = 
topicImage.partitions().get(topicIdPartition.partitionId());
+        if (partition == null) {
+            return false; // The partition no longer exists.
+        }
+        // Check if this broker is still a replica.
+        return Replicas.contains(partition.replicas, nodeId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || (!(o instanceof Assignment))) return false;
+        Assignment other = (Assignment) o;
+        return topicIdPartition.equals(other.topicIdPartition) &&
+            directoryId.equals(other.directoryId) &&
+            submissionTimeNs == other.submissionTimeNs &&
+            successCallback.equals(other.successCallback);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topicIdPartition,
+            directoryId,
+            submissionTimeNs,
+            successCallback);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("Assignment");
+        bld.append("(topicIdPartition=").append(topicIdPartition);
+        bld.append(", directoryId=").append(directoryId);
+        bld.append(", submissionTimeNs=").append(submissionTimeNs);
+        bld.append(", successCallback=").append(successCallback);
+        bld.append(")");
+        return bld.toString();
+    }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 48c9e5a2d6e..d8e1d33b452 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -30,442 +30,443 @@ import 
org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
 import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
 
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
-public class AssignmentsManager {
+import static 
org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
+
+public final class AssignmentsManager {
+    static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(
+            TimeUnit.MILLISECONDS.toNanos(100),
+            2,
+            TimeUnit.SECONDS.toNanos(10),
+            0.02);
 
-    private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+    /**
+     * The minimum amount of time we will wait before logging individual 
assignment failures.
+     */
+    static final long MIN_NOISY_FAILURE_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(2);
 
     /**
-     * Assignments are dispatched to the controller this long after
-     * being submitted to {@link AssignmentsManager}, if there
-     * is no request in flight already.
-     * The interval is reset when a new assignment is submitted.
-     * If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
-     * is reached, we ignore this interval and dispatch immediately.
+     * The metric reflecting the number of pending assignments.
      */
-    private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.MILLISECONDS.toNanos(500);
+    static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
+            metricName("QueuedReplicaToDirAssignments");
 
-    private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+    /**
+     * The event at which we send assignments, if appropriate.
+     */
+    static final String MAYBE_SEND_ASSIGNMENTS_EVENT = 
"MaybeSendAssignmentsEvent";
 
-    // visible for testing.
-    static final String QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME = 
"QueuedReplicaToDirAssignments";
+    /**
+     * The log4j object.
+     */
+    private final Logger log;
 
+    /**
+     * The exponential backoff strategy to use.
+     */
+    private final ExponentialBackoff backoff;
+
+    /**
+     * The clock object to use.
+     */
     private final Time time;
+
+    /**
+     * Used to send messages to the controller.
+     */
     private final NodeToControllerChannelManager channelManager;
-    private final int brokerId;
-    private final Supplier<Long> brokerEpochSupplier;
+
+    /**
+     * The node ID.
+     */
+    private final int nodeId;
+
+    /**
+     * Supplies the latest MetadataImage.
+     */
+    private final Supplier<MetadataImage> metadataImageSupplier;
+
+    /**
+     * Maps directory IDs to descriptions for logging purposes.
+     */
+    private final Function<Uuid, String> directoryIdToDescription;
+
+    /**
+     * Maps partitions to assignments that are ready to send.
+     */
+    private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;
+
+    /**
+     * Maps partitions to assignments that are in-flight. Older entries come 
first.
+     */
+    private volatile Map<TopicIdPartition, Assignment> inflight;
+
+    /**
+     * The registry to register our metrics with.
+     */
+    private final MetricsRegistry metricsRegistry;
+
+    /**
+     * The number of global failures we had previously (cleared after any 
success).
+     */
+    private int previousGlobalFailures;
+
+    /**
+     * The event queue.
+     */
     private final KafkaEventQueue eventQueue;
-    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
-
-    // These variables should only be mutated from the KafkaEventQueue thread,
-    // but `inflight` and `pending` are also read from a Yammer metrics gauge.
-    private volatile Map<TopicIdPartition, AssignmentEvent> inflight = null;
-    private volatile Map<TopicIdPartition, AssignmentEvent> pending = new 
HashMap<>();
-    private final ExponentialBackoff resendExponentialBackoff =
-            new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
-    private final Function<Uuid, Optional<String>> dirIdToPath;
-    private final Function<Uuid, Optional<String>> topicIdToName;
-    private int failedAttempts = 0;
-
-    public AssignmentsManager(Time time,
-                              NodeToControllerChannelManager channelManager,
-                              int brokerId,
-                              Supplier<Long> brokerEpochSupplier,
-                              Function<Uuid, Optional<String>> dirIdToPath,
-                              Function<Uuid, Optional<String>> topicIdToName) {
+
+    static MetricName metricName(String name) {
+        return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", 
"AssignmentsManager", name);
+    }
+
+    public AssignmentsManager(
+        Time time,
+        NodeToControllerChannelManager channelManager,
+        int nodeId,
+        Supplier<MetadataImage> metadataImageSupplier,
+        Function<Uuid, String> directoryIdToDescription
+    ) {
+        this(STANDARD_BACKOFF,
+            time,
+            channelManager,
+            nodeId,
+            metadataImageSupplier,
+            directoryIdToDescription,
+            KafkaYammerMetrics.defaultRegistry());
+    }
+
+    AssignmentsManager(
+        ExponentialBackoff backoff,
+        Time time,
+        NodeToControllerChannelManager channelManager,
+        int nodeId,
+        Supplier<MetadataImage> metadataImageSupplier,
+        Function<Uuid, String> directoryIdToDescription,
+        MetricsRegistry metricsRegistry
+    ) {
+        this.log = new LogContext("[AssignmentsManager id=" + nodeId + "] ").
+            logger(AssignmentsManager.class);
+        this.backoff = backoff;
         this.time = time;
         this.channelManager = channelManager;
-        this.brokerId = brokerId;
-        this.brokerEpochSupplier = brokerEpochSupplier;
+        this.nodeId = nodeId;
+        this.directoryIdToDescription = directoryIdToDescription;
+        this.metadataImageSupplier = metadataImageSupplier;
+        this.ready = new ConcurrentHashMap<>();
+        this.inflight = Collections.emptyMap();
+        this.metricsRegistry = metricsRegistry;
+        
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new 
Gauge<Integer>() {
+                @Override
+                public Integer value() {
+                    return numPending();
+                }
+            });
+        this.previousGlobalFailures = 0;
         this.eventQueue = new KafkaEventQueue(time,
-                new LogContext("[AssignmentsManager id=" + brokerId + "]"),
-                "broker-" + brokerId + "-directory-assignments-manager-",
-                new ShutdownEvent());
+            new LogContext("[AssignmentsManager id=" + nodeId + "]"),
+            "broker-" + nodeId + "-directory-assignments-manager-",
+            new ShutdownEvent());
         channelManager.start();
-        
this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, () -> 
getMapSize(inflight) + getMapSize(pending));
-        if (dirIdToPath == null) dirIdToPath = id -> Optional.empty();
-        this.dirIdToPath = dirIdToPath;
-        if (topicIdToName == null) topicIdToName = id -> Optional.empty();
-        this.topicIdToName = topicIdToName;
     }
 
-    public void close() throws InterruptedException {
-        try {
-            eventQueue.close();
-        } finally {
-            
metricsGroup.removeMetric(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
-        }
+    public int numPending() {
+        return ready.size() + inflight.size();
     }
 
-    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
String reason) {
-        onAssignment(topicPartition, dirId, reason, null);
+    public void close() throws InterruptedException {
+        eventQueue.close();
     }
 
-    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
String reason, Runnable callback) {
-        if (callback == null) {
-            callback = () -> { };
+    public void onAssignment(
+        TopicIdPartition topicIdPartition,
+        Uuid directoryId,
+        String reason,
+        Runnable successCallback
+    ) {
+        long nowNs = time.nanoseconds();
+        Assignment assignment = new Assignment(
+                topicIdPartition, directoryId, nowNs, successCallback);
+        ready.put(topicIdPartition, assignment);
+        if (log.isTraceEnabled()) {
+            String topicDescription = 
Optional.ofNullable(metadataImageSupplier.get().topics().
+                getTopic(assignment.topicIdPartition().topicId())).
+                    
map(TopicImage::name).orElse(assignment.topicIdPartition().topicId().toString());
+            log.trace("Registered assignment {}: {}, moving {}-{} into {}",
+                assignment,
+                reason,
+                topicDescription,
+                topicIdPartition.partitionId(),
+                directoryIdToDescription.apply(assignment.directoryId()));
         }
-        AssignmentEvent assignment = new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId, reason, callback);
-        if (log.isDebugEnabled()) {
-            log.debug("Queued assignment {}", assignment);
-        }
-        eventQueue.append(assignment);
+        rescheduleMaybeSendAssignmentsEvent(nowNs);
     }
 
-    // only for testing
-    void wakeup() {
-        eventQueue.wakeup();
+    void rescheduleMaybeSendAssignmentsEvent(long nowNs) {
+        eventQueue.scheduleDeferred(MAYBE_SEND_ASSIGNMENTS_EVENT,
+            new AssignmentsManagerDeadlineFunction(backoff,
+                nowNs, previousGlobalFailures, !inflight.isEmpty(), 
ready.size()),
+            new MaybeSendAssignmentsEvent());
     }
 
     /**
-     * Base class for all the events handled by {@link AssignmentsManager}.
+     * Handles shutdown.
      */
-    private abstract static class Event implements EventQueue.Event {
-        /**
-         * Override the default behavior in
-         * {@link EventQueue.Event#handleException}
-         * which swallows the exception.
-         */
+    private class ShutdownEvent implements EventQueue.Event {
         @Override
-        public void handleException(Throwable e) {
-            log.error("Unexpected error handling {}", this, e);
+        public void run() {
+            log.info("shutting down.");
+            try {
+                channelManager.shutdown();
+            } catch (Exception e) {
+                log.error("Unexpected exception shutting down 
NodeToControllerChannelManager", e);
+            }
+            try {
+                
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+            } catch (Exception e) {
+                log.error("Unexpected exception removing metrics.", e);
+            }
         }
     }
 
     /**
-     * Handles shutdown of the {@link AssignmentsManager}.
+     * An event that processes the assignments in the ready map.
      */
-    private class ShutdownEvent extends Event {
+    private class MaybeSendAssignmentsEvent implements EventQueue.Event {
         @Override
         public void run() {
-            channelManager.shutdown();
+            try {
+                maybeSendAssignments();
+            } catch (Exception e) {
+                log.error("Unexpected exception in MaybeSendAssignmentsEvent", 
e);
+            }
         }
     }
 
     /**
-     * Handles new generated assignments, to be propagated to the controller.
-     * Assignment events may be handled out of order, so for any two assignment
-     * events for the same topic partition, the one with the oldest timestamp 
is
-     * disregarded.
+     * An event that handles the controller's response to our request.
      */
-    private class AssignmentEvent extends Event {
-        final long timestampNs;
-        final TopicIdPartition partition;
-        final Uuid dirId;
-        final String reason;
-        final List<Runnable> completionHandlers;
-        AssignmentEvent(long timestampNs, TopicIdPartition partition, Uuid 
dirId, String reason, Runnable onComplete) {
-            this.timestampNs = timestampNs;
-            this.partition = Objects.requireNonNull(partition);
-            this.dirId = Objects.requireNonNull(dirId);
-            this.reason = reason;
-            this.completionHandlers = new ArrayList<>();
-            if (onComplete != null) {
-                completionHandlers.add(onComplete);
-            }
-        }
-        void merge(AssignmentEvent other) {
-            if (!partition.equals(other.partition)) {
-                throw new IllegalArgumentException("Cannot merge events for 
different partitions");
-            }
-            completionHandlers.addAll(other.completionHandlers);
-        }
-        void onComplete() {
-            for (Runnable onComplete : completionHandlers) {
-                onComplete.run();
-            }
+    private class HandleResponseEvent implements EventQueue.Event {
+        private final Map<TopicIdPartition, Assignment> sent;
+        private final Optional<ClientResponse> response;
+
+        HandleResponseEvent(
+            Map<TopicIdPartition, Assignment> sent,
+            Optional<ClientResponse> response
+        ) {
+            this.sent = sent;
+            this.response = response;
         }
+
         @Override
         public void run() {
-            log.trace("Received assignment {}", this);
-            AssignmentEvent existing = pending.getOrDefault(partition, null);
-            boolean existingIsInFlight = false;
-            if (existing == null && inflight != null) {
-                existing = inflight.getOrDefault(partition, null);
-                existingIsInFlight = true;
-            }
-            if (existing != null) {
-                if (existing.dirId.equals(dirId)) {
-                    existing.merge(this);
-                    log.debug("Ignoring duplicate assignment {}", this);
-                    return;
+            try {
+                handleResponse(sent, response);
+            } catch (Exception e) {
+                log.error("Unexpected exception in HandleResponseEvent", e);
+            } finally {
+                if (!ready.isEmpty()) {
+                    rescheduleMaybeSendAssignmentsEvent(time.nanoseconds());
                 }
-                if (existing.timestampNs > timestampNs) {
-                    existing.merge(this);
-                    log.debug("Dropping assignment {} because it's older than 
existing {}", this, existing);
-                    return;
-                } else if (!existingIsInFlight) {
-                    this.merge(existing);
-                    log.debug("Dropping existing assignment {} because it's 
older than {}", existing, this);
-                }
-            }
-            log.debug("Queueing new assignment {}", this);
-            pending.put(partition, this);
-
-            if (inflight == null || inflight.isEmpty()) {
-                scheduleDispatch();
             }
         }
-        @Override
-        public String toString() {
-            String partitionString = topicIdToName.apply(partition.topicId())
-                    .map(name -> name + ":" + partition.partitionId())
-                    .orElseGet(() -> "<topic name unknown id: " + 
partition.topicId() + " partition: " + partition.partitionId() + ">");
-            String dirString = dirIdToPath.apply(dirId)
-                    .orElseGet(() -> "<dir path unknown id:" + dirId + ">");
-            return "Assignment{" +
-                    "timestampNs=" + timestampNs +
-                    ", partition=" + partitionString +
-                    ", dir=" + dirString +
-                    ", reason='" + reason + '\'' +
-                    '}';
-        }
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            AssignmentEvent that = (AssignmentEvent) o;
-            return timestampNs == that.timestampNs
-                    && Objects.equals(partition, that.partition)
-                    && Objects.equals(dirId, that.dirId)
-                    && Objects.equals(reason, that.reason);
-        }
-        @Override
-        public int hashCode() {
-            return Objects.hash(timestampNs, partition, dirId, reason);
-        }
     }
 
     /**
-     * Gathers pending assignments and pushes them to the controller in a 
{@link AssignReplicasToDirsRequest}.
+     * A callback object that handles the controller's response to our request.
      */
-    private class DispatchEvent extends Event {
-        static final String TAG = "dispatch";
+    private class CompletionHandler implements 
ControllerRequestCompletionHandler {
+        // The assignments this handler was constructed with. They are handed unchanged
+        // to the HandleResponseEvent on either outcome.
+        private final Map<TopicIdPartition, Assignment> sent;
+
+        CompletionHandler(Map<TopicIdPartition, Assignment> sent) {
+            this.sent = sent;
+        }
+
+        // A timeout is reported to the event queue as an absent response
+        // (Optional.empty()).
         @Override
-        public void run() {
-            if (inflight != null) {
-                throw new IllegalStateException("Bug. Should not be 
dispatching while there are assignments in flight");
-            }
-            if (pending.isEmpty()) {
-                log.trace("No pending assignments, no-op dispatch");
-                return;
-            }
-            Collection<AssignmentEvent> events = pending.values();
-            pending = new HashMap<>();
-            inflight = new HashMap<>();
-            for (AssignmentEvent event : events) {
-                if (inflight.size() < 
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
-                    inflight.put(event.partition, event);
-                } else {
-                    pending.put(event.partition, event);
-                }
-            }
-            if (!pending.isEmpty()) {
-                log.warn("Too many assignments ({}) to fit in one call, 
sending only {} and queueing the rest",
-                        
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST + pending.size(),
-                        
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST);
-            }
-            Map<TopicIdPartition, Uuid> assignment = 
inflight.entrySet().stream()
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().dirId));
-            log.debug("Dispatching {} assignments:  {}", assignment.size(), 
assignment);
-            channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
-                    buildRequestData(brokerId, brokerEpochSupplier.get(), 
assignment)),
-                    new AssignReplicasToDirsRequestCompletionHandler());
+        public void onTimeout() {
+            eventQueue.append(new HandleResponseEvent(sent, Optional.empty()));
+        }
+
+        // The response is not inspected here; it is wrapped and appended to the event
+        // queue together with the assignments that were sent.
+        @Override
+        public void onComplete(ClientResponse response) {
+            eventQueue.append(new HandleResponseEvent(sent, 
Optional.of(response)));
         }
     }
 
-    /**
-     * Handles the response to a dispatched {@link 
AssignReplicasToDirsRequest}.
-     */
-    private class AssignmentResponseEvent extends Event {
-        private final ClientResponse response;
-        public AssignmentResponseEvent(ClientResponse response) {
-            this.response = response;
+    /**
+     * Builds the next batch of assignments from {@code ready} and sends it, unless a
+     * batch is still in flight.
+     *
+     * Every assignment visited is removed from {@code ready}. Those that pass
+     * {@code valid(nodeId, image)} against the current metadata image go into the new
+     * batch; the rest are only counted and are not put back. The scan stops once the
+     * batch holds MAX_ASSIGNMENTS_PER_REQUEST entries, so anything beyond that stays in
+     * {@code ready} for a later call.
+     */
+    void maybeSendAssignments() {
+        int inflightSize = inflight.size();
+        if (log.isTraceEnabled()) {
+            log.trace("maybeSendAssignments: inflightSize = {}.", 
inflightSize);
         }
-        @Override
-        public void run() {
-            if (inflight == null) {
-                throw new IllegalStateException("Bug. Cannot not be handling a 
client response if there is are no assignments in flight");
-            }
-            if (responseIsError(response)) {
-                requeueAllAfterFailure();
+        // Only one request is outstanding at a time.
+        if (inflightSize > 0) {
+            log.trace("maybeSendAssignments: cannot send new assignments 
because there are " +
+                "{} still in flight.", inflightSize);
+            return;
+        }
+        MetadataImage image = metadataImageSupplier.get();
+        Map<TopicIdPartition, Assignment> newInFlight = new HashMap<>();
+        int numInvalid = 0;
+        for (Iterator<Assignment> iterator = ready.values().iterator();
+             iterator.hasNext() && newInFlight.size() < 
MAX_ASSIGNMENTS_PER_REQUEST;
+             ) {
+            Assignment assignment = iterator.next();
+            // Removed from ready up front, whether or not it turns out to be valid.
+            iterator.remove();
+            if (assignment.valid(nodeId, image)) {
+                newInFlight.put(assignment.topicIdPartition(), assignment);
             } else {
-                failedAttempts = 0;
-                AssignReplicasToDirsResponseData data = 
((AssignReplicasToDirsResponse) response.responseBody()).data();
-
-                Set<AssignmentEvent> failed = filterFailures(data, inflight);
-                Set<AssignmentEvent> completed = Utils.diff(HashSet::new, new 
HashSet<>(inflight.values()), failed);
-                for (AssignmentEvent assignmentEvent : completed) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Successfully propagated assignment {}", 
assignmentEvent);
-                    }
-                    assignmentEvent.onComplete();
-                }
-
-                if (!failed.isEmpty()) {
-                    log.warn("Re-queueing assignments: {}", failed);
-                    for (AssignmentEvent event : failed) {
-                        pending.put(event.partition, event);
-                    }
-                }
-                inflight = null;
-                if (!pending.isEmpty()) {
-                    scheduleDispatch();
-                }
+                numInvalid++;
             }
         }
+        log.info("maybeSendAssignments: sending {} assignments; invalidated {} 
assignments " +
+            "prior to sending.", newInFlight.size(), numInvalid);
+        // The broker epoch is read from the same image that was used for validation.
+        if (!newInFlight.isEmpty()) {
+            sendAssignments(image.cluster().brokerEpoch(nodeId), newInFlight);
+        }
     }
 
-    /**
-     * Callback for a {@link AssignReplicasToDirsRequest}.
-     */
-    private class AssignReplicasToDirsRequestCompletionHandler implements 
ControllerRequestCompletionHandler {
-        @Override
-        public void onTimeout() {
-            log.warn("Request to controller timed out");
-            appendResponseEvent(null);
+    /**
+     * Sends one AssignReplicasToDirsRequest carrying the given assignments and records
+     * them as the in-flight batch.
+     *
+     * @param brokerEpoch   the broker epoch to place in the request
+     * @param newInflight   the assignments to send; the same map instance is given to
+     *                      the CompletionHandler and stored in {@code inflight}
+     */
+    void sendAssignments(long brokerEpoch, Map<TopicIdPartition, Assignment> 
newInflight) {
+        CompletionHandler completionHandler = new 
CompletionHandler(newInflight);
+        channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
+            buildRequestData(nodeId, brokerEpoch, newInflight)),
+            completionHandler);
+        inflight = newInflight;
+    }
+
+    /**
+     * Processes the outcome of one request.
+     *
+     * On a request-level ("global") failure, as decided by globalResponseError(), the
+     * failure counter is incremented and every sent assignment is put back into
+     * {@code ready}. Otherwise the counter is reset and each partition result in the
+     * response is passed to handleAssignmentResponse(); sent assignments that the
+     * response did not mention are put back into {@code ready} as well.
+     *
+     * @param sent                  the assignments that were in the request; entries are
+     *                              removed from this map as results are processed
+     * @param assignmentResponse    the response, or empty if the request timed out
+     */
+    void handleResponse(
+        Map<TopicIdPartition, Assignment> sent,
+        Optional<ClientResponse> assignmentResponse
+    ) {
+        // Clear the in-flight marker first: maybeSendAssignments() refuses to send
+        // while inflight is non-empty.
+        inflight = Collections.emptyMap();
+        Optional<String> globalResponseError = 
globalResponseError(assignmentResponse);
+        if (globalResponseError.isPresent()) {
+            previousGlobalFailures++;
+            log.error("handleResponse: {} assignments failed; global error: 
{}. Retrying.",
+                sent.size(), globalResponseError.get());
+            // putIfAbsent: an entry already in ready for the same partition is left
+            // in place rather than overwritten by the one that was sent.
+            sent.entrySet().forEach(e -> ready.putIfAbsent(e.getKey(), 
e.getValue()));
+            return;
         }
-        @Override
-        public void onComplete(ClientResponse response) {
-            log.debug("Received controller response: {}", response);
-            appendResponseEvent(response);
+        previousGlobalFailures = 0;
+        // globalResponseError() returned empty, so the response is present and its
+        // body is an AssignReplicasToDirsResponse; get() and the cast cannot fail.
+        AssignReplicasToDirsResponseData responseData =
+            ((AssignReplicasToDirsResponse) 
assignmentResponse.get().responseBody()).data();
+        long nowNs = time.nanoseconds();
+        for (AssignReplicasToDirsResponseData.DirectoryData directoryData : 
responseData.directories()) {
+            for (AssignReplicasToDirsResponseData.TopicData topicData : 
directoryData.topics()) {
+                for (AssignReplicasToDirsResponseData.PartitionData 
partitionData : topicData.partitions()) {
+                    TopicIdPartition topicIdPartition =
+                        new TopicIdPartition(topicData.topicId(), 
partitionData.partitionIndex());
+                    handleAssignmentResponse(topicIdPartition, sent,
+                            Errors.forCode(partitionData.errorCode()), nowNs);
+                    // Strike off each answered partition; whatever is left in sent
+                    // after the loops had no result in the response.
+                    sent.remove(topicIdPartition);
+                }
+            }
         }
-        void appendResponseEvent(ClientResponse response) {
-            eventQueue.prepend(new AssignmentResponseEvent(response));
+        for (Assignment assignment : sent.values()) {
+            ready.putIfAbsent(assignment.topicIdPartition(), assignment);
+            log.error("handleResponse: no result in response for partition 
{}.",
+                assignment.topicIdPartition());
         }
     }
 
-    private void scheduleDispatch() {
-        if (pending.size() < 
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
-            scheduleDispatch(DISPATCH_INTERVAL_NS);
+    /**
+     * Handles the result for a single partition from a response.
+     *
+     * A partition that was not in {@code sent} is only logged. On Errors.NONE the
+     * assignment's success callback is run, and an exception thrown by the callback
+     * is logged and not propagated. On any other error the assignment is put back
+     * into {@code ready} for a later attempt.
+     *
+     * @param topicIdPartition  the partition named in the response
+     * @param sent              the assignments that were in the request
+     * @param error             the per-partition error from the response
+     * @param nowNs             the current time in nanoseconds, used to decide
+     *                          whether the failure is logged
+     */
+    void handleAssignmentResponse(
+        TopicIdPartition topicIdPartition,
+        Map<TopicIdPartition, Assignment> sent,
+        Errors error,
+        long nowNs
+    ) {
+        Assignment assignment = sent.get(topicIdPartition);
+        if (assignment == null) {
+            log.error("handleResponse: response contained topicIdPartition {}, 
but this was not " +
+                "in the request.", topicIdPartition);
+        } else if (error.equals(Errors.NONE)) {
+            try {
+                assignment.successCallback().run();
+            } catch (Exception e) {
+                log.error("handleResponse: unexpected callback exception", e);
+            }
         } else {
-            log.debug("Too many pending assignments, dispatching immediately");
-            eventQueue.enqueue(EventQueue.EventInsertionType.APPEND, 
DispatchEvent.TAG + "-immediate",
-                    new EventQueue.NoDeadlineFunction(), new DispatchEvent());
+            ready.putIfAbsent(topicIdPartition, assignment);
+            // The failure is logged (at ERROR) only when debug logging is enabled, or
+            // once more than MIN_NOISY_FAILURE_INTERVAL_NS has passed since the
+            // assignment's submission time; earlier failures are retried silently.
+            if (log.isDebugEnabled() || nowNs > assignment.submissionTimeNs() 
+ MIN_NOISY_FAILURE_INTERVAL_NS) {
+                log.error("handleResponse: error assigning {}: {}.", 
assignment.topicIdPartition(), error);
+            }
         }
     }
 
-    private void scheduleDispatch(long delayNs) {
-        log.debug("Scheduling dispatch in {}ns", delayNs);
-        eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, 
DispatchEvent.TAG,
-                new EventQueue.LatestDeadlineFunction(time.nanoseconds() + 
delayNs), new DispatchEvent());
+    /**
+     * Returns the current value of {@code previousGlobalFailures}.
+     *
+     * The field is read inside an event appended to {@code eventQueue}, and the caller
+     * blocks on the future until that event has run.
+     */
+    int previousGlobalFailures() throws ExecutionException, 
InterruptedException {
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        eventQueue.append(() -> future.complete(previousGlobalFailures));
+        return future.get();
     }
 
-    private void requeueAllAfterFailure() {
-        if (inflight != null) {
-            log.debug("Re-queueing all in-flight assignments after failure");
-            for (AssignmentEvent event : inflight.values()) {
-                pending.put(event.partition, event);
-            }
-            inflight = null;
-            ++failedAttempts;
-            long backoffNs = 
TimeUnit.MILLISECONDS.toNanos(resendExponentialBackoff.backoff(failedAttempts));
-            scheduleDispatch(DISPATCH_INTERVAL_NS + backoffNs);
-        }
+    /**
+     * Returns the number of assignments in the current in-flight batch.
+     *
+     * NOTE(review): unlike previousGlobalFailures(), this reads the field directly
+     * rather than through the event queue -- confirm callers tolerate that.
+     */
+    int numInFlight() {
+        return inflight.size();
     }
 
-    private static boolean responseIsError(ClientResponse response) {
-        if (response == null) {
-            log.error("Response is null");
-            return true;
+    /**
+     * Classifies a request-level failure.
+     *
+     * The checks run in a fixed order and the first one that matches wins: absent
+     * response, authentication exception, timed out, disconnected, version mismatch,
+     * null body, body of an unexpected type, and finally a non-NONE top-level error
+     * code in the response data.
+     *
+     * @param response  the response, or empty if none was received
+     * @return          a short label describing the failure, or empty if the response
+     *                  has no request-level error. The labels are plain strings; no
+     *                  exception is thrown by this method for these cases.
+     */
+    static Optional<String> globalResponseError(Optional<ClientResponse> 
response) {
+        // CompletionHandler.onTimeout() reports a timeout as an absent response.
+        if (!response.isPresent()) {
+            return Optional.of("Timeout");
         }
-        if (response.authenticationException() != null) {
-            log.error("Failed to propagate directory assignments because 
authentication failed", response.authenticationException());
-            return true;
+        if (response.get().authenticationException() != null) {
+            return Optional.of("AuthenticationException");
         }
-        if (response.versionMismatch() != null) {
-            log.error("Failed to propagate directory assignments because the 
request version is unsupported", response.versionMismatch());
-            return true;
+        // NOTE(review): "Disonnected" looks like a misspelling of "Disconnected". The
+        // string is left as-is because other code or tests may match on it -- confirm
+        // before correcting.
+        if (response.get().wasTimedOut()) {
+            return Optional.of("Disonnected[Timeout]");
         }
-        if (response.wasDisconnected()) {
-            log.error("Failed to propagate directory assignments because the 
connection to the controller was disconnected");
-            return true;
+        if (response.get().wasDisconnected()) {
+            return Optional.of("Disconnected");
         }
-        if (response.wasTimedOut()) {
-            log.error("Failed to propagate directory assignments because the 
request timed out");
-            return true;
+        if (response.get().versionMismatch() != null) {
+            return Optional.of("UnsupportedVersionException");
         }
-        if (response.responseBody() == null) {
-            log.error("Failed to propagate directory assignments because the 
Controller returned an empty response");
-            return true;
+        if (response.get().responseBody() == null) {
+            return Optional.of("EmptyResponse");
         }
-        if (!(response.responseBody() instanceof 
AssignReplicasToDirsResponse)) {
-            log.error("Failed to propagate directory assignments because the 
Controller returned an invalid response type");
-            return true;
+        if (!(response.get().responseBody() instanceof 
AssignReplicasToDirsResponse)) {
+            return Optional.of("ClassCastException");
         }
-        AssignReplicasToDirsResponseData data = 
((AssignReplicasToDirsResponse) response.responseBody()).data();
+        // The instanceof check above makes this cast safe.
+        AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse)
+            response.get().responseBody()).data();
         Errors error = Errors.forCode(data.errorCode());
         if (error != Errors.NONE) {
-            log.error("Failed to propagate directory assignments because the 
Controller returned error {}", error.name());
-            return true;
+            return Optional.of("Response-level error: " + error.name());
         }
-        return false;
+        return Optional.empty();
     }
 
-    private static Set<AssignmentEvent> filterFailures(
-            AssignReplicasToDirsResponseData data,
-            Map<TopicIdPartition, AssignmentEvent> sent) {
-        Set<AssignmentEvent> failures = new HashSet<>();
-        Set<TopicIdPartition> acknowledged = new HashSet<>();
-        for (AssignReplicasToDirsResponseData.DirectoryData directory : 
data.directories()) {
-            for (AssignReplicasToDirsResponseData.TopicData topic : 
directory.topics()) {
-                for (AssignReplicasToDirsResponseData.PartitionData partition 
: topic.partitions()) {
-                    TopicIdPartition topicPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
-                    AssignmentEvent event = sent.get(topicPartition);
-                    if (event == null) {
-                        log.error("AssignReplicasToDirsResponse contains 
unexpected partition {} into directory {}", partition, directory.id());
-                    } else {
-                        acknowledged.add(topicPartition);
-                        Errors error = Errors.forCode(partition.errorCode());
-                        if (error == Errors.NOT_LEADER_OR_FOLLOWER) {
-                            log.info("Dropping late directory assignment for 
partition {} into directory {} because this broker is no longer a replica", 
partition, event.dirId);
-                        } else if (error != Errors.NONE) {
-                            log.error("Controller returned error {} for 
assignment of partition {} into directory {}",
-                                    error.name(), partition, event.dirId);
-                            failures.add(event);
-                        }
-                    }
-                }
-            }
-        }
-        for (AssignmentEvent event : sent.values()) {
-            if (!acknowledged.contains(event.partition)) {
-                log.error("AssignReplicasToDirsResponse is missing assignment 
of partition {} into directory {}", event.partition, event.dirId);
-                failures.add(event);
-            }
-        }
-        return failures;
-    }
-
-    // visible for testing
-    static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long 
brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
+    static AssignReplicasToDirsRequestData buildRequestData(
+        int nodeId,
+        long brokerEpoch,
+        Map<TopicIdPartition, Assignment> assignments
+    ) {
         Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
         Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
-        for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
+        for (Map.Entry<TopicIdPartition, Assignment> entry : 
assignments.entrySet()) {
             TopicIdPartition topicPartition = entry.getKey();
-            Uuid directoryId = entry.getValue();
+            Uuid directoryId = entry.getValue().directoryId();
             DirectoryData directory = 
directoryMap.computeIfAbsent(directoryId, d -> new 
DirectoryData().setId(directoryId));
             TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new 
HashMap<>())
                     .computeIfAbsent(topicPartition.topicId(), topicId -> {
@@ -477,12 +478,8 @@ public class AssignmentsManager {
             topic.partitions().add(partition);
         }
         return new AssignReplicasToDirsRequestData()
-                .setBrokerId(brokerId)
+                .setBrokerId(nodeId)
                 .setBrokerEpoch(brokerEpoch)
                 .setDirectories(new ArrayList<>(directoryMap.values()));
     }
-
-    private static int getMapSize(Map<TopicIdPartition, AssignmentEvent> map) {
-        return map == null ? 0 : map.size();
-    }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java
 
b/server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java
new file mode 100644
index 00000000000..bf9edf94fac
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+import java.util.OptionalLong;
+import java.util.function.Function;
+
+import static 
org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
+
+/**
+ * This class calculates when the MaybeSendAssignmentsEvent should run for 
AssignmentsManager.
+ */
+public class AssignmentsManagerDeadlineFunction implements 
Function<OptionalLong, OptionalLong> {
+
+    /**
+     * The exponential backoff to use. Its values are added directly to nowNs in
+     * apply(), so they are treated as nanoseconds here.
+     */
+    private final ExponentialBackoff backoff;
+
+    /**
+     * The current time in monotonic nanoseconds.
+     */
+    private final long nowNs;
+
+    /**
+     * The number of global failures immediately prior to this attempt.
+     */
+    private final int previousGlobalFailures;
+
+    /**
+     * True if there are current inflight requests.
+     */
+    private final boolean hasInflightRequests;
+
+    /**
+     * The number of requests that are ready to send.
+     */
+    private final int numReadyRequests;
+
+    /**
+     * Captures the state the deadline is computed from. All values are fixed at
+     * construction time; apply() reads them and does not modify them.
+     *
+     * @param backoff                   supplies the retry delay and the standard delay
+     * @param nowNs                     the current time in monotonic nanoseconds
+     * @param previousGlobalFailures    the number of global failures before this attempt
+     * @param hasInflightRequests       true if there are current inflight requests
+     * @param numReadyRequests          the number of requests that are ready to send
+     */
+    AssignmentsManagerDeadlineFunction(
+        ExponentialBackoff backoff,
+        long nowNs,
+        int previousGlobalFailures,
+        boolean hasInflightRequests,
+        int numReadyRequests
+    ) {
+        this.backoff = backoff;
+        this.nowNs = nowNs;
+        this.previousGlobalFailures = previousGlobalFailures;
+        this.hasInflightRequests = hasInflightRequests;
+        this.numReadyRequests = numReadyRequests;
+    }
+
+    /**
+     * Computes the send deadline.
+     *
+     * The delay is the backoff for the failure count if there were global failures;
+     * zero if more than MAX_ASSIGNMENTS_PER_REQUEST requests are ready and nothing is
+     * in flight; and the backoff's initial interval otherwise. The result is the
+     * earlier of the existing deadline (if any) and nowNs plus that delay.
+     *
+     * @param previousSendTimeNs    the deadline already scheduled, if any
+     * @return                      the deadline to use; never empty
+     */
+    @Override
+    public OptionalLong apply(OptionalLong previousSendTimeNs) {
+        long delayNs;
+        if (previousGlobalFailures > 0) {
+            // If there were global failures (like a response timeout), we want to wait
+            // for the full backoff period. This branch takes precedence over the
+            // "send as soon as possible" case below.
+            delayNs = backoff.backoff(previousGlobalFailures);
+        } else if ((numReadyRequests > MAX_ASSIGNMENTS_PER_REQUEST) && 
!hasInflightRequests) {
+            // If there were no previous failures, and more requests are ready than fit
+            // in a single call, send them as soon as possible.
+            delayNs = 0;
+        } else {
+            // Otherwise, use the standard delay period. This helps to promote batching,
+            // which reduces load on the controller.
+            delayNs = backoff.initialInterval();
+        }
+        long newSendTimeNs = nowNs + delayNs;
+        if (previousSendTimeNs.isPresent() && previousSendTimeNs.getAsLong() < 
newSendTimeNs) {
+            // If the previous send time was before the new one we calculated, go with
+            // the previous one, so that an already-scheduled deadline is never pushed
+            // later.
+            return previousSendTimeNs;
+        }
+        // Otherwise, return our new send time.
+        return OptionalLong.of(newSendTimeNs);
+    }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
new file mode 100644
index 00000000000..55f18031865
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for Assignment.valid() and Assignment.toString() against a small fixed
+ * metadata image.
+ */
+public class AssignmentTest {
+    // Image with one topic "foo" (id rTudty6ITOCcO_ldVyzZYg) and two partitions:
+    // partition 0 on replicas 0, 1, 2 and partition 1 on replicas 1, 2, 3.
+    private static final MetadataImage TEST_IMAGE;
+
+    static {
+        MetadataDelta delta = new MetadataDelta.Builder().
+            setImage(MetadataImage.EMPTY).
+            build();
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()));
+        delta.replay(new TopicRecord().
+            setName("foo").
+            setTopicId(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg")));
+        delta.replay(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg")).
+            setReplicas(Arrays.asList(0, 1, 2)).
+            setIsr(Arrays.asList(0, 1, 2)).
+            setLeader(1));
+        delta.replay(new PartitionRecord().
+            setPartitionId(1).
+            setTopicId(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg")).
+            setReplicas(Arrays.asList(1, 2, 3)).
+            setIsr(Arrays.asList(1, 2, 3)).
+            setLeader(1));
+        TEST_IMAGE = delta.apply(MetadataProvenance.EMPTY);
+    }
+
+    // A do-nothing success callback with a fixed toString(), so that the expected
+    // string in testAssignmentToString is stable.
+    static class NoOpRunnable implements Runnable {
+        static final NoOpRunnable INSTANCE = new NoOpRunnable();
+
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public String toString() {
+            return "NoOpRunnable";
+        }
+    }
+
+    // Partition 0 of the known topic, checked for node 0, which is in its replica list.
+    @Test
+    public void testValidAssignment() {
+        assertTrue(new Assignment(
+            new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 0),
+            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
+            0,
+            NoOpRunnable.INSTANCE).valid(0, TEST_IMAGE));
+    }
+
+    // The topic id used here was never added to TEST_IMAGE.
+    @Test
+    public void testAssignmentForNonExistentTopicIsNotValid() {
+        assertFalse(new Assignment(
+            new TopicIdPartition(Uuid.fromString("uuOi4qGPSsuM0QwnYINvOw"), 0),
+            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
+            0,
+            NoOpRunnable.INSTANCE).valid(0, TEST_IMAGE));
+    }
+
+    // The topic exists, but only partitions 0 and 1 were added; partition 2 was not.
+    @Test
+    public void testAssignmentForNonExistentPartitionIsNotValid() {
+        assertFalse(new Assignment(
+            new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 2),
+            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
+            0,
+            NoOpRunnable.INSTANCE).valid(0, TEST_IMAGE));
+    }
+
+    // Partition 0 exists, but node 3 is not in its replica list (0, 1, 2).
+    @Test
+    public void testAssignmentReplicaNotOnBrokerIsNotValid() {
+        assertFalse(new Assignment(
+            new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 0),
+            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
+            0,
+            NoOpRunnable.INSTANCE).valid(3, TEST_IMAGE));
+    }
+
+    // Pins the exact toString() format, including the submission time (123) and the
+    // callback's own toString().
+    @Test
+    public void testAssignmentToString() {
+        assertEquals("Assignment(topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
+            "directoryId=rzRT8XZaSbKsP6j238zogg, " +
+            "submissionTimeNs=123, " +
+            "successCallback=NoOpRunnable)",
+            new Assignment(new 
TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 1),
+                Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
+                123,
+                NoOpRunnable.INSTANCE).toString());
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunctionTest.java
 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunctionTest.java
new file mode 100644
index 00000000000..b900c759af9
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerDeadlineFunctionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for AssignmentsManagerDeadlineFunction.apply(). Every case uses nowNs = 0, so
+ * the expected deadline equals the delay that was chosen.
+ */
+public class AssignmentsManagerDeadlineFunctionTest {
+    // Presumably (initial interval, multiplier, max interval, jitter) -- verify against
+    // ExponentialBackoff's constructor. The assertions below rely on backoff(1) being
+    // exactly twice initialInterval().
+    private static final ExponentialBackoff BACKOFF = new 
ExponentialBackoff(1000, 2, 8000, 0.0);
+
+    // No failures, a small number of ready requests, no existing deadline: the
+    // standard delay (the initial interval) is used.
+    @Test
+    public void applyAfterDispatchInterval() {
+        assertEquals(OptionalLong.of(BACKOFF.initialInterval()),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, false, 12).
+                apply(OptionalLong.empty()));
+    }
+
+    // An existing deadline earlier than the standard delay is kept.
+    @Test
+    public void applyAfterDispatchIntervalWithExistingEarlierDeadline() {
+        assertEquals(OptionalLong.of(BACKOFF.initialInterval() / 2),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, false, 12).
+                apply(OptionalLong.of(BACKOFF.initialInterval() / 2)));
+    }
+
+    // One previous global failure: the delay is backoff(1), expected to be twice the
+    // initial interval.
+    @Test
+    public void applyBackoffInterval() {
+        assertEquals(OptionalLong.of(BACKOFF.initialInterval() * 2),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 1, false, 12).
+                apply(OptionalLong.empty()));
+    }
+
+    // Even with a previous failure, an existing earlier deadline is kept.
+    @Test
+    public void applyBackoffIntervalWithExistingEarlierDeadline() {
+        assertEquals(OptionalLong.of(BACKOFF.initialInterval() / 2),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 1, false, 12).
+                apply(OptionalLong.of(BACKOFF.initialInterval() / 2)));
+    }
+
+    // More than MAX_ASSIGNMENTS_PER_REQUEST ready, nothing in flight, no failures: the
+    // delay is zero, which is earlier than the existing deadline and so replaces it.
+    @Test
+    public void scheduleImmediatelyWhenOverloaded() {
+        assertEquals(OptionalLong.of(0),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, false,
+                MAX_ASSIGNMENTS_PER_REQUEST + 1).
+                    apply(OptionalLong.of(BACKOFF.initialInterval() / 2)));
+    }
+
+    // Same overload, but with requests in flight: falls back to the standard delay.
+    @Test
+    public void 
doNotScheduleImmediatelyWhenOverloadedIfThereAreInFlightRequests() {
+        assertEquals(OptionalLong.of(BACKOFF.initialInterval()),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, true,
+                MAX_ASSIGNMENTS_PER_REQUEST + 1).
+                    apply(OptionalLong.empty()));
+    }
+
+    // Same overload, but after a global failure: the backoff delay takes precedence.
+    @Test
+    public void 
doNotScheduleImmediatelyWhenOverloadedIfThereArePreviousGlobalFailures() {
+        assertEquals(OptionalLong.of(BACKOFF.initialInterval() * 2),
+            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 1, false,
+                MAX_ASSIGNMENTS_PER_REQUEST + 1).
+                    apply(OptionalLong.empty()));
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 976fd08f319..f819e37f199 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -14,451 +14,501 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
 import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
 import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.metadata.AssignmentsHelper;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.test.TestUtils;
 
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Metric;
 import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.mockito.ArgumentCaptor;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
-import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
+import static org.apache.kafka.server.AssignmentsManager.QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atMostOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 public class AssignmentsManagerTest {
-
+    private static final Logger LOG = LoggerFactory.getLogger(AssignmentsManagerTest.class);
     private static final Uuid TOPIC_1 = Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
     private static final Uuid TOPIC_2 = Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
+    private static final Uuid TOPIC_3 = Uuid.fromString("ZeAwvYt-Ro2suQudGUdbRg");
     private static final Uuid DIR_1 = Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
     private static final Uuid DIR_2 = Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
     private static final Uuid DIR_3 = Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");
 
-    private MockTime time;
-    private NodeToControllerChannelManager channelManager;
-    private AssignmentsManager manager;
-
-    @BeforeEach
-    public void setup() {
-        time = new MockTime();
-        channelManager = mock(NodeToControllerChannelManager.class);
-        Map<Uuid, String> topicNames = new HashMap<>();
-        topicNames.put(TOPIC_1, "TOPIC_1");
-        topicNames.put(TOPIC_2, "TOPIC_2");
-        Map<Uuid, String> dirPaths = new HashMap<>();
-        dirPaths.put(DIR_1, "DIR_1");
-        dirPaths.put(DIR_2, "DIR_2");
-        dirPaths.put(DIR_3, "DIR_3");
-        manager = new AssignmentsManager(time, channelManager, 8, () -> 100L,
-                id -> Optional.ofNullable(dirPaths.get(id)), id -> Optional.ofNullable(topicNames.get(id)));
+    private static final MetadataImage TEST_IMAGE;
+
+    static {
+        MetadataDelta delta = new MetadataDelta.Builder().
+            setImage(MetadataImage.EMPTY).
+            build();
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()));
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(0).
+            setIncarnationId(Uuid.fromString("JJsH6zB0R7eKbr0Sy49ULw")).
+            setBrokerEpoch(123));
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(Uuid.fromString("DtnWclXyQ4qNDvL97JlnvQ")).
+            setBrokerEpoch(456));
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(2).
+            setIncarnationId(Uuid.fromString("UFa_RKgLR4mxEXyquEPEmg")).
+            setBrokerEpoch(789));
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(3).
+            setIncarnationId(Uuid.fromString("jj-cnHYASAmb_H9JR6nmtQ")).
+            setBrokerEpoch(987));
+        delta.replay(new TopicRecord().
+            setName("foo").
+            setTopicId(TOPIC_1));
+        delta.replay(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(TOPIC_1).
+            setReplicas(Arrays.asList(0, 1, 2)).
+            setIsr(Arrays.asList(0, 1, 2)).
+            setLeader(1));
+        delta.replay(new PartitionRecord().
+            setPartitionId(1).
+            setTopicId(TOPIC_1).
+            setReplicas(Arrays.asList(1, 2, 3)).
+            setIsr(Arrays.asList(1, 2, 3)).
+            setLeader(1));
+        delta.replay(new TopicRecord().
+            setName("bar").
+            setTopicId(TOPIC_2));
+        delta.replay(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(TOPIC_2).
+            setReplicas(Arrays.asList(0, 3, 2)).
+            setIsr(Arrays.asList(0, 3, 2)).
+            setLeader(1));
+        delta.replay(new PartitionRecord().
+            setPartitionId(1).
+            setTopicId(TOPIC_2).
+            setReplicas(Arrays.asList(1, 2, 3)).
+            setIsr(Arrays.asList(2)).
+            setLeader(2));
+        delta.replay(new PartitionRecord().
+            setPartitionId(2).
+            setTopicId(TOPIC_2).
+            setReplicas(Arrays.asList(3, 2, 1)).
+            setIsr(Arrays.asList(3, 2, 1)).
+            setLeader(3));
+        TEST_IMAGE = delta.apply(MetadataProvenance.EMPTY);
     }
 
-    @AfterEach
-    void tearDown() throws InterruptedException {
-        manager.close();
-    }
+    static class MockNodeToControllerChannelManager implements NodeToControllerChannelManager {
+        LinkedBlockingDeque<Map.Entry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler>> callbacks =
+            new LinkedBlockingDeque<>();
 
-    AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {
-        request = request.duplicate();
-        request.directories().sort(Comparator.comparing(
-            AssignReplicasToDirsRequestData.DirectoryData::id));
-        for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
-            directory.topics().sort(Comparator.comparing(
-                AssignReplicasToDirsRequestData.TopicData::topicId));
-            for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
-                topic.partitions().sort(Comparator.comparing(
-                    AssignReplicasToDirsRequestData.PartitionData::partitionIndex));
-            }
+        @Override
+        public void start() {
         }
-        return request;
-    }
 
+        @Override
+        public void shutdown() {
+        }
 
-    void assertRequestEquals(
-        AssignReplicasToDirsRequestData expected,
-        AssignReplicasToDirsRequestData actual
-    ) {
-        assertEquals(normalize(expected), normalize(actual));
-    }
+        @Override
+        public Optional<NodeApiVersions> controllerApiVersions() {
+            return Optional.empty();
+        }
 
-    @Test
-    void testBuildRequestData() {
-        Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, 
Uuid>() {{
-                put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
-                put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
-                put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
-                put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
-                put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
-            }};
-        AssignReplicasToDirsRequestData built = 
AssignmentsManager.buildRequestData(8, 100L, assignment);
-        AssignReplicasToDirsRequestData expected = new 
AssignReplicasToDirsRequestData()
-            .setBrokerId(8)
-            .setBrokerEpoch(100L)
-            .setDirectories(Arrays.asList(
-                new AssignReplicasToDirsRequestData.DirectoryData()
-                    .setId(DIR_2)
-                    .setTopics(Arrays.asList(
-                        new AssignReplicasToDirsRequestData.TopicData()
-                            .setTopicId(TOPIC_1)
-                            .setPartitions(Collections.singletonList(
-                                    new 
AssignReplicasToDirsRequestData.PartitionData()
-                                            .setPartitionIndex(2))),
-                new AssignReplicasToDirsRequestData.TopicData()
-                    .setTopicId(TOPIC_2)
-                    .setPartitions(Collections.singletonList(
-                            new AssignReplicasToDirsRequestData.PartitionData()
-                                    .setPartitionIndex(5))))),
-            new AssignReplicasToDirsRequestData.DirectoryData()
-                .setId(DIR_3)
-                .setTopics(Collections.singletonList(
-                    new AssignReplicasToDirsRequestData.TopicData()
-                        .setTopicId(TOPIC_1)
-                        .setPartitions(Collections.singletonList(
-                            new AssignReplicasToDirsRequestData.PartitionData()
-                                    .setPartitionIndex(3))))),
-            new AssignReplicasToDirsRequestData.DirectoryData()
-                .setId(DIR_1)
-                .setTopics(Collections.singletonList(
-                    new AssignReplicasToDirsRequestData.TopicData()
-                        .setTopicId(TOPIC_1)
-                        .setPartitions(Arrays.asList(
-                            new AssignReplicasToDirsRequestData.PartitionData()
-                                .setPartitionIndex(4),
-                            new AssignReplicasToDirsRequestData.PartitionData()
-                                .setPartitionIndex(1)))))));
-        assertRequestEquals(expected, built);
-    }
+        @Override
+        public void sendRequest(
+            AbstractRequest.Builder<? extends AbstractRequest> request,
+            ControllerRequestCompletionHandler callback
+        ) {
+            AssignReplicasToDirsRequest inputRequest = (AssignReplicasToDirsRequest) request.build();
+            synchronized (this) {
+                callbacks.add(new AbstractMap.SimpleEntry<>(inputRequest.data(), callback));
+            }
+        }
 
-    @Test
-    public void testAssignmentAggregation() throws InterruptedException {
-        CountDownLatch readyToAssert = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            readyToAssert.countDown();
-            return null;
-        
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
-            any(ControllerRequestCompletionHandler.class));
-
-        manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, "testAssignmentAggregation", () -> { });
-        manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2, "testAssignmentAggregation", () -> { });
-        manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3, "testAssignmentAggregation", () -> { });
-        manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1, "testAssignmentAggregation", () -> { });
-        manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2, "testAssignmentAggregation", () -> { });
-        TestUtils.waitForCondition(() -> {
-            time.sleep(100);
-            manager.wakeup();
-            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
-        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
-
-        ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
-            ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
-        verify(channelManager, times(1)).start();
-        verify(channelManager).sendRequest(captor.capture(), 
any(ControllerRequestCompletionHandler.class));
-        verify(channelManager, atMostOnce()).shutdown();
-        verifyNoMoreInteractions(channelManager);
-        assertEquals(1, captor.getAllValues().size());
-        AssignReplicasToDirsRequestData actual = 
captor.getValue().build().data();
-        AssignReplicasToDirsRequestData expected = buildRequestData(
-            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
-                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
-                    put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
-                    put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
-                    put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
-                    put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
-                }}
-        );
-        assertRequestEquals(expected, actual);
+        void completeCallback(Function<AssignReplicasToDirsRequestData, Optional<ClientResponse>> completionist) throws InterruptedException {
+            Map.Entry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler> entry = callbacks.take();
+            Optional<ClientResponse> clientResponse = completionist.apply(entry.getKey());
+            if (clientResponse.isPresent()) {
+                entry.getValue().onComplete(clientResponse.get());
+            } else {
+                entry.getValue().onTimeout();
+            }
+        }
     }
 
-    @Test
-    void testRequeuesFailedAssignmentPropagations() throws 
InterruptedException {
-        CountDownLatch readyToAssert = new CountDownLatch(5);
-        doAnswer(invocation -> {
-            readyToAssert.countDown();
-            if (readyToAssert.getCount() == 4) {
-                invocation.getArgument(1, 
ControllerRequestCompletionHandler.class).onTimeout();
-                manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3, 
"testRequeuesFailedAssignmentPropagations", () -> { });
+    static class TestEnv implements AutoCloseable {
+        final ExponentialBackoff backoff;
+        final MockNodeToControllerChannelManager channelManager;
+        final MetricsRegistry metricsRegistry = new MetricsRegistry();
+        final AssignmentsManager assignmentsManager;
+        final Map<TopicIdPartition, Integer> successes;
+
+        TestEnv() {
+            this.backoff = new ExponentialBackoff(1, 2, 4, 0);
+            this.channelManager = new MockNodeToControllerChannelManager();
+            this.assignmentsManager = new AssignmentsManager(
+                    backoff, Time.SYSTEM, channelManager, 1, () -> TEST_IMAGE,
+                        t -> t.toString(), metricsRegistry);
+            this.successes = new HashMap<>();
+        }
+
+        void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId) {
+            assignmentsManager.onAssignment(topicIdPartition, directoryId, "test", () -> {
+                synchronized (successes) {
+                    successes.put(topicIdPartition, successes.getOrDefault(topicIdPartition, 0) + 1);
+                }
+            });
+        }
+
+        int success(TopicIdPartition topicIdPartition) {
+            synchronized (successes) {
+                return successes.getOrDefault(topicIdPartition, 0);
             }
-            if (readyToAssert.getCount() == 3) {
-                invocation.getArgument(1, 
ControllerRequestCompletionHandler.class).onComplete(
-                    new ClientResponse(null, null, null, 0L, 0L, false, false,
-                        new UnsupportedVersionException("test unsupported 
version exception"), null, null));
+        }
 
-                // duplicate should be ignored
-                manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3, 
"testRequeuesFailedAssignmentPropagations", () -> { });
+        void successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1() throws Exception {
+            channelManager.completeCallback(req -> {
+                AssignReplicasToDirsRequestData.DirectoryData directoryData = req.directories().get(0);
+                assertEquals(DIR_1, directoryData.id());
+                AssignReplicasToDirsRequestData.TopicData topicData = directoryData.topics().get(0);
+                assertEquals(TOPIC_1, topicData.topicId());
+                assertEquals(0, topicData.partitions().get(0).partitionIndex());
+                return mockClientResponse(new AssignReplicasToDirsResponseData().
+                    setDirectories(Arrays.asList(new AssignReplicasToDirsResponseData.DirectoryData().
+                        setId(DIR_1).
+                        setTopics(Arrays.asList(new AssignReplicasToDirsResponseData.TopicData().
+                            setTopicId(TOPIC_1).
+                            setPartitions(Arrays.asList(new AssignReplicasToDirsResponseData.PartitionData().
+                                setPartitionIndex(0).
+                                setErrorCode((short) 0))))))));
+            });
+        }
 
-                manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
-                     Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), 
"testRequeuesFailedAssignmentPropagations", () -> { });
+        Metric findMetric(MetricName name) {
+            for (Map.Entry<MetricName, Metric> entry : metricsRegistry.allMetrics().entrySet()) {
+                if (name.equals(entry.getKey())) {
+                    return entry.getValue();
+                }
             }
-            if (readyToAssert.getCount() == 2) {
-                invocation.getArgument(1, 
ControllerRequestCompletionHandler.class).onComplete(
-                        new ClientResponse(null, null, null, 0L, 0L, false, 
false, null,
-                                new AuthenticationException("test 
authentication exception"), null)
-                );
-
-                // duplicate should be ignored
-                manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
-                     Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), 
"testRequeuesFailedAssignmentPropagations", () -> { });
-
-                manager.onAssignment(new TopicIdPartition(TOPIC_1, 4),
-                     Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"), 
"testRequeuesFailedAssignmentPropagations", () -> { });
+            throw new IllegalArgumentException("metric named " + name + " not found");
+        }
+
+        @SuppressWarnings("unchecked") // do not warn about Gauge typecast.
+        int queuedReplicaToDirAssignments() {
+            Gauge<Integer> queuedReplicaToDirAssignments =
+                    (Gauge<Integer>) findMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+            return queuedReplicaToDirAssignments.value();
+        }
+
+        @Override
+        public void close() throws Exception {
+            try {
+                assignmentsManager.close();
+            } catch (Exception e) {
+                LOG.error("error shutting down assignmentsManager", e);
             }
-            if (readyToAssert.getCount() == 1) {
-                invocation.getArgument(1, 
ControllerRequestCompletionHandler.class).onComplete(
-                    new ClientResponse(null, null, null, 0L, 0L, false, false, 
null, null,
-                        new AssignReplicasToDirsResponse(new 
AssignReplicasToDirsResponseData()
-                            .setErrorCode(Errors.NOT_CONTROLLER.code())
-                            .setThrottleTimeMs(0))));
+            try {
+                metricsRegistry.shutdown();
+            } catch (Exception e) {
+                LOG.error("error shutting down metricsRegistry", e);
             }
-            return null;
-        
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
-            any(ControllerRequestCompletionHandler.class));
-
-        manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, 
"testRequeuesFailedAssignmentPropagations", () -> { });
-        TestUtils.waitForCondition(() -> {
-            time.sleep(TimeUnit.SECONDS.toMillis(1));
-            manager.wakeup();
-            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
-        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
-
-        ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
-            ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
-        verify(channelManager, times(1)).start();
-        verify(channelManager, times(5)).sendRequest(captor.capture(),
-            any(ControllerRequestCompletionHandler.class));
-        verify(channelManager, atMostOnce()).shutdown();
-        verifyNoMoreInteractions(channelManager);
-        assertEquals(5, captor.getAllValues().size());
-        assertRequestEquals(buildRequestData(
-            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
-                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
-                }}
-        ), captor.getAllValues().get(0).build().data());
-        assertRequestEquals(buildRequestData(
-            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
-                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
-                    put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
-                }}
-        ), captor.getAllValues().get(1).build().data());
-        assertRequestEquals(buildRequestData(
-            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
-                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
-                    put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
-                    put(new TopicIdPartition(TOPIC_1, 3), 
Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
-                    put(new TopicIdPartition(TOPIC_1, 4), 
Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
-                }}
-        ), captor.getAllValues().get(4).build().data());
+        }
+    }
+
+    static Optional<ClientResponse> mockClientResponse(AssignReplicasToDirsResponseData data) {
+        return Optional.of(new ClientResponse(null, null, "", 0, 0, false,
+            null, null, new AssignReplicasToDirsResponse(data)));
+    }
+
+    @Test
+    public void testStartAndShutdown() throws Exception {
+        try (TestEnv testEnv = new TestEnv()) {
+        }
     }
 
-    @Timeout(30)
     @Test
-    void testOnCompletion() throws Exception {
-        CountDownLatch readyToAssert = new CountDownLatch(300);
-        doAnswer(invocation -> {
-            AssignReplicasToDirsRequestData request = 
invocation.getArgument(0, 
AssignReplicasToDirsRequest.Builder.class).build().data();
-            ControllerRequestCompletionHandler completionHandler = 
invocation.getArgument(1, ControllerRequestCompletionHandler.class);
-            completionHandler.onComplete(buildSuccessfulResponse(request));
-
-            return null;
-        
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
-                any(ControllerRequestCompletionHandler.class));
-
-        for (int i = 0; i < 300; i++) {
-            manager.onAssignment(new TopicIdPartition(TOPIC_1, i % 5), DIR_1, 
"testOnCompletion", readyToAssert::countDown);
+    public void testSuccessfulAssignment() throws Exception {
+        try (TestEnv testEnv = new TestEnv()) {
+            assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(1, testEnv.assignmentsManager.numPending());
+                assertEquals(1, testEnv.queuedReplicaToDirAssignments());
+            });
+            assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
+            assertEquals(1, testEnv.assignmentsManager.numInFlight());
+            testEnv.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(0, testEnv.assignmentsManager.numPending());
+                assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+                assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
+            });
+            assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
         }
+    }
 
-        TestUtils.waitForCondition(() -> {
-            time.sleep(TimeUnit.SECONDS.toMillis(1));
-            manager.wakeup();
-            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
-        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
+    @ParameterizedTest
+    @ValueSource(strings = {"invalidRequest", "timeout"})
+    public void testUnSuccessfulRequestCausesRetransmission(String failureType) throws Exception {
+        try (TestEnv testEnv = new TestEnv()) {
+            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(1, testEnv.assignmentsManager.numPending());
+            });
+            if (failureType.equals("invalidRequest")) {
+                testEnv.channelManager.completeCallback(req -> {
+                    return mockClientResponse(new AssignReplicasToDirsResponseData().
+                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                });
+            } else if (failureType.equals("timeout")) {
+                testEnv.channelManager.completeCallback(req -> Optional.empty());
+            }
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(1, testEnv.assignmentsManager.numPending());
+                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
+            });
+            assertEquals(1, testEnv.assignmentsManager.previousGlobalFailures());
+            testEnv.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(0, testEnv.assignmentsManager.numPending());
+                assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
+            });
+            assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
+        }
     }
 
-    private static ClientResponse 
buildSuccessfulResponse(AssignReplicasToDirsRequestData request) {
-        return buildResponse(request, topicIdPartition -> Errors.NONE);
+    @ParameterizedTest
+    @ValueSource(strings = {"missingTopic", "missingPartition", "notReplica"})
+    public void testMismatchedInputDoesNotTriggerCompletion(String mismatchType) throws Exception {
+        try (TestEnv testEnv = new TestEnv()) {
+            TopicIdPartition target;
+            if (mismatchType.equals("missingTopic")) {
+                target = new TopicIdPartition(TOPIC_3, 0);
+            } else if (mismatchType.equals("missingPartition")) {
+                target = new TopicIdPartition(TOPIC_1, 2);
+            } else if (mismatchType.equals("notReplica")) {
+                target = new TopicIdPartition(TOPIC_2, 0);
+            } else {
+                throw new RuntimeException("invalid mismatchType argument.");
+            }
+            testEnv.onAssignment(target, DIR_1);
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(0, testEnv.assignmentsManager.numPending());
+                assertEquals(0, testEnv.success(target));
+            });
+        }
     }
 
-    private static ClientResponse 
buildResponse(AssignReplicasToDirsRequestData request,
-                                                Function<TopicIdPartition, 
Errors> perPartitionError) {
-        Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>();
-        for (AssignReplicasToDirsRequestData.DirectoryData directory : 
request.directories()) {
-            for (AssignReplicasToDirsRequestData.TopicData topic : 
directory.topics()) {
-                for (AssignReplicasToDirsRequestData.PartitionData partition : 
topic.partitions()) {
-                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
-                    Errors error = perPartitionError.apply(topicIdPartition);
-                    if (error == null) {
-                        error = Errors.NONE;
+    @ParameterizedTest
+    @ValueSource(strings = {"missingResult", "errorResult"})
+    public void testOneAssignmentFailsOneSucceeds(String failureType) throws Exception {
+        try (TestEnv testEnv = new TestEnv()) {
+            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
+            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(2, testEnv.assignmentsManager.numPending());
+                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
+                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 1)));
+            });
+            testEnv.channelManager.completeCallback(req -> {
+                AssignReplicasToDirsRequestData.DirectoryData directoryData = req.directories().get(0);
+                assertEquals(DIR_1, directoryData.id());
+                AssignReplicasToDirsRequestData.TopicData topicData = directoryData.topics().get(0);
+                assertEquals(TOPIC_1, topicData.topicId());
+                HashSet<Integer> foundPartitions = new HashSet<>();
+                topicData.partitions().forEach(p -> foundPartitions.add(p.partitionIndex()));
+                List<AssignReplicasToDirsResponseData.PartitionData> partitions = new ArrayList<>();
+                if (foundPartitions.contains(0)) {
+                    partitions.add(new AssignReplicasToDirsResponseData.PartitionData().
+                        setPartitionIndex(0).
+                        setErrorCode((short) 0));
+                }
+                if (foundPartitions.contains(1)) {
+                    if (failureType.equals("missingResult")) {
+                        // do nothing
+                    } else if (failureType.equals("errorResult")) {
+                        partitions.add(new AssignReplicasToDirsResponseData.PartitionData().
+                            setPartitionIndex(1).
+                            setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()));
+                    } else {
+                        throw new RuntimeException("invalid failureType argument.");
                     }
-                    errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, error);
                 }
-            }
+                return mockClientResponse(new AssignReplicasToDirsResponseData().
+                    setDirectories(Arrays.asList(new AssignReplicasToDirsResponseData.DirectoryData().
+                        setId(DIR_1).
+                        setTopics(Arrays.asList(new AssignReplicasToDirsResponseData.TopicData().
+                            setTopicId(TOPIC_1).
+                            setPartitions(partitions))))));
+            });
+            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
+                assertEquals(1, testEnv.assignmentsManager.numPending());
+                assertEquals(1, testEnv.assignmentsManager.numInFlight());
+                assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
+                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 1)));
+                assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
+            });
         }
-        AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors);
-        return new ClientResponse(null, null, null,
-                0L, 0L, false, false, null, null,
-                new AssignReplicasToDirsResponse(responseData));
     }
 
     @Test
-    public void testAssignmentCompaction() throws Exception {
-        // Delay the first controller response to force assignment compaction logic
-        CompletableFuture<Runnable> completionFuture = new CompletableFuture<>();
-        doAnswer(invocation -> {
-            AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data();
-            ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class);
-            ClientResponse response = buildSuccessfulResponse(request);
-            Runnable completion = () -> completionHandler.onComplete(response);
-            if (completionFuture.isDone()) completion.run();
-            else completionFuture.complete(completion);
-            return null;
-        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
-                any(ControllerRequestCompletionHandler.class));
-
-        CountDownLatch remainingInvocations = new CountDownLatch(20);
-        Runnable onComplete = () -> {
-            assertTrue(completionFuture.isDone(), "Premature invocation");
-            assertTrue(remainingInvocations.getCount() > 0, "Extra invocation");
-            remainingInvocations.countDown();
-        };
-        Uuid[] dirs = {DIR_1, DIR_2, DIR_3};
-        for (int i = 0; i < remainingInvocations.getCount(); i++) {
-            time.sleep(100);
-            manager.onAssignment(new TopicIdPartition(TOPIC_1, 0), dirs[i % 3], "testAssignmentCompaction", onComplete);
-        }
-        activeWait(completionFuture::isDone);
-        completionFuture.get().run();
-        activeWait(() -> remainingInvocations.getCount() == 0);
+    public void testGlobalResponseErrorTimeout() {
+        assertEquals(Optional.of("Timeout"),
+            AssignmentsManager.globalResponseError(Optional.empty()));
     }
 
-    void activeWait(Supplier<Boolean> predicate) throws InterruptedException {
-        TestUtils.waitForCondition(() -> {
-            boolean conditionSatisfied = predicate.get();
-            if (!conditionSatisfied) {
-                time.sleep(100);
-                manager.wakeup();
-            }
-            return conditionSatisfied;
-        }, TestUtils.DEFAULT_MAX_WAIT_MS, 50, null);
+    @Test
+    public void testNoGlobalResponseError() {
+        assertEquals(Optional.empty(),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, false, null,
+                    null, new AssignReplicasToDirsResponse(
+                        new AssignReplicasToDirsResponseData())))));
     }
 
-    static Metric findMetric(String name) {
-        for (Map.Entry<MetricName, Metric> entry : KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
-            MetricName metricName = entry.getKey();
-            if (AssignmentsManager.class.getSimpleName().equals(metricName.getType()) && metricName.getName().equals(name)) {
-                return entry.getValue();
-            }
-        }
-        throw new IllegalArgumentException("metric named " + name + " not found");
+    @Test
+    public void testGlobalResponseErrorAuthenticationException() {
+        assertEquals(Optional.of("AuthenticationException"),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, false, null,
+                    new AuthenticationException("failed"), null))));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    void testQueuedReplicaToDirAssignmentsMetric() throws Exception {
-        CountDownLatch readyToAssert = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            readyToAssert.countDown();
-            return null;
-        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class));
-
-        Gauge<Integer> queuedReplicaToDirAssignments = (Gauge<Integer>) findMetric(AssignmentsManager.QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
-        assertEquals(0, queuedReplicaToDirAssignments.value());
-
-        for (int i = 0; i < 4; i++) {
-            manager.onAssignment(new TopicIdPartition(TOPIC_1, i), DIR_1, "testQueuedReplicaToDirAssignmentsMetric", () -> { });
-        }
-        TestUtils.waitForCondition(() -> {
-            time.sleep(100);
-            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
-        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
-        assertEquals(4, queuedReplicaToDirAssignments.value());
-
-        for (int i = 4; i < 8; i++) {
-            manager.onAssignment(new TopicIdPartition(TOPIC_1, i), DIR_1, "testQueuedReplicaToDirAssignmentsMetric", () -> { });
-        }
-        TestUtils.retryOnExceptionWithTimeout(5_000, () -> assertEquals(8, queuedReplicaToDirAssignments.value()));
+    public void testGlobalResponseErrorUnsupportedVersionException() {
+        assertEquals(Optional.of("UnsupportedVersionException"),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, false,
+                    new UnsupportedVersionException("failed"), null, null))));
     }
 
-    // AssignmentsManager retries to propagate assignments (via AssignReplicasToDirsRequest) after failures.
-    // When an assignment fails to propagate with NOT_LEADER_OR_FOLLOWER, AssignmentsManager should conclude
-    // that the broker has been removed as a replica for the partition, and stop trying to propagate it.
     @Test
-    void testDropsOldAssignments() throws InterruptedException {
-        TopicIdPartition tp1 = new TopicIdPartition(TOPIC_1, 1), tp2 = new TopicIdPartition(TOPIC_1, 2);
-        List<AssignReplicasToDirsRequestData> requests = new ArrayList<>();
-        CountDownLatch readyToAssert = new CountDownLatch(2);
-        doAnswer(invocation -> {
-            AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data();
-            ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class);
-            if (readyToAssert.getCount() == 2) {
-                // First request, reply with a partition-level NOT_LEADER_OR_FOLLOWER error and queue a different assignment
-                completionHandler.onComplete(buildResponse(request, topicIdPartition -> Errors.NOT_LEADER_OR_FOLLOWER));
-                manager.onAssignment(tp2, DIR_1, "testDropsOldAssignments-second");
-            }
-            if (readyToAssert.getCount() == 1) {
-                // Second request, reply with success
-                completionHandler.onComplete(buildSuccessfulResponse(request));
-            }
-            requests.add(request);
-            readyToAssert.countDown();
-            return null;
-        }).when(channelManager).sendRequest(any(), any());
-
-        manager.onAssignment(tp1, DIR_1, "testDropsOldAssignments-first");
-        TestUtils.waitForCondition(() -> {
-            time.sleep(TimeUnit.SECONDS.toMillis(1));
-            manager.wakeup();
-            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
-        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
-
-        assertEquals(Arrays.asList(
-                buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{
-                        put(tp1, DIR_1);
-                    }}),
-                // Even though the controller replied with NOT_LEADER_OR_FOLLOWER, the second request does not include
-                // partition 1, meaning AssignmentManager dropped (no longer retries) the assignment.
-                buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{
-                        put(tp2, DIR_1);
-                    }})
-        ), requests);
+    public void testGlobalResponseErrorDisconnectedTimedOut() {
+        assertEquals(Optional.of("Disonnected[Timeout]"),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, true, true,
+                   null, null, null))));
+    }
+
+    @Test
+    public void testGlobalResponseErrorEmptyResponse() {
+        assertEquals(Optional.of("EmptyResponse"),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, false, false,
+                        null, null, null))));
+    }
+
+    @Test
+    public void testGlobalResponseErrorClassCastException() {
+        assertEquals(Optional.of("ClassCastException"),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, false, false,
+                    null, null, new ApiVersionsResponse(new ApiVersionsResponseData())))));
+    }
+
+    @Test
+    public void testGlobalResponseErrorResponseLevelError() {
+        assertEquals(Optional.of("Response-level error: INVALID_REQUEST"),
+            AssignmentsManager.globalResponseError(Optional.of(
+                new ClientResponse(null, null, "", 0, 0, false, false,
+                        null, null, new AssignReplicasToDirsResponse(
+                            new AssignReplicasToDirsResponseData().
+                                setErrorCode(Errors.INVALID_REQUEST.code()))))));
+    }
+
+    @Test
+    void testBuildRequestData() {
+        Map<TopicIdPartition, Uuid> assignments = new LinkedHashMap<>();
+        assignments.put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+        assignments.put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+        assignments.put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+        assignments.put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+        assignments.put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+        Map<TopicIdPartition, Assignment> targetAssignments = new LinkedHashMap<>();
+        assignments.entrySet().forEach(e -> targetAssignments.put(e.getKey(),
+            new Assignment(e.getKey(), e.getValue(), 0, () -> { })));
+        AssignReplicasToDirsRequestData built =
+            AssignmentsManager.buildRequestData(8, 100L, targetAssignments);
+        AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData().
+            setBrokerId(8).
+            setBrokerEpoch(100L).
+            setDirectories(Arrays.asList(
+                new AssignReplicasToDirsRequestData.DirectoryData().
+                    setId(DIR_2).
+                    setTopics(Arrays.asList(
+                        new AssignReplicasToDirsRequestData.TopicData().
+                            setTopicId(TOPIC_1).
+                            setPartitions(Collections.singletonList(
+                                new AssignReplicasToDirsRequestData.PartitionData().
+                                    setPartitionIndex(2))),
+                new AssignReplicasToDirsRequestData.TopicData().
+                    setTopicId(TOPIC_2).
+                    setPartitions(Collections.singletonList(
+                        new AssignReplicasToDirsRequestData.PartitionData().
+                            setPartitionIndex(5))))),
+                new AssignReplicasToDirsRequestData.DirectoryData().
+                    setId(DIR_3).
+                    setTopics(Collections.singletonList(
+                        new AssignReplicasToDirsRequestData.TopicData().
+                            setTopicId(TOPIC_1).
+                            setPartitions(Collections.singletonList(
+                                new AssignReplicasToDirsRequestData.PartitionData().
+                                    setPartitionIndex(3))))),
+                new AssignReplicasToDirsRequestData.DirectoryData().
+                    setId(DIR_1).
+                    setTopics(Collections.singletonList(
+                        new AssignReplicasToDirsRequestData.TopicData().
+                            setTopicId(TOPIC_1).
+                            setPartitions(Arrays.asList(
+                                new AssignReplicasToDirsRequestData.PartitionData().
+                                    setPartitionIndex(1),
+                                new AssignReplicasToDirsRequestData.PartitionData().
+                                    setPartitionIndex(4)))))));
+        assertEquals(expected, built);
     }
 }

Reply via email to