This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dd80eba2c HDDS-8233. ReplicationManager: Throttle delete container 
commands from over-replication handlers (#4447)
6dd80eba2c is described below

commit 6dd80eba2c9f448d8985701af9647811aa920205
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Mar 22 15:31:16 2023 +0000

    HDDS-8233. ReplicationManager: Throttle delete container commands from 
over-replication handlers (#4447)
---
 ....java => CommandTargetOverloadedException.java} |   4 +-
 .../replication/ECOverReplicationHandler.java      |  27 ++++-
 .../replication/ECUnderReplicationHandler.java     |   2 +-
 .../replication/MisReplicationHandler.java         |   2 +-
 .../replication/RatisOverReplicationHandler.java   |  52 +++++++--
 .../replication/RatisUnderReplicationHandler.java  |   2 +-
 .../container/replication/ReplicationManager.java  | 123 +++++++++++++++------
 .../container/replication/ReplicationTestUtil.java |  29 ++++-
 .../replication/TestECMisReplicationHandler.java   |   6 +-
 .../replication/TestECOverReplicationHandler.java  |  80 ++++++++++++--
 .../replication/TestECUnderReplicationHandler.java |   6 +-
 .../replication/TestMisReplicationHandler.java     |   2 +-
 .../TestRatisMisReplicationHandler.java            |   6 +-
 .../TestRatisOverReplicationHandler.java           |  88 ++++++++++++++-
 .../TestRatisUnderReplicationHandler.java          |   2 +-
 .../replication/TestReplicationManager.java        |  57 ++++++++--
 16 files changed, 401 insertions(+), 87 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AllSourcesOverloadedException.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/CommandTargetOverloadedException.java
similarity index 88%
rename from 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AllSourcesOverloadedException.java
rename to 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/CommandTargetOverloadedException.java
index 2b0b490c51..b83db03024 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AllSourcesOverloadedException.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/CommandTargetOverloadedException.java
@@ -22,9 +22,9 @@ import java.io.IOException;
 /**
  * Exception class used to indicate that all sources are overloaded.
  */
-public class AllSourcesOverloadedException extends IOException {
+public class CommandTargetOverloadedException extends IOException {
 
-  public AllSourcesOverloadedException(String message) {
+  public CommandTargetOverloadedException(String message) {
     super(message);
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 4db51d5263..a7350dfc54 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -68,7 +68,7 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
   public int processAndSendCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy)
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     ContainerInfo container = result.getContainerInfo();
 
     // We are going to check for over replication, so we should filter out any
@@ -154,6 +154,7 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       replicaIndexCounts.put(r.getReplicaIndex(),
           replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
     }
+    CommandTargetOverloadedException firstException = null;
     for (ContainerReplica r : replicasToRemove) {
       int currentCount = replicaIndexCounts.getOrDefault(
           r.getReplicaIndex(), 0);
@@ -162,16 +163,32 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
             "for that index to zero. Candidate Replicas: {}", r, candidates);
         continue;
       }
-      replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
-      replicationManager.sendDeleteCommand(container, r.getReplicaIndex(),
-          r.getDatanodeDetails(), true);
-      commandsSent++;
+      try {
+        replicationManager.sendThrottledDeleteCommand(container,
+            r.getReplicaIndex(), r.getDatanodeDetails(), true);
+        replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
+        commandsSent++;
+      } catch (CommandTargetOverloadedException e) {
+        LOG.debug("Unable to send delete command for container {} replica " +
+            "index {} to {}",
+            container.getContainerID(), r.getReplicaIndex(),
+            r.getDatanodeDetails());
+        if (firstException == null) {
+          firstException = e;
+        }
+      }
     }
 
     if (commandsSent == 0) {
       LOG.warn("With the current state of available replicas {}, no" +
           " commands were created to remove excess replicas.", replicas);
     }
+    // If any of the "to remove" replicas were not able to be removed due to
+    // load on the datanodes, then throw the first exception we encountered.
+    // This will allow the container to be re-queued and tried again later.
+    if (firstException != null) {
+      throw firstException;
+    }
     return commandsSent;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 872835e444..dcdfb8d0a5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -446,7 +446,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   private void createReplicateCommand(
       ContainerInfo container, Iterator<DatanodeDetails> iterator,
       ContainerReplica replica, ECContainerReplicaCount replicaCount)
-      throws AllSourcesOverloadedException, NotLeaderException {
+      throws CommandTargetOverloadedException, NotLeaderException {
     final boolean push = replicationManager.getConfig().isPush();
     DatanodeDetails source = replica.getDatanodeDetails();
     DatanodeDetails target = iterator.next();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 3d013999b0..dd2dac5cf7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -122,7 +122,7 @@ public abstract class MisReplicationHandler implements
       ContainerInfo containerInfo,
       Set<ContainerReplica> replicasToBeReplicated,
       List<DatanodeDetails> targetDns)
-      throws AllSourcesOverloadedException, NotLeaderException {
+      throws CommandTargetOverloadedException, NotLeaderException {
     int commandsSent = 0;
     int datanodeIdx = 0;
     for (ContainerReplica replica : replicasToBeReplicated) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index f1796f503e..55108ff6b8 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -248,7 +248,7 @@ public class RatisOverReplicationHandler
 
   private int createCommands(
       ContainerInfo containerInfo, List<ContainerReplica> replicas,
-      int excess) throws NotLeaderException {
+      int excess) throws NotLeaderException, CommandTargetOverloadedException {
 
     /*
     Being in the over replication queue means we have enough replicas that
@@ -256,16 +256,31 @@ public class RatisOverReplicationHandler
     deleted. This might make the container violate placement policy.
      */
     int commandsSent = 0;
+    int initialExcess = excess;
+    CommandTargetOverloadedException firstOverloadedException = null;
     List<ContainerReplica> replicasRemoved = new ArrayList<>();
     for (ContainerReplica replica : replicas) {
       if (excess == 0) {
-        return commandsSent;
+        break;
       }
       if (!ReplicationManager.compareState(
           containerInfo.getState(), replica.getState())) {
-        replicationManager.sendDeleteCommand(containerInfo,
-            replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
-        commandsSent++;
+        // Delete commands are throttled, so they may fail to send. However, the
+        // replicas here are not in the same state as the container, so they
+        // must be deleted in preference to "healthy" replicas later. Therefore,
+        // if they fail to delete, we continue to mark them as deleted by
+        // reducing the excess so healthy replicas are not removed later in
+        // this method.
+        try {
+          replicationManager.sendThrottledDeleteCommand(containerInfo,
+              replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
+          commandsSent++;
+        } catch (CommandTargetOverloadedException e) {
+          LOG.debug("Unable to send delete command for a mis-matched state " +
+              "container {} to {} as it has too many pending delete commands",
+              containerInfo.containerID(), replica.getDatanodeDetails());
+          firstOverloadedException = e;
+        }
         replicasRemoved.add(replica);
         excess--;
       }
@@ -281,17 +296,34 @@ public class RatisOverReplicationHandler
     // iterate through replicas in deterministic order
     for (ContainerReplica replica : replicas) {
       if (excess == 0) {
-        return commandsSent;
+        break;
       }
 
       if (super.isPlacementStatusActuallyEqualAfterRemove(replicaSet, replica,
           containerInfo.getReplicationFactor().getNumber())) {
-        replicationManager.sendDeleteCommand(containerInfo,
-            replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
-        commandsSent++;
-        excess--;
+        try {
+          replicationManager.sendThrottledDeleteCommand(containerInfo,
+              replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
+          commandsSent++;
+          excess--;
+        } catch (CommandTargetOverloadedException e) {
+          LOG.debug("Unable to send delete command for container {} to {} as " 
+
+              "it has too many pending delete commands",
+              containerInfo.containerID(), replica.getDatanodeDetails());
+          if (firstOverloadedException == null) {
+            firstOverloadedException = e;
+          }
+        }
       }
     }
+    // If we encountered an overloaded exception, and then did not send as many
+    // delete commands as the original excess number, then it means there must
+    // be some replicas we did not delete when we should have. In this case,
+    // throw the exception so that the container is requeued and processed again
+    // later.
+    if (firstOverloadedException != null && commandsSent != initialExcess) {
+      throw firstOverloadedException;
+    }
     return commandsSent;
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 06485d4e71..04b6564579 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -241,7 +241,7 @@ public class RatisUnderReplicationHandler
 
   private int sendReplicationCommands(
       ContainerInfo containerInfo, List<DatanodeDetails> sources,
-      List<DatanodeDetails> targets) throws AllSourcesOverloadedException,
+      List<DatanodeDetails> targets) throws CommandTargetOverloadedException,
       NotLeaderException {
     final boolean push = replicationManager.getConfig().isPush();
     int commandsSent = 0;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index d72dca28bc..4c321bc2d9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.PostConstruct;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -74,6 +75,7 @@ import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -177,6 +179,7 @@ public class ReplicationManager implements SCMService {
   private final OverReplicatedProcessor overReplicatedProcessor;
   private final HealthCheck containerCheckChain;
   private final int datanodeReplicationLimit;
+  private final int datanodeDeleteLimit;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -228,6 +231,7 @@ public class ReplicationManager implements SCMService {
     this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
     this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
     this.datanodeReplicationLimit = rmConf.getDatanodeReplicationLimit();
+    this.datanodeDeleteLimit = rmConf.getDatanodeDeleteLimit();
 
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
         ecContainerPlacement, conf, this);
@@ -468,42 +472,58 @@ public class ReplicationManager implements SCMService {
         scmDeadlineEpochMs, datanodeDeadlineEpochMs);
   }
 
+  /**
+   * Sends delete container command for the given container to the given
+   * datanode, provided that the datanode is not overloaded with delete
+   * container commands. If the datanode is overloaded, an exception will be
+   * thrown.
+   * @param container Container to be deleted
+   * @param replicaIndex Index of the container replica to be deleted
+   * @param datanode  The datanode on which the replica should be deleted
+   * @param force true to force delete a container that is open or not empty
+   * @throws NotLeaderException when this SCM is not the leader
+   * @throws CommandTargetOverloadedException If the target datanode has too
+   *                                          many pending commands.
+   */
+  public void sendThrottledDeleteCommand(final ContainerInfo container,
+      int replicaIndex, final DatanodeDetails datanode, boolean force)
+      throws NotLeaderException, CommandTargetOverloadedException {
+    List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount =
+        getAvailableDatanodes(Collections.singletonList(datanode),
+            Type.deleteContainerCommand, datanodeDeleteLimit);
+    if (datanodeWithCommandCount.isEmpty()) {
+      throw new CommandTargetOverloadedException("Cannot schedule a delete " +
+          "container command for container " + container.containerID() +
+          " on datanode " + datanode + " as it has too many pending delete " +
+          "commands");
+    }
+    sendDeleteCommand(container, replicaIndex, datanodeWithCommandCount.get(0)
+        .getRight(), force);
+  }
+
   /**
    * Create a ReplicateContainerCommand for the given container and to push the
    * container to the target datanode. The list of sources are checked to 
ensure
    * the datanode has sufficient capacity to accept the container command, and
    * then the command is sent to the datanode with the fewest pending commands.
-   * If all sources are overloaded, an AllSourcesOverloadedException is thrown.
-   * @param containerID The containerID to be replicated
+   * If all sources are overloaded, a CommandTargetOverloadedException is
+   * thrown.
+   * @param containerInfo The container to be replicated
    * @param sources The list of datanodes that can be used as sources
    * @param target The target datanode where the container should be replicated
    * @param replicaIndex The index of the container replica to be replicated
    * @return A pair containing the datanode that the command was sent to, and
    *         the command created.
-   * @throws AllSourcesOverloadedException
+   * @throws CommandTargetOverloadedException
    */
-  public Pair<DatanodeDetails, SCMCommand<?>>
-      createThrottledReplicationCommand(long containerID,
+  public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
       List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
-      throws AllSourcesOverloadedException {
-    List<Pair<Integer, DatanodeDetails>> sourceWithCmds = new ArrayList<>();
-    for (DatanodeDetails source : sources)  {
-      try {
-        int commandCount = nodeManager.getTotalDatanodeCommandCount(source,
-            Type.replicateContainerCommand);
-        if (commandCount >= datanodeReplicationLimit) {
-          LOG.debug("Source {} has reached the maximum number of queued " +
-              "replication commands ({})", source, datanodeReplicationLimit);
-          continue;
-        }
-        sourceWithCmds.add(Pair.of(commandCount, source));
-      } catch (NodeNotFoundException e) {
-        LOG.error("Node {} not found in NodeManager. Should not happen",
-            source, e);
-      }
-    }
+      throws CommandTargetOverloadedException, NotLeaderException {
+    long containerID = containerInfo.getContainerID();
+    List<Pair<Integer, DatanodeDetails>> sourceWithCmds = 
getAvailableDatanodes(
+        sources, Type.replicateContainerCommand, datanodeReplicationLimit);
     if (sourceWithCmds.isEmpty()) {
-      throw new AllSourcesOverloadedException("No sources with capacity " +
+      throw new CommandTargetOverloadedException("No sources with capacity " +
           "available for replication of container " + containerID + " to " +
           target);
     }
@@ -513,16 +533,42 @@ public class ReplicationManager implements SCMService {
     ReplicateContainerCommand cmd =
         ReplicateContainerCommand.toTarget(containerID, target);
     cmd.setReplicaIndex(replicaIndex);
-    return Pair.of(sourceWithCmds.get(0).getRight(), cmd);
+    sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
   }
 
-  public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
-      List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
-      throws AllSourcesOverloadedException, NotLeaderException {
-    Pair<DatanodeDetails, SCMCommand<?>> cmdPair =
-        createThrottledReplicationCommand(containerInfo.getContainerID(),
-            sources, target, replicaIndex);
-    sendDatanodeCommand(cmdPair.getRight(), containerInfo, cmdPair.getLeft());
+  /**
+   * For the given datanodes and command type, look up the current queued
+   * command count and return a list of datanodes with the current command
+   * count. If any datanode is at or beyond the limit, then it will not be
+   * included in the returned list.
+   * @param datanodes List of datanodes to check for available capacity
+   * @param commandType The Type of datanode command to check the capacity for.
+   * @param limit The limit of commands of that type.
+   * @return List of datanodes with the current command count that are not over
+   *         the limit.
+   */
+  private List<Pair<Integer, DatanodeDetails>> getAvailableDatanodes(
+      List<DatanodeDetails> datanodes,
+      StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type commandType,
+      int limit) {
+    List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount
+        = new ArrayList<>();
+    for (DatanodeDetails dn : datanodes) {
+      try {
+        int commandCount = nodeManager.getTotalDatanodeCommandCount(dn,
+            commandType);
+        if (commandCount >= limit) {
+          LOG.debug("Datanode {} has reached the maximum number of queued " +
+              "{} commands ({})", dn, commandType, limit);
+          continue;
+        }
+        datanodeWithCommandCount.add(Pair.of(commandCount, dn));
+      } catch (NodeNotFoundException e) {
+        LOG.error("Node {} not found in NodeManager. Should not happen",
+            dn, e);
+      }
+    }
+    return datanodeWithCommandCount;
   }
 
   /**
@@ -1147,6 +1193,21 @@ public class ReplicationManager implements SCMService {
       return datanodeReplicationLimit;
     }
 
+    @Config(key = "datanode.delete.container.limit",
+        type = ConfigType.INT,
+        defaultValue = "40",
+        tags = { SCM, DATANODE },
+        description = "A limit to restrict the total number of delete " +
+            "container commands queued on a datanode. Note this is intended " +
+            "to be a temporary config until we have a more dynamic way of " +
+            "limiting load"
+    )
+    private int datanodeDeleteLimit = 40;
+
+    public int getDatanodeDeleteLimit() {
+      return datanodeDeleteLimit;
+    }
+
     public void setDatanodeReplicationLimit(int limit) {
       this.datanodeReplicationLimit = limit;
     }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 3345340291..fc6b9c143a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -325,11 +325,11 @@ public final class ReplicationTestUtil {
    * @param mock Mock of ReplicationManager
    * @param commandsSent Set to add the command to rather than sending it.
    * @throws NotLeaderException
-   * @throws AllSourcesOverloadedException
+   * @throws CommandTargetOverloadedException
    */
   public static void mockRMSendThrottleReplicateCommand(ReplicationManager 
mock,
       Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
-      throws NotLeaderException, AllSourcesOverloadedException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     doAnswer((Answer<Void>) invocationOnMock -> {
       List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
       ContainerInfo containerInfo = invocationOnMock.getArgument(0);
@@ -386,4 +386,29 @@ public final class ReplicationTestUtil {
       return null;
     }).when(mock).sendDeleteCommand(any(), anyInt(), any(), anyBoolean());
   }
+
+  /**
+   * Given a Mockito mock of ReplicationManager, this method will mock the
+   * sendThrottledDeleteCommand method so that it adds the command created to
+   * the commandsSent set.
+   * @param mock Mock of ReplicationManager
+   * @param commandsSent Set to add the command to rather than sending it.
+   * @throws NotLeaderException
+   */
+  public static void mockRMSendThrottledDeleteCommand(ReplicationManager mock,
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      throws NotLeaderException, CommandTargetOverloadedException {
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      ContainerInfo containerInfo = invocationOnMock.getArgument(0);
+      int replicaIndex = invocationOnMock.getArgument(1);
+      DatanodeDetails target = invocationOnMock.getArgument(2);
+      boolean forceDelete = invocationOnMock.getArgument(3);
+      DeleteContainerCommand deleteCommand = new DeleteContainerCommand(
+          containerInfo.getContainerID(), forceDelete);
+      deleteCommand.setReplicaIndex(replicaIndex);
+      commandsSent.add(Pair.of(target, deleteCommand));
+      return null;
+    }).when(mock)
+        .sendThrottledDeleteCommand(any(), anyInt(), any(), anyBoolean());
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index 4c27fce353..a660efa8b1 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -56,7 +56,7 @@ public class TestECMisReplicationHandler extends 
TestMisReplicationHandler {
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException, NotLeaderException {
+      CommandTargetOverloadedException, NotLeaderException {
     ECReplicationConfig repConfig = new ECReplicationConfig(DATA, PARITY);
     setup(repConfig);
   }
@@ -172,14 +172,14 @@ public class TestECMisReplicationHandler extends 
TestMisReplicationHandler {
   @Test
   public void testAllSourcesOverloaded() throws IOException {
     ReplicationManager replicationManager = getReplicationManager();
-    Mockito.doThrow(new AllSourcesOverloadedException("Overloaded"))
+    Mockito.doThrow(new CommandTargetOverloadedException("Overloaded"))
         .when(replicationManager).sendThrottledReplicationCommand(any(),
             anyList(), any(), anyInt());
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
-    assertThrows(AllSourcesOverloadedException.class,
+    assertThrows(CommandTargetOverloadedException.class,
         () -> testMisReplication(availableReplicas, Collections.emptyList(),
             0, 1, 1));
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 01d1c3a0d2..b69a1fbc43 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -40,10 +40,13 @@ import 
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -51,6 +54,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -60,6 +64,9 @@ import static 
org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Tests the ECOverReplicationHandling functionality.
@@ -75,7 +82,8 @@ public class TestECOverReplicationHandler {
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   @BeforeEach
-  public void setup() throws NodeNotFoundException, NotLeaderException {
+  public void setup() throws NodeNotFoundException, NotLeaderException,
+      CommandTargetOverloadedException {
     staleNode = null;
 
     replicationManager = Mockito.mock(ReplicationManager.class);
@@ -91,7 +99,7 @@ public class TestECOverReplicationHandler {
         });
 
     commandsSent = new HashSet<>();
-    ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+    ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager,
         commandsSent);
 
     nodeManager = new MockNodeManager(true, 10);
@@ -108,7 +116,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testNoOverReplication()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -119,7 +127,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testOverReplicationFixedByPendingDelete()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -137,7 +145,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testOverReplicationWithDecommissionIndexes()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -149,7 +157,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testOverReplicationWithStaleIndexes()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -167,7 +175,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testOverReplicationWithOpenReplica()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -187,7 +195,7 @@ public class TestECOverReplicationHandler {
    */
   @Test
   public void testOverReplicationButPolicyReturnsWrongIndexes()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
             Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
@@ -205,7 +213,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testOverReplicationWithOneSameIndexes()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -220,7 +228,7 @@ public class TestECOverReplicationHandler {
 
   @Test
   public void testOverReplicationWithMultiSameIndexes()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -244,7 +252,7 @@ public class TestECOverReplicationHandler {
    */
   @Test
   public void testOverReplicationWithUnderReplication()
-      throws NotLeaderException {
+      throws NotLeaderException, CommandTargetOverloadedException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(
             Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -267,10 +275,58 @@ public class TestECOverReplicationHandler {
     Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
   }
 
+  @Test
+  public void testDeleteThrottling() throws IOException {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(
+            Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 2),
+            Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4),
+            Pair.of(IN_SERVICE, 5));
+
+    ContainerHealthResult.UnderReplicatedHealthResult health =
+        new ContainerHealthResult.UnderReplicatedHealthResult(
+            container, 2, false, false, false);
+
+    // Tracks whether the next throttled delete call should throw.
+    final AtomicBoolean shouldThrow = new AtomicBoolean(true);
+    // On the first call we throw, on subsequent calls we succeed.
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      if (shouldThrow.get()) {
+        shouldThrow.set(false);
+        throw new CommandTargetOverloadedException("Test exception");
+      }
+      ContainerInfo containerInfo = invocationOnMock.getArgument(0);
+      int replicaIndex = invocationOnMock.getArgument(1);
+      DatanodeDetails target = invocationOnMock.getArgument(2);
+      boolean forceDelete = invocationOnMock.getArgument(3);
+      DeleteContainerCommand deleteCommand = new DeleteContainerCommand(
+          containerInfo.getContainerID(), forceDelete);
+      deleteCommand.setReplicaIndex(replicaIndex);
+      commandsSent.add(Pair.of(target, deleteCommand));
+      return null;
+    }).when(replicationManager)
+        .sendThrottledDeleteCommand(any(), anyInt(), any(), anyBoolean());
+
+    ECOverReplicationHandler ecORH =
+        new ECOverReplicationHandler(policy, replicationManager);
+
+    try {
+      ecORH.processAndSendCommands(availableReplicas, ImmutableList.of(),
+          health, 1);
+      Assertions.fail("Expected CommandTargetOverloadedException");
+    } catch (CommandTargetOverloadedException e) {
+      // This is expected.
+    }
+    Assert.assertEquals(1, commandsSent.size());
+  }
+
   private void testOverReplicationWithIndexes(
       Set<ContainerReplica> availableReplicas,
       Map<Integer, Integer> index2excessNum,
-      List<ContainerReplicaOp> pendingOps) throws NotLeaderException {
+      List<ContainerReplicaOp> pendingOps) throws NotLeaderException,
+      CommandTargetOverloadedException {
     ECOverReplicationHandler ecORH =
         new ECOverReplicationHandler(policy, replicationManager);
     ContainerHealthResult.OverReplicatedHealthResult result =
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 2605469f21..867bdd15d9 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -94,7 +94,7 @@ public class TestECUnderReplicationHandler {
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException, NotLeaderException {
+      CommandTargetOverloadedException, NotLeaderException {
     nodeManager = new MockNodeManager(true, 10) {
       @Override
       public NodeStatus getNodeStatus(DatanodeDetails dd) {
@@ -237,11 +237,11 @@ public class TestECUnderReplicationHandler {
         .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
-    doThrow(new AllSourcesOverloadedException("Overloaded"))
+    doThrow(new CommandTargetOverloadedException("Overloaded"))
         .when(replicationManager).sendThrottledReplicationCommand(
             any(), anyList(), any(), anyInt());
 
-    Assertions.assertThrows(AllSourcesOverloadedException.class, () ->
+    Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
         testUnderReplicationWithMissingIndexes(
             Lists.emptyList(), availableReplicas, 1, 0, policy));
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 0af6e7008e..9350b72623 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -70,7 +70,7 @@ public abstract class TestMisReplicationHandler {
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   protected void setup(ReplicationConfig repConfig)
-      throws NodeNotFoundException, AllSourcesOverloadedException,
+      throws NodeNotFoundException, CommandTargetOverloadedException,
       NotLeaderException {
 
     replicationManager = Mockito.mock(ReplicationManager.class);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
index 2e1bfadb42..d7a857acee 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
@@ -55,7 +55,7 @@ public class TestRatisMisReplicationHandler extends 
TestMisReplicationHandler {
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException, NotLeaderException {
+      CommandTargetOverloadedException, NotLeaderException {
     RatisReplicationConfig repConfig = RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE);
     setup(repConfig);
@@ -177,14 +177,14 @@ public class TestRatisMisReplicationHandler extends 
TestMisReplicationHandler {
   @Test
   public void testAllSourcesOverloaded() throws IOException {
     ReplicationManager replicationManager = getReplicationManager();
-    Mockito.doThrow(new AllSourcesOverloadedException("Overloaded"))
+    Mockito.doThrow(new CommandTargetOverloadedException("Overloaded"))
         .when(replicationManager).sendThrottledReplicationCommand(any(),
             anyList(), any(), anyInt());
 
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
             Pair.of(IN_SERVICE, 0));
-    assertThrows(AllSourcesOverloadedException.class,
+    assertThrows(CommandTargetOverloadedException.class,
         () -> testMisReplication(availableReplicas, Collections.emptyList(),
             0, 1, 1));
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 0c8ea277af..c83c2a30d9 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -38,6 +39,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.slf4j.event.Level;
 
 import java.io.IOException;
@@ -45,13 +47,20 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicasWithSameOrigin;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 
 /**
  * Tests for {@link RatisOverReplicationHandler}.
@@ -65,7 +74,8 @@ public class TestRatisOverReplicationHandler {
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   @Before
-  public void setup() throws NodeNotFoundException, NotLeaderException {
+  public void setup() throws NodeNotFoundException, NotLeaderException,
+      CommandTargetOverloadedException {
     container = createContainer(HddsProtos.LifeCycleState.CLOSED,
         RATIS_REPLICATION_CONFIG);
 
@@ -83,7 +93,7 @@ public class TestRatisOverReplicationHandler {
         });
 
     commandsSent = new HashSet<>();
-    ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+    ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager,
         commandsSent);
 
     GenericTestUtils.setLogLevel(RatisOverReplicationHandler.LOG, Level.DEBUG);
@@ -269,6 +279,80 @@ public class TestRatisOverReplicationHandler {
     testProcessing(replicas, pendingOps, getOverReplicatedHealthResult(), 0);
   }
 
+  @Test
+  public void testDeleteThrottlingMisMatchedReplica() throws IOException {
+    Set<ContainerReplica> closedReplicas = createReplicas(
+        container.containerID(), ContainerReplicaProto.State.CLOSED,
+        0, 0, 0, 0);
+
+    ContainerReplica quasiClosedReplica = createContainerReplica(
+        container.containerID(), 0,
+        HddsProtos.NodeOperationalState.IN_SERVICE,
+        ContainerReplicaProto.State.QUASI_CLOSED);
+
+    // When processing the quasi closed replica, simulate an overloaded
+    // exception so that it does not get deleted. Then we can ensure that only
+    // one of the CLOSED replicas is removed.
+    doThrow(CommandTargetOverloadedException.class)
+        .when(replicationManager)
+        .sendThrottledDeleteCommand(Mockito.any(ContainerInfo.class),
+            anyInt(),
+            eq(quasiClosedReplica.getDatanodeDetails()),
+            anyBoolean());
+
+    Set<ContainerReplica> replicas = new HashSet<>();
+    replicas.add(quasiClosedReplica);
+    replicas.addAll(closedReplicas);
+
+    RatisOverReplicationHandler handler =
+        new RatisOverReplicationHandler(policy, replicationManager);
+
+    try {
+      handler.processAndSendCommands(replicas, Collections.emptyList(),
+          getOverReplicatedHealthResult(), 2);
+      fail("Expected CommandTargetOverloadedException");
+    } catch (CommandTargetOverloadedException e) {
+      // Expected
+    }
+    Assert.assertEquals(1, commandsSent.size());
+    Pair<DatanodeDetails, SCMCommand<?>> cmd = commandsSent.iterator().next();
+    Assert.assertNotEquals(quasiClosedReplica.getDatanodeDetails(),
+        cmd.getKey());
+  }
+
+  @Test
+  public void testDeleteThrottling() throws IOException {
+    Set<ContainerReplica> closedReplicas = createReplicas(
+        container.containerID(), ContainerReplicaProto.State.CLOSED,
+        0, 0, 0, 0, 0);
+
+    final AtomicBoolean shouldThrow = new AtomicBoolean(true);
+    // On the first call we throw, on subsequent calls we succeed.
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      if (shouldThrow.get()) {
+        shouldThrow.set(false);
+        throw new CommandTargetOverloadedException("Test exception");
+      }
+      ContainerInfo containerInfo = invocationOnMock.getArgument(0);
+      int replicaIndex = invocationOnMock.getArgument(1);
+      DatanodeDetails target = invocationOnMock.getArgument(2);
+      boolean forceDelete = invocationOnMock.getArgument(3);
+      DeleteContainerCommand deleteCommand = new DeleteContainerCommand(
+          containerInfo.getContainerID(), forceDelete);
+      deleteCommand.setReplicaIndex(replicaIndex);
+      commandsSent.add(Pair.of(target, deleteCommand));
+      return null;
+    }).when(replicationManager)
+        .sendThrottledDeleteCommand(any(), anyInt(), any(), anyBoolean());
+
+    RatisOverReplicationHandler handler =
+        new RatisOverReplicationHandler(policy, replicationManager);
+
+    handler.processAndSendCommands(closedReplicas, Collections.emptyList(),
+        getOverReplicatedHealthResult(), 2);
+    Assert.assertEquals(2, commandsSent.size());
+  }
+
   /**
    * Tests whether the specified expectNumCommands number of commands are
    * created by the handler.
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 5c37f5c106..f545809107 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -69,7 +69,7 @@ public class TestRatisUnderReplicationHandler {
 
   @Before
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException, NotLeaderException {
+      CommandTargetOverloadedException, NotLeaderException {
     container = ReplicationTestUtil.createContainer(
         HddsProtos.LifeCycleState.CLOSED, RATIS_REPLICATION_CONFIG);
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index a6d5521db2..62737f15a6 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -858,7 +858,8 @@ public class TestReplicationManager {
 
   @Test
   public void testCreateThrottledReplicateContainerCommand()
-      throws AllSourcesOverloadedException, NodeNotFoundException {
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
     Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
     DatanodeDetails cmdTarget = MockDatanodeDetails.randomDatanodeDetails();
     sourceNodes.put(cmdTarget, 0);
@@ -873,20 +874,25 @@ public class TestReplicationManager {
           return sourceNodes.get(dn);
         });
 
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
     DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails();
-    Pair<DatanodeDetails, SCMCommand<?>> cmd = replicationManager
-        .createThrottledReplicationCommand(
-            1L, new ArrayList<>(sourceNodes.keySet()), destination, 0);
-    Assertions.assertEquals(cmdTarget, cmd.getLeft());
+    replicationManager.sendThrottledReplicationCommand(
+        container, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+
+    Assertions.assertEquals(1, commandsSent.size());
+    Pair<UUID, SCMCommand<?>> cmd = commandsSent.iterator().next();
+    Assertions.assertEquals(cmdTarget.getUuid(), cmd.getLeft());
     Assertions.assertEquals(destination,
         ((ReplicateContainerCommand) cmd.getRight()).getTargetDatanode());
     Assertions.assertEquals(0,
         ((ReplicateContainerCommand) cmd.getRight()).getReplicaIndex());
   }
 
-  @Test(expected = AllSourcesOverloadedException.class)
+  @Test(expected = CommandTargetOverloadedException.class)
   public void testCreateThrottledReplicateContainerCommandThrowsWhenNoSources()
-      throws AllSourcesOverloadedException, NodeNotFoundException {
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
     int limit = replicationManager.getConfig().getDatanodeReplicationLimit();
     Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
     for (int i = 0; i < 3; i++) {
@@ -901,8 +907,41 @@ public class TestReplicationManager {
         });
 
     DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails();
-    replicationManager.createThrottledReplicationCommand(
-            1L, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    replicationManager.sendThrottledReplicationCommand(
+            container, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+  }
+
+  @Test
+  public void testCreateThrottledDeleteContainerCommand()
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
+    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
+            eq(SCMCommandProto.Type.deleteContainerCommand)))
+        .thenAnswer(invocation -> 0);
+
+    DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
+    Assert.assertEquals(commandsSent.size(), 1);
+  }
+
+  @Test(expected = CommandTargetOverloadedException.class)
+  public void testCreateThrottledDeleteContainerCommandThrowsWhenNoSources()
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
+    int limit = replicationManager.getConfig().getDatanodeDeleteLimit();
+
+    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
+            eq(SCMCommandProto.Type.deleteContainerCommand)))
+        .thenAnswer(invocation -> limit + 1);
+
+    DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
   }
 
   @SafeVarargs


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to