This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 88e18e3ecc HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode (#4988)
88e18e3ecc is described below

commit 88e18e3ecc7f6e1d9b4ecacf269e72fdd192ca95
Author: XiChen <[email protected]>
AuthorDate: Mon Dec 18 21:01:24 2023 +0800

    HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending duplicates to Datanode (#4988)
---
 .../report/CommandStatusReportPublisher.java       |   2 +-
 .../common/report/TestReportPublisher.java         |   4 +-
 .../hadoop/hdds/scm/block/DeletedBlockLog.java     |  37 +-
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 238 ++++-----
 .../hdds/scm/block/SCMBlockDeletingService.java    |  10 +-
 .../SCMDeletedBlockTransactionStatusManager.java   | 581 +++++++++++++++++++++
 .../scm/command/CommandStatusReportHandler.java    |  36 +-
 .../hadoop/hdds/scm/node/DeadNodeHandler.java      |  20 +
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  13 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  19 +
 .../hdds/scm/server/StorageContainerManager.java   |   4 +
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 221 +++++++-
 .../TestSCMDeleteBlocksCommandStatusManager.java   | 256 +++++++++
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  12 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   5 +-
 15 files changed, 1268 insertions(+), 190 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 19fde71468..ee08c9f79d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -78,9 +78,9 @@ public class CommandStatusReportPublisher extends
       // If status is still pending then don't remove it from map as
       // CommandHandler will change its status when it works on this command.
       if (!cmdStatus.getStatus().equals(Status.PENDING)) {
-        builder.addCmdStatus(cmdStatus.getProtoBufMessage());
         map.remove(key);
       }
+      builder.addCmdStatus(cmdStatus.getProtoBufMessage());
     });
     return builder.getCmdStatusCount() > 0 ? builder.build() : null;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 9fb9c7251c..42529cabc7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -173,8 +173,8 @@ public class TestReportPublisher {
         .build();
     cmdStatusMap.put(obj1.getCmdId(), obj1);
     cmdStatusMap.put(obj2.getCmdId(), obj2);
-    // We are not sending the commands whose status is PENDING.
-    Assertions.assertEquals(1,
+    // We send commands whose status is PENDING as well as EXECUTED.
+    Assertions.assertEquals(2,
         ((CommandStatusReportPublisher) publisher).getReport()
             .getCmdStatusCount(),
         "Should publish report with 2 status objects");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index cb9cf603b1..45d53c0ef2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.hdds.scm.block;
 
 import java.util.Set;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -88,14 +86,33 @@ public interface DeletedBlockLog extends Closeable {
   int resetCount(List<Long> txIDs) throws IOException;
 
   /**
-   * Commits a transaction means to delete all footprints of a transaction
-   * from the log. This method doesn't guarantee all transactions can be
-   * successfully deleted, it tolerate failures and tries best efforts to.
-   *  @param transactionResults - delete block transaction results.
-   * @param dnID - ID of datanode which acknowledges the delete block command.
+   * Records the creation of a transaction for a DataNode.
+   *
+   * @param dnId The identifier of the DataNode.
+   * @param scmCmdId The ID of the SCM command.
+   * @param dnTxSet Set of transaction IDs for the DataNode.
+   */
+  void recordTransactionCreated(
+      UUID dnId, long scmCmdId, Set<Long> dnTxSet);
+
+  /**
+   * Handles the cleanup process when a DataNode is reported dead. This method
+   * is responsible for updating or cleaning up the transaction records
+   * associated with the dead DataNode.
+   *
+   * @param dnId The identifier of the dead DataNode.
+   */
+  void onDatanodeDead(UUID dnId);
+
+  /**
+   * Records the event of sending a block deletion command to a DataNode. This
+   * method is called when a command is successfully dispatched to a DataNode,
+   * and it helps in tracking the status of the command.
+   *
+   * @param dnId Details of the DataNode.
+   * @param scmCommand The block deletion command sent.
    */
-  void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
-      UUID dnID);
+  void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand);
 
   /**
    * Creates block deletion transactions for a set of containers,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8e2d014916..ac64f6e973 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -18,25 +18,25 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
 import java.util.Set;
 import java.util.Map;
-import java.util.LinkedHashSet;
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -55,11 +55,12 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 
 import com.google.common.collect.Lists;
-import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
 import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.DEL_TXN_ID;
 
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,16 +83,15 @@ public class DeletedBlockLogImpl
   private final int maxRetry;
   private final ContainerManager containerManager;
   private final Lock lock;
-  // Maps txId to set of DNs which are successful in committing the transaction
-  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
-  // Maps txId to its retry counts;
-  private final Map<Long, Integer> transactionToRetryCountMap;
   // The access to DeletedBlocksTXTable is protected by
   // DeletedBlockLogStateManager.
   private final DeletedBlockLogStateManager deletedBlockLogStateManager;
   private final SCMContext scmContext;
   private final SequenceIdGenerator sequenceIdGen;
   private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMDeletedBlockTransactionStatusManager
+      transactionStatusManager;
+  private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();
 
   private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
 
@@ -109,12 +109,6 @@ public class DeletedBlockLogImpl
     this.containerManager = containerManager;
     this.lock = new ReentrantLock();
 
-    // transactionToDNsCommitMap is updated only when
-    // transaction is added to the log and when it is removed.
-
-    // maps transaction to dns which have committed it.
-    transactionToDNsCommitMap = new ConcurrentHashMap<>();
-    transactionToRetryCountMap = new ConcurrentHashMap<>();
     this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl
         .newBuilder()
         .setConfiguration(conf)
@@ -126,6 +120,9 @@ public class DeletedBlockLogImpl
     this.scmContext = scmContext;
     this.sequenceIdGen = sequenceIdGen;
     this.metrics = metrics;
+    this.transactionStatusManager =
+        new SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager,
+            containerManager, scmContext, metrics, scmCommandTimeoutMs);
   }
 
   @Override
@@ -170,25 +167,7 @@ public class DeletedBlockLogImpl
       throws IOException {
     lock.lock();
     try {
-      ArrayList<Long> txIDsToUpdate = new ArrayList<>();
-      for (Long txID : txIDs) {
-        int currentCount =
-            transactionToRetryCountMap.getOrDefault(txID, 0);
-        if (currentCount > maxRetry) {
-          continue;
-        } else {
-          currentCount += 1;
-          if (currentCount > maxRetry) {
-            txIDsToUpdate.add(txID);
-          }
-          transactionToRetryCountMap.put(txID, currentCount);
-        }
-      }
-
-      if (!txIDsToUpdate.isEmpty()) {
-        deletedBlockLogStateManager
-            .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
-      }
+      transactionStatusManager.incrementRetryCount(txIDs, maxRetry);
     } finally {
       lock.unlock();
     }
@@ -207,9 +186,7 @@ public class DeletedBlockLogImpl
             .map(DeletedBlocksTransaction::getTxID)
             .collect(Collectors.toList());
       }
-      for (Long txID: txIDs) {
-        transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
-      }
+      transactionStatusManager.resetRetryCount(txIDs);
       return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
           new ArrayList<>(new HashSet<>(txIDs)));
     } finally {
@@ -227,89 +204,6 @@ public class DeletedBlockLogImpl
         .build();
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @param transactionResults - transaction IDs.
-   * @param dnID               - Id of Datanode which has acknowledged
-   *                           a delete block command.
-   * @throws IOException
-   */
-  @Override
-  public void commitTransactions(
-      List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
-    lock.lock();
-    try {
-      ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
-      Set<UUID> dnsWithCommittedTxn;
-      for (DeleteBlockTransactionResult transactionResult :
-          transactionResults) {
-        if (isTransactionFailed(transactionResult)) {
-          metrics.incrBlockDeletionTransactionFailure();
-          continue;
-        }
-        try {
-          metrics.incrBlockDeletionTransactionSuccess();
-          long txID = transactionResult.getTxID();
-          // set of dns which have successfully committed transaction txId.
-          dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
-          final ContainerID containerId = ContainerID.valueOf(
-              transactionResult.getContainerID());
-          if (dnsWithCommittedTxn == null) {
-            // Mostly likely it's a retried delete command response.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Transaction txId={} commit by dnId={} for containerID={}"
-                      + " failed. Corresponding entry not found.", txID, dnID,
-                  containerId);
-            }
-            continue;
-          }
-
-          dnsWithCommittedTxn.add(dnID);
-          final ContainerInfo container =
-              containerManager.getContainer(containerId);
-          final Set<ContainerReplica> replicas =
-              containerManager.getContainerReplicas(containerId);
-          // The delete entry can be safely removed from the log if all the
-          // corresponding nodes commit the txn. It is required to check that
-          // the nodes returned in the pipeline match the replication factor.
-          if (min(replicas.size(), dnsWithCommittedTxn.size())
-              >= container.getReplicationConfig().getRequiredNodes()) {
-            List<UUID> containerDns = replicas.stream()
-                .map(ContainerReplica::getDatanodeDetails)
-                .map(DatanodeDetails::getUuid)
-                .collect(Collectors.toList());
-            if (dnsWithCommittedTxn.containsAll(containerDns)) {
-              transactionToDNsCommitMap.remove(txID);
-              transactionToRetryCountMap.remove(txID);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Purging txId={} from block deletion log", txID);
-              }
-              txIDsToBeDeleted.add(txID);
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
-                txID, containerId, dnID);
-          }
-        } catch (IOException e) {
-          LOG.warn("Could not commit delete block transaction: " +
-              transactionResult.getTxID(), e);
-        }
-      }
-      try {
-        deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
-        metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
-      } catch (IOException e) {
-        LOG.warn("Could not commit delete block transactions: "
-            + txIDsToBeDeleted, e);
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
   private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(
@@ -348,7 +242,7 @@ public class DeletedBlockLogImpl
   @Override
   public void reinitialize(
       Table<Long, DeletedBlocksTransaction> deletedTable) {
-    // we don't need handle transactionToDNsCommitMap and
+    // we don't need to handle SCMDeletedBlockTransactionStatusManager and
     // deletedBlockLogStateManager, since they will be cleared
     // when becoming leader.
     deletedBlockLogStateManager.reinitialize(deletedTable);
@@ -359,8 +253,7 @@ public class DeletedBlockLogImpl
    *  leader.
    */
   public void onBecomeLeader() {
-    transactionToDNsCommitMap.clear();
-    transactionToRetryCountMap.clear();
+    transactionStatusManager.clear();
   }
 
   /**
@@ -404,23 +297,21 @@ public class DeletedBlockLogImpl
 
   private void getTransaction(DeletedBlocksTransaction tx,
       DatanodeDeletedBlockTransactions transactions,
-      Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas) {
+      Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
     DeletedBlocksTransaction updatedTxn =
         DeletedBlocksTransaction.newBuilder(tx)
-            .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
+            .setCount(transactionStatusManager.getOrDefaultRetryCount(
+              tx.getTxID(), 0))
             .build();
     for (ContainerReplica replica : replicas) {
-      UUID dnID = replica.getDatanodeDetails().getUuid();
-      if (!dnList.contains(replica.getDatanodeDetails())) {
+      DatanodeDetails details = replica.getDatanodeDetails();
+      if (!dnList.contains(details)) {
         continue;
       }
-      Set<UUID> dnsWithTransactionCommitted =
-          transactionToDNsCommitMap.get(updatedTxn.getTxID());
-      if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
-          .contains(dnID)) {
-        // Transaction need not be sent to dns which have
-        // already committed it
-        transactions.addTransactionToDN(dnID, updatedTxn);
+      if (!transactionStatusManager.isDuplication(
+          details, updatedTxn.getTxID(), commandStatus)) {
+        transactions.addTransactionToDN(details.getUuid(), updatedTxn);
       }
     }
   }
@@ -442,15 +333,26 @@ public class DeletedBlockLogImpl
       throws IOException {
     lock.lock();
     try {
+      // Here we can clean up the Datanode timeout command that no longer
+      // reports heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(
+          scmCommandTimeoutMs);
       DatanodeDeletedBlockTransactions transactions =
           new DatanodeDeletedBlockTransactions();
       try (TableIterator<Long,
           ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
                deletedBlockLogStateManager.getReadOnlyIterator()) {
+        // Get the CmdStatus status of the aggregation, so that the current
+        // status of the specified transaction can be found faster
+        Map<UUID, Map<Long, CmdStatus>> commandStatus =
+            getSCMDeletedBlockTransactionStatusManager()
+                .getCommandStatusByTxId(dnList.stream().
+                map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
         ArrayList<Long> txIDs = new ArrayList<>();
         // Here takes block replica count as the threshold to avoid the case
         // that part of replicas committed the TXN and recorded in the
-        // transactionToDNsCommitMap, while they are counted in the threshold.
+        // SCMDeletedBlockTransactionStatusManager, while they are counted
+        // in the threshold.
         while (iter.hasNext() &&
             transactions.getBlocksDeleted() < blockDeletionLimit) {
           Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
@@ -471,9 +373,8 @@ public class DeletedBlockLogImpl
               if (checkInadequateReplica(replicas, txn)) {
                 continue;
               }
-              getTransaction(txn, transactions, dnList, replicas);
-              transactionToDNsCommitMap
-                  .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
+              getTransaction(
+                  txn, transactions, dnList, replicas, commandStatus);
             }
           } catch (ContainerNotFoundException ex) {
             LOG.warn("Container: " + id + " was not found for the transaction: "
@@ -492,6 +393,33 @@ public class DeletedBlockLogImpl
     }
   }
 
+  public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+  }
+
+  @VisibleForTesting
+  public SCMDeletedBlockTransactionStatusManager
+      getSCMDeletedBlockTransactionStatusManager() {
+    return transactionStatusManager;
+  }
+
+  @Override
+  public void recordTransactionCreated(UUID dnId, long scmCmdId,
+      Set<Long> dnTxSet) {
+    getSCMDeletedBlockTransactionStatusManager()
+        .recordTransactionCreated(dnId, scmCmdId, dnTxSet);
+  }
+
+  @Override
+  public void onDatanodeDead(UUID dnId) {
+    getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
+  }
+
+  @Override
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    getSCMDeletedBlockTransactionStatusManager().onSent(dnId, scmCommand);
+  }
+
   @Override
   public void onMessage(
       DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
@@ -500,18 +428,30 @@ public class DeletedBlockLogImpl
       return;
     }
 
-    CommandStatus.Status status = deleteBlockStatus.getCmdStatus().getStatus();
-    if (status == CommandStatus.Status.EXECUTED) {
-      ContainerBlocksDeletionACKProto ackProto =
-          deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
-      commitTransactions(ackProto.getResultsList(),
-          UUID.fromString(ackProto.getDnId()));
-      metrics.incrBlockDeletionCommandSuccess();
-    } else if (status == CommandStatus.Status.FAILED) {
-      metrics.incrBlockDeletionCommandFailure();
-    } else {
-      LOG.error("Delete Block Command is not executed yet.");
-      return;
+    DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
+    UUID dnId = details.getUuid();
+    for (CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
+      CommandStatus.Status status = commandStatus.getStatus();
+      lock.lock();
+      try {
+        if (status == CommandStatus.Status.EXECUTED) {
+          ContainerBlocksDeletionACKProto ackProto =
+              commandStatus.getBlockDeletionAck();
+          getSCMDeletedBlockTransactionStatusManager()
+              .commitTransactions(ackProto.getResultsList(), dnId);
+          metrics.incrBlockDeletionCommandSuccess();
+        } else if (status == CommandStatus.Status.FAILED) {
+          metrics.incrBlockDeletionCommandFailure();
+        } else {
+          LOG.debug("Delete Block Command {} is not executed on the Datanode" +
+              " {}.", commandStatus.getCmdId(), dnId);
+        }
+
+        getSCMDeletedBlockTransactionStatusManager()
+            .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
+      } finally {
+        lock.unlock();
+      }
     }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 5fd889b758..24e950a599 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -176,11 +175,14 @@ public class SCMBlockDeletingService extends BackgroundService
             UUID dnId = entry.getKey();
             List<DeletedBlocksTransaction> dnTXs = entry.getValue();
             if (!dnTXs.isEmpty()) {
-              processedTxIDs.addAll(dnTXs.stream()
+              Set<Long> dnTxSet = dnTXs.stream()
                   .map(DeletedBlocksTransaction::getTxID)
-                  .collect(Collectors.toSet()));
-              SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
+                  .collect(Collectors.toSet());
+              processedTxIDs.addAll(dnTxSet);
+              DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
               command.setTerm(scmContext.getTermOfLeader());
+              deletedBlockLog.recordTransactionCreated(dnId, command.getId(),
+                  dnTxSet);
               eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
                   new CommandForDatanode<>(dnId, command));
               metrics.incrBlockDeletionCommandSent();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
new file mode 100644
index 0000000000..b43e91e059
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  /**
+   * Creates a status manager with empty in-memory tracking state.
+   *
+   * @param deletedBlockLogStateManager state manager used for the DB updates
+   *     of deleted-block transactions.
+   * @param containerManager used to look up containers and their replicas.
+   * @param scmContext SCM context, stored as-is.
+   * @param metrics sink for the block deletion counters.
+   * @param scmCommandTimeoutMs timeout, in milliseconds, handed to the
+   *     per-datanode cleanup of tracked commands.
+   */
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics, long scmCommandTimeoutMs) {
+    // Collaborators and configuration supplied by the caller.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.metrics = metrics;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+
+    // In-memory state, all of it starting out empty.
+    // Maps a transaction to the DNs which have committed it.
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // The DeleteBlocksCommand has been sent but has not been executed
+      // completely by the DN. SENT covers the following possibilities:
+      //   - The command was sent but not received by the DN.
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The command was sent and is being executed by the DN.
+      SENT
+    }
+
+    /**
+     * Tracking record of one DeleteBlocksCommand for one datanode: the
+     * command id, the deleted-block transaction ids it carries, its current
+     * {@link CmdStatus} and the time that status was last set.
+     */
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      /**
+       * @param dnId datanode the command is addressed to.
+       * @param scmCmdId id of the DeleteBlocksCommand.
+       * @param deletedBlocksTxIds transaction ids carried by the command;
+       *     the set is stored by reference, not copied.
+       */
+      private CmdStatusData(
+          UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmCmdId;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        // Also initialises updateTime.
+        setStatus(DEFAULT_STATUS);
+      }
+
+      /** @return a read-only view of the carried transaction ids. */
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      /**
+       * Sets the status and refreshes the update time, even when the new
+       * status equals the current one.
+       */
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        // Labels match the class and field names so that log lines can be
+        // correlated with the code.
+        return "CmdStatusData" +
+            "{dnId=" + dnId +
+            ", scmCmdId=" + scmCmdId +
+            ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+            ", updateTime=" + updateTime +
+            ", status=" + status +
+            '}';
+      }
+    }
+
+    /**
+     * Factory for {@link CmdStatusData}; the new record starts in the
+     * default status.
+     *
+     * @param dnId datanode the command is addressed to.
+     * @param scmCmdId id of the DeleteBlocksCommand.
+     * @param deletedBlocksTxIds transaction ids carried by the command.
+     * @return the new tracking record.
+     */
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    /**
+     * Starts tracking a command. The record is stored under its datanode id
+     * and command id; an existing record with the same ids is replaced.
+     *
+     * @param statusData the tracking record to store.
+     */
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      // Create the per-datanode map on first use.
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    /**
+     * Records that a command was sent, by applying a PENDING status update
+     * to its tracking record.
+     *
+     * @param dnId datanode the command was sent to.
+     * @param scmCmdId id of the command.
+     */
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    /**
+     * Drops every tracked command record of the given datanode.
+     *
+     * @param dnId id of the datanode whose records are removed.
+     */
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    /**
+     * Applies a command status to the tracking record of the command.
+     *
+     * @param dnId datanode the status belongs to.
+     * @param scmCmdId id of the command.
+     * @param newState the status to apply.
+     */
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    /**
+     * Runs the timeout cleanup for every datanode that currently has
+     * tracked commands.
+     *
+     * @param timeoutMs maximum time, in milliseconds, a command may stay in
+     *     a timeout-relevant status without an update before it is removed.
+     */
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      scmCmdStatusRecord.keySet()
+          .forEach(dnId -> cleanTimeoutSCMCommand(dnId, timeoutMs));
+    }
+
+    /**
+     * Runs the timeout cleanup for one datanode, once per status listed in
+     * {@code STATUSES_REQUIRING_TIMEOUT}.
+     *
+     * @param dnId datanode whose tracked commands are checked.
+     * @param timeoutMs maximum time, in milliseconds, without an update.
+     */
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      STATUSES_REQUIRING_TIMEOUT.forEach(status -> {
+        Set<Long> candidates = getScmCommandIds(dnId, status);
+        removeTimeoutScmCommand(dnId, candidates, timeoutMs);
+      });
+    }
+
+    /**
+     * Collects the ids of the commands tracked for a datanode that are
+     * currently in the given status.
+     *
+     * @return a new mutable set; empty when the datanode has no records.
+     */
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Map<Long, CmdStatusData> commandsOfDn = scmCmdStatusRecord.get(dnId);
+      if (commandsOfDn == null) {
+        return new HashSet<>();
+      }
+      return commandsOfDn.values().stream()
+          .filter(data -> data.getStatus().equals(status))
+          .map(CmdStatusData::getScmCmdId)
+          .collect(Collectors.toCollection(HashSet::new));
+    }
+
+    /**
+     * Returns the time the status of a tracked command was last set.
+     *
+     * @param dnId datanode the command belongs to.
+     * @param scmCmdId id of the command.
+     * @return the last update time, or {@code null} when the datanode or
+     *     the command is not tracked.
+     */
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return null;
+      }
+      // Look the entry up exactly once: with two separate get() calls the
+      // entry could be removed from the concurrent map in between, and the
+      // second call would then return null and cause a NullPointerException.
+      CmdStatusData statusData = record.get(scmCmdId);
+      return statusData == null ? null : statusData.getUpdateTime();
+    }
+
+    /**
+     * Applies a command status to the tracking record of a command.
+     * <ul>
+     *   <li>PENDING: the record moves to (or stays in) SENT and its update
+     *   time is refreshed.</li>
+     *   <li>EXECUTED / FAILED: the record is removed, whether it was SENT or
+     *   still TO_BE_SENT; the latter is logged as an error.</li>
+     *   <li>Any other status is logged and ignored.</li>
+     * </ul>
+     * Updates for an untracked datanode or command are logged and ignored.
+     *
+     * @param dnId datanode the command belongs to.
+     * @param scmCmdId id of the command.
+     * @param newStatus the status to apply.
+     */
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      // Look the entry up exactly once: checking with one get() and reading
+      // with another lets a concurrent removal slip in between and turn the
+      // second result into null.
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      if (statusData == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {
+          // Once the DN executes DeleteBlocksCommands, regardless of whether
+          // DeleteBlocksCommands is executed successfully or not,
+          // it will be deleted from record.
+          // Successful DeleteBlocksCommands are recorded in
+          // `transactionToDNsCommitMap`.
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        if (oldStatus == TO_BE_SENT) {
+          // SCM receives a reply to an unsent transaction,
+          // which should not normally occur.
+          LOG.error("Received {} status for a command marked TO_BE_SENT. " +
+                  "This indicates a potential issue in command handling. " +
+                  "SCM Command ID: {}, Datanode ID: {}, Current Status: {}",
+              newStatus, scmCmdId, dnId, oldStatus);
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Can not update to Unknown new Status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Cannot update illegal status for DN: {} ScmCommandId {} " +
+            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successful update DN: {} ScmCommandId {} " +
+            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    /**
+     * Removes those of the given commands whose status has not been updated
+     * for more than {@code timeoutMs} milliseconds. Commands that are no
+     * longer tracked, or that have not timed out yet, are left untouched
+     * and produce no log output.
+     *
+     * @param dnId datanode the commands belong to.
+     * @param scmCmdIds ids of the commands to check.
+     * @param timeoutMs maximum time, in milliseconds, without an update.
+     */
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      // One reference time for the whole batch.
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime == null ||
+            Duration.between(updateTime, now).toMillis() <= timeoutMs) {
+          // Not tracked any more, or still within the timeout: this is the
+          // normal case, so it is deliberately not reported as a warning.
+          continue;
+        }
+        CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+        // state is null when the record was removed concurrently.
+        if (state != null) {
+          LOG.warn("Remove Timeout SCM BlockDeletionCommand {} for DN {} " +
+              "after without update {}ms", state, dnId, timeoutMs);
+        }
+      }
+    }
+
+    /**
+     * Stops tracking a command.
+     *
+     * @param dnId datanode the command belongs to.
+     * @param scmCmdId id of the command.
+     * @return the removed record, or {@code null} when the datanode or the
+     *     command was not tracked.
+     */
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return null;
+      }
+      // remove() alone tells whether the entry existed; a preceding get()
+      // would only open a window for a concurrent removal in between.
+      CmdStatusData statusData = record.remove(scmCmdId);
+      if (statusData != null) {
+        LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      }
+      return statusData;
+    }
+
+    /**
+     * Builds, for each requested datanode that has tracked commands, a map
+     * from deleted-block transaction id to the status of a command carrying
+     * that transaction. Datanodes without records are left out.
+     *
+     * @param dnIds datanodes to report on.
+     * @return a new map keyed by datanode id.
+     */
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> statusByDn =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> commandsOfDn = scmCmdStatusRecord.get(dnId);
+        if (commandsOfDn != null) {
+          Map<Long, CmdStatus> statusByTx = new HashMap<>();
+          commandsOfDn.values().forEach(data -> {
+            CmdStatus commandStatus = data.getStatus();
+            // Every transaction of a command shares the command's status.
+            data.getDeletedBlocksTxIds()
+                .forEach(txId -> statusByTx.put(txId, commandStatus));
+          });
+          statusByDn.put(dnId, statusByTx);
+        }
+      }
+
+      return statusByDn;
+    }
+
+    /** Drops the tracked command records of all datanodes. */
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    /**
+     * Exposes the live (not copied) record map, keyed by datanode id and
+     * then by command id.
+     */
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  /**
+   * Increments the in-memory retry count of each given transaction unless
+   * the count already exceeds {@code maxRetry}. Transactions whose count
+   * exceeds {@code maxRetry} as a result of this call are passed to the
+   * state manager to have their retry count increased in the DB.
+   *
+   * @param txIDs transactions whose retry count is to be incremented.
+   * @param maxRetry retry limit.
+   * @throws IOException declared for the state manager call.
+   */
+  public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+      throws IOException {
+    ArrayList<Long> newlyExceeded = new ArrayList<>();
+    for (Long txID : txIDs) {
+      int previous = transactionToRetryCountMap.getOrDefault(txID, 0);
+      if (previous > maxRetry) {
+        // Already over the limit; nothing more to count.
+        continue;
+      }
+      int updated = previous + 1;
+      if (updated > maxRetry) {
+        newlyExceeded.add(txID);
+      }
+      transactionToRetryCountMap.put(txID, updated);
+    }
+
+    if (newlyExceeded.isEmpty()) {
+      return;
+    }
+    deletedBlockLogStateManager
+        .increaseRetryCountOfTransactionInDB(newlyExceeded);
+  }
+
+  /**
+   * Sets the in-memory retry count of the given transactions back to zero.
+   * Transactions that have no count recorded are left without one.
+   *
+   * @param txIDs transactions whose retry count is reset.
+   */
+  public void resetRetryCount(List<Long> txIDs) throws IOException {
+    // replace() only touches keys that are currently mapped.
+    txIDs.forEach(txID -> transactionToRetryCountMap.replace(txID, 0));
+  }
+
+  /**
+   * @param txID transaction id.
+   * @param defaultValue value returned when no count is recorded.
+   * @return the in-memory retry count of the transaction, or
+   *     {@code defaultValue} when none is recorded.
+   */
+  public int getOrDefaultRetryCount(long txID, int defaultValue) {
+    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+  }
+
+  /**
+   * Forwards a "command sent" notification to the command status manager,
+   * using the datanode's UUID and the command's id.
+   *
+   * @param dnId details of the datanode the command is sent to.
+   * @param scmCommand the command being sent.
+   */
+  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+    scmDeleteBlocksCommandStatusManager.onSent(
+        dnId.getUuid(), scmCommand.getId());
+  }
+
+  /**
+   * Returns, per requested datanode, the status of the tracked commands
+   * keyed by deleted-block transaction id. Delegates to the command status
+   * manager.
+   *
+   * @param dnIds datanodes to report on.
+   */
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  /**
+   * Starts tracking a newly created DeleteBlocksCommand and makes sure each
+   * transaction it carries has an entry in the commit map.
+   *
+   * @param dnId datanode the command is addressed to.
+   * @param scmCmdId id of the command.
+   * @param dnTxSet ids of the transactions carried by the command.
+   */
+  public void recordTransactionCreated(
+      UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
+    CmdStatusData statusData = SCMDeleteBlocksCommandStatusManager
+        .createScmCmdStatusData(dnId, scmCmdId, dnTxSet);
+    scmDeleteBlocksCommandStatusManager.recordScmCommand(statusData);
+    for (Long txId : dnTxSet) {
+      // Existing entries (and the DNs already recorded in them) are kept.
+      transactionToDNsCommitMap
+          .computeIfAbsent(txId, id -> new LinkedHashSet<>());
+    }
+  }
+
+  /**
+   * Discards all in-memory state: retry counts, tracked commands and the
+   * commit map.
+   */
+  public void clear() {
+    transactionToRetryCountMap.clear();
+    scmDeleteBlocksCommandStatusManager.clear();
+    transactionToDNsCommitMap.clear();
+  }
+
+  /**
+   * Runs the timeout cleanup of tracked commands for all datanodes.
+   *
+   * @param timeoutMs maximum time, in milliseconds, without a status update.
+   */
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
+  }
+
+  /**
+   * Drops the tracked command records of a dead datanode.
+   *
+   * @param dnId id of the dead datanode.
+   */
+  public void onDatanodeDead(UUID dnId) {
+    scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
+  }
+
+  /**
+   * Tells whether sending the transaction to the datanode would be a
+   * duplicate: either the datanode is already recorded as having committed
+   * it, or {@code commandStatus} holds a status for it on that datanode.
+   *
+   * @param dnDetail the target datanode.
+   * @param tx id of the deleted-block transaction.
+   * @param commandStatus command status per datanode and transaction id.
+   */
+  public boolean isDuplication(DatanodeDetails dnDetail, long tx,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+    return alreadyExecuted(dnDetail.getUuid(), tx)
+        || inProcessing(dnDetail.getUuid(), tx, commandStatus);
+  }
+
+  /**
+   * @param dnId id of the datanode.
+   * @param txId id of the deleted-block transaction.
+   * @return {@code true} when the commit map has an entry for the
+   *     transaction that contains the datanode.
+   */
+  public boolean alreadyExecuted(UUID dnId, long txId) {
+    Set<UUID> committedDns = transactionToDNsCommitMap.get(txId);
+    if (committedDns == null) {
+      return false;
+    }
+    return committedDns.contains(dnId);
+  }
+
+  /**
+   * Committing a transaction means deleting all footprints of the
+   * transaction from the log. This method does not guarantee that all
+   * transactions are successfully deleted; it tolerates failures and works
+   * on a best-effort basis.
+   *
+   * @param transactionResults - delete block transaction results.
+   * @param dnId - ID of datanode which acknowledges the delete block command.
+   */
+  @VisibleForTesting
+  public void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults, UUID dnId) {
+
+    ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
+    Set<UUID> dnsWithCommittedTxn;
+    for (DeleteBlockTransactionResult transactionResult :
+        transactionResults) {
+      // Failed results are only counted; the transaction stays in the log.
+      if (isTransactionFailed(transactionResult)) {
+        metrics.incrBlockDeletionTransactionFailure();
+        continue;
+      }
+      try {
+        metrics.incrBlockDeletionTransactionSuccess();
+        long txID = transactionResult.getTxID();
+        // set of dns which have successfully committed transaction txId.
+        dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
+        final ContainerID containerId = ContainerID.valueOf(
+            transactionResult.getContainerID());
+        if (dnsWithCommittedTxn == null) {
+          // Most likely it's a retried delete command response.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Transaction txId={} commit by dnId={} for containerID={}"
+                    + " failed. Corresponding entry not found.", txID, dnId,
+                containerId);
+          }
+          continue;
+        }
+
+        dnsWithCommittedTxn.add(dnId);
+        final ContainerInfo container =
+            containerManager.getContainer(containerId);
+        final Set<ContainerReplica> replicas =
+            containerManager.getContainerReplicas(containerId);
+        // The delete entry can be safely removed from the log if all the
+        // corresponding nodes commit the txn. It is required to check that
+        // both the number of replicas and the number of committing nodes
+        // reach the number of nodes required by the replication config.
+        if (min(replicas.size(), dnsWithCommittedTxn.size())
+            >= container.getReplicationConfig().getRequiredNodes()) {
+          List<UUID> containerDns = replicas.stream()
+              .map(ContainerReplica::getDatanodeDetails)
+              .map(DatanodeDetails::getUuid)
+              .collect(Collectors.toList());
+          // Every datanode currently holding a replica must have committed.
+          if (dnsWithCommittedTxn.containsAll(containerDns)) {
+            transactionToDNsCommitMap.remove(txID);
+            transactionToRetryCountMap.remove(txID);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Purging txId={} from block deletion log", txID);
+            }
+            txIDsToBeDeleted.add(txID);
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
+              txID, containerId, dnId);
+        }
+      } catch (IOException e) {
+        // Best effort: log and carry on with the next result.
+        LOG.warn("Could not commit delete block transaction: " +
+            transactionResult.getTxID(), e);
+      }
+    }
+    // Remove all fully committed transactions from the DB in one call.
+    try {
+      deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
+      metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
+    } catch (IOException e) {
+      LOG.warn("Could not commit delete block transactions: "
+          + txIDsToBeDeleted, e);
+    }
+  }
+
+  /**
+   * Applies the command statuses reported by a datanode to the tracked
+   * commands, then removes that datanode's commands which have gone without
+   * an update for longer than the configured command timeout.
+   *
+   * @param deleteBlockStatus command statuses from the datanode's report.
+   * @param dnId id of the reporting datanode.
+   */
+  @VisibleForTesting
+  public void commitSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
+      UUID dnId) {
+    processSCMCommandStatus(deleteBlockStatus, dnId);
+    scmDeleteBlocksCommandStatusManager.
+        cleanTimeoutSCMCommand(dnId, scmCommandTimeoutMs);
+  }
+
+  /**
+   * @return {@code true} when {@code commandStatus} holds a non-null status
+   *     for the given transaction on the given datanode.
+   */
+  private boolean inProcessing(UUID dnId, long deletedBlocksTxId,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+    Map<Long, CmdStatus> statusOfDn = commandStatus.get(dnId);
+    if (statusOfDn == null) {
+      return false;
+    }
+    return statusOfDn.get(deletedBlocksTxId) != null;
+  }
+
+  /**
+   * Reduces the reported statuses to one status per command id and applies
+   * each of them to the tracked commands of the datanode.
+   *
+   * @param deleteBlockStatus command statuses from the datanode's report.
+   * @param dnID id of the reporting datanode.
+   */
+  private void processSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
+      UUID dnID) {
+    // The CommandStatus is ordered in the report, so a later entry for the
+    // same command id simply overwrites the earlier one.
+    Map<Long, CommandStatus.Status> latestByCmdId = new HashMap<>();
+    for (CommandStatus cmdStatus : deleteBlockStatus) {
+      latestByCmdId.put(cmdStatus.getCmdId(), cmdStatus.getStatus());
+    }
+    LOG.debug("CommandStatus {} from Datanode {} ", latestByCmdId, dnID);
+    latestByCmdId.forEach((cmdId, status) ->
+        scmDeleteBlocksCommandStatusManager.updateStatusByDNCommandStatus(
+            dnID, cmdId, status));
+  }
+
+  /**
+   * Logs the acknowledgement and tells whether it reports a failure.
+   *
+   * @param result a single transaction result from a datanode ACK.
+   * @return {@code true} when the result is not marked successful.
+   */
+  private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
+    final boolean succeeded = result.getSuccess();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Got block deletion ACK from datanode, TXIDs={}, success={}",
+          result.getTxID(), succeeded);
+    }
+    if (succeeded) {
+      return false;
+    }
+    LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+        + "TX in next interval", result.getTxID());
+    return true;
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index d43311265d..5d737659dd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdds.scm.command;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -31,6 +33,7 @@ import 
org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -54,32 +57,43 @@ public class CommandStatusReportHandler implements
     }
 
     // Route command status to its watchers.
+    List<CommandStatus> deleteBlocksCommandStatus = new ArrayList<>();
     cmdStatusList.forEach(cmdStatus -> {
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
             .getCmdId(), cmdStatus.getType());
       }
       if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
-        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
-            new DeleteBlockStatus(cmdStatus));
+        deleteBlocksCommandStatus.add(cmdStatus);
       } else {
         LOGGER.debug("CommandStatus of type:{} not handled in " +
             "CommandStatusReportHandler.", cmdStatus.getType());
       }
     });
+
+    /*
+     * Aggregate all deleteBlocksCommand statuses of the report into a single
+     * event to reduce thread switching. When the Datanode command queue
+     * holds a large number of commands, the report contains many
+     * CommandStatus entries in PENDING status.
+     */
+    if (!deleteBlocksCommandStatus.isEmpty()) {
+      publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new DeleteBlockStatus(
+          deleteBlocksCommandStatus, report.getDatanodeDetails()));
+    }
   }
 
   /**
    * Wrapper event for CommandStatus.
    */
   public static class CommandStatusEvent implements IdentifiableEventPayload {
-    private CommandStatus cmdStatus;
+    private final List<CommandStatus> cmdStatus;
 
-    CommandStatusEvent(CommandStatus cmdStatus) {
+    CommandStatusEvent(List<CommandStatus> cmdStatus) {
       this.cmdStatus = cmdStatus;
     }
 
-    public CommandStatus getCmdStatus() {
+    public List<CommandStatus> getCmdStatus() {
       return cmdStatus;
     }
 
@@ -90,7 +104,7 @@ public class CommandStatusReportHandler implements
 
     @Override
     public long getId() {
-      return cmdStatus.getCmdId();
+      return HddsIdFactory.getLongId();
     }
   }
 
@@ -98,8 +112,16 @@ public class CommandStatusReportHandler implements
    * Wrapper event for DeleteBlock Command.
    */
   public static class DeleteBlockStatus extends CommandStatusEvent {
-    public DeleteBlockStatus(CommandStatus cmdStatus) {
+    private final DatanodeDetails datanodeDetails;
+
+    public DeleteBlockStatus(List<CommandStatus> cmdStatus,
+        DatanodeDetails datanodeDetails) {
       super(cmdStatus);
+      this.datanodeDetails = datanodeDetails;
+    }
+
+    public DatanodeDetails getDatanodeDetails() {
+      return datanodeDetails;
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index f05eb761d9..3c40437d7f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.container.ContainerException;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -41,6 +42,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import javax.annotation.Nullable;
+
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
 
 /**
@@ -51,6 +54,8 @@ public class DeadNodeHandler implements 
EventHandler<DatanodeDetails> {
   private final NodeManager nodeManager;
   private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
+  @Nullable
+  private final DeletedBlockLog deletedBlockLog;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DeadNodeHandler.class);
@@ -58,9 +63,17 @@ public class DeadNodeHandler implements 
EventHandler<DatanodeDetails> {
   public DeadNodeHandler(final NodeManager nodeManager,
                          final PipelineManager pipelineManager,
                          final ContainerManager containerManager) {
+    this(nodeManager, pipelineManager, containerManager, null);
+  }
+
+  public DeadNodeHandler(final NodeManager nodeManager,
+                         final PipelineManager pipelineManager,
+                         final ContainerManager containerManager,
+                         @Nullable final DeletedBlockLog deletedBlockLog) {
     this.nodeManager = nodeManager;
     this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
+    this.deletedBlockLog = deletedBlockLog;
   }
 
   @Override
@@ -95,6 +108,13 @@ public class DeadNodeHandler implements 
EventHandler<DatanodeDetails> {
       LOG.info("Clearing command queue of size {} for DN {}",
           cmdList.size(), datanodeDetails);
 
+      // remove DeleteBlocksCommand associated with the dead node unless it
+      // is IN_MAINTENANCE
+      if (deletedBlockLog != null &&
+          !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+        deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
+      }
+
       //move dead datanode out of ClusterNetworkTopology
       NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
       if (nt.contains(datanodeDetails)) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 011b361d62..399a7ef952 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.Collection;
+import java.util.function.BiConsumer;
 
 /**
  * A node manager supports a simple interface for managing a datanode.
@@ -90,6 +91,18 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
         defaultLayoutVersionProto());
   }
 
+  /**
+   * Register a SendCommandNotify handler for a specific type of SCMCommand.
+   * This default implementation does nothing.
+   * @param type The type of the SCMCommand.
+   * @param scmCommand A BiConsumer that takes a DatanodeDetails and a
+   *                   SCMCommand object and performs the necessary actions.
+   */
+  default void registerSendCommandNotify(SCMCommandProto.Type type,
+      BiConsumer<DatanodeDetails, SCMCommand<?>> scmCommand) {
+  }
+
   /**
    * Gets all Live Datanodes that are currently communicating with SCM.
    * @param nodeStatus - Status of the node to return
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 167b25afd0..972c061d5d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -84,6 +84,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -128,6 +129,8 @@ public class SCMNodeManager implements NodeManager {
   private final HDDSLayoutVersionManager scmLayoutVersionManager;
   private final EventPublisher scmNodeEventPublisher;
   private final SCMContext scmContext;
+  private final Map<SCMCommandProto.Type,
+      BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
 
   /**
    * Lock used to synchronize some operation in Node manager to ensure a
@@ -179,6 +182,13 @@ public class SCMNodeManager implements NodeManager {
     String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
     this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
     this.scmContext = scmContext;
+    this.sendCommandNotifyMap = new HashMap<>();
+  }
+
+  @Override
+  public void registerSendCommandNotify(SCMCommandProto.Type type,
+      BiConsumer<DatanodeDetails, SCMCommand<?>> scmCommand) {
+    this.sendCommandNotifyMap.put(type, scmCommand);
   }
 
   private void registerMXBean() {
@@ -521,6 +531,15 @@ public class SCMNodeManager implements NodeManager {
           commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
       List<SCMCommand> commands =
           commandQueue.getCommand(datanodeDetails.getUuid());
+
+      // Notify the handler registered for each command's type, if any
+      for (SCMCommand<?> command : commands) {
+        if (sendCommandNotifyMap.get(command.getType()) != null) {
+          sendCommandNotifyMap.get(command.getType())
+              .accept(datanodeDetails, command);
+        }
+      }
+
       if (queueReport != null) {
         processNodeCommandQueueReport(datanodeDetails, queueReport, summary);
       }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 722244d4c1..c000514ed3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -577,6 +578,9 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
 
+    scmNodeManager.registerSendCommandNotify(
+        SCMCommandProto.Type.deleteBlocksCommand,
+        scmBlockManager.getDeletedBlockLog()::onSent);
   }
 
   private void initializeCertificateClient() throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index e1d15146d0..987b1ddbb9 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -49,6 +51,9 @@ import org.apache.hadoop.hdds.protocol.proto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -62,6 +67,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +77,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
@@ -255,7 +262,7 @@ public class TestDeletedBlockLog {
       List<DeleteBlockTransactionResult> transactionResults,
       DatanodeDetails... dns) throws IOException {
     for (DatanodeDetails dnDetails : dns) {
-      deletedBlockLog
+      deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
           .commitTransactions(transactionResults, dnDetails.getUuid());
     }
     scmHADBTransactionBuffer.flush();
@@ -284,15 +291,6 @@ public class TestDeletedBlockLog {
         .collect(Collectors.toList()));
   }
 
-  private void commitTransactions(DatanodeDeletedBlockTransactions
-      transactions) {
-    transactions.getDatanodeTransactionMap().forEach((uuid,
-        deletedBlocksTransactions) -> deletedBlockLog
-        .commitTransactions(deletedBlocksTransactions.stream()
-            .map(this::createDeleteBlockTransactionResult)
-            .collect(Collectors.toList()), uuid));
-  }
-
   private DeleteBlockTransactionResult createDeleteBlockTransactionResult(
       DeletedBlocksTransaction transaction) {
     return DeleteBlockTransactionResult.newBuilder()
@@ -315,6 +313,13 @@ public class TestDeletedBlockLog {
           transactions.getDatanodeTransactionMap().get(dn.getUuid()))
           .orElseGet(LinkedList::new));
     }
+    // Simulated transactions are sent
+    for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+        transactions.getDatanodeTransactionMap().entrySet()) {
+      DeleteBlocksCommand command = new DeleteBlocksCommand(entry.getValue());
+      recordScmCommandToStatusManager(entry.getKey(), command);
+      sendSCMDeleteBlocksCommand(entry.getKey(), command);
+    }
     return txns;
   }
 
@@ -431,6 +436,9 @@ public class TestDeletedBlockLog {
     }
 
     // Increment for the reset transactions.
+    // Make the SCM's wait for the DN reply time out immediately,
+    // thus allowing the transactions to be resent.
+    deletedBlockLog.setScmCommandTimeoutMs(-1L);
     incrementCount(txIDs);
     blocks = getAllTransactions();
     for (DeletedBlocksTransaction block : blocks) {
@@ -442,6 +450,7 @@ public class TestDeletedBlockLog {
 
   @Test
   public void testCommitTransactions() throws Exception {
+    deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
     addTransactions(generateData(50), true);
     mockContainerHealthResult(true);
     List<DeletedBlocksTransaction> blocks =
@@ -458,6 +467,12 @@ public class TestDeletedBlockLog {
         DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
             .build());
 
+    blocks = getTransactions(50 * BLOCKS_PER_TXN * THREE);
+    // SCM will not repeat a transaction until it has timed out.
+    Assertions.assertEquals(0, blocks.size());
+    // Make the SCM's wait for the DN reply time out immediately,
+    // thus allowing the transactions to be resent.
+    deletedBlockLog.setScmCommandTimeoutMs(-1L);
     blocks = getTransactions(50 * BLOCKS_PER_TXN * THREE);
     // only uncommitted dn have transactions
     Assertions.assertEquals(30, blocks.size());
@@ -467,6 +482,173 @@ public class TestDeletedBlockLog {
     Assertions.assertEquals(0, blocks.size());
   }
 
+  private void recordScmCommandToStatusManager(
+      UUID dnId, DeleteBlocksCommand command) {
+    Set<Long> dnTxSet = command.blocksTobeDeleted()
+        .stream().map(DeletedBlocksTransaction::getTxID)
+        .collect(Collectors.toSet());
+    deletedBlockLog.recordTransactionCreated(dnId, command.getId(), dnTxSet);
+  }
+
+  private void sendSCMDeleteBlocksCommand(UUID dnId, SCMCommand<?> scmCommand) 
{
+    deletedBlockLog.onSent(
+        DatanodeDetails.newBuilder().setUuid(dnId).build(), scmCommand);
+  }
+
+  private void assertNoDuplicateTransactions(
+      DatanodeDeletedBlockTransactions transactions1,
+      DatanodeDeletedBlockTransactions transactions2) {
+    Map<UUID, List<DeletedBlocksTransaction>> map1 =
+        transactions1.getDatanodeTransactionMap();
+    Map<UUID, List<DeletedBlocksTransaction>> map2 =
+        transactions2.getDatanodeTransactionMap();
+
+    for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+        map1.entrySet()) {
+      UUID dnId = entry.getKey();
+      Set<DeletedBlocksTransaction> txSet1 = new HashSet<>(entry.getValue());
+      Set<DeletedBlocksTransaction> txSet2 = new HashSet<>(map2.get(dnId));
+
+      txSet1.retainAll(txSet2);
+      Assertions.assertEquals(0, txSet1.size(),
+          String.format("Duplicate Transactions found first transactions %s " +
+              "second transactions %s for Dn %s", txSet1, txSet2, dnId));
+    }
+  }
+
+
+  private void assertContainsAllTransactions(
+      DatanodeDeletedBlockTransactions transactions1,
+      DatanodeDeletedBlockTransactions transactions2) {
+    Map<UUID, List<DeletedBlocksTransaction>> map1 =
+        transactions1.getDatanodeTransactionMap();
+    Map<UUID, List<DeletedBlocksTransaction>> map2 =
+        transactions2.getDatanodeTransactionMap();
+
+    for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+        map1.entrySet()) {
+      UUID dnId = entry.getKey();
+      Set<DeletedBlocksTransaction> txSet1 = new HashSet<>(entry.getValue());
+      Set<DeletedBlocksTransaction> txSet2 = new HashSet<>(map2.get(dnId));
+
+      Assertions.assertTrue(txSet1.containsAll(txSet2));
+    }
+  }
+
+  private void commitSCMCommandStatus(Long scmCmdId, UUID dnID,
+      StorageContainerDatanodeProtocolProtos.CommandStatus.Status status) {
+    List<StorageContainerDatanodeProtocolProtos
+        .CommandStatus> deleteBlockStatus = new ArrayList<>();
+    deleteBlockStatus.add(CommandStatus.CommandStatusBuilder.newBuilder()
+        .setCmdId(scmCmdId)
+        .setType(Type.deleteBlocksCommand)
+        .setStatus(status)
+        .build()
+        .getProtoBufMessage());
+
+    deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
+        .commitSCMCommandStatus(deleteBlockStatus, dnID);
+  }
+
+  private void createDeleteBlocksCommandAndAction(
+      DatanodeDeletedBlockTransactions transactions,
+      BiConsumer<UUID, DeleteBlocksCommand> afterCreate) {
+    for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+        transactions.getDatanodeTransactionMap().entrySet()) {
+      UUID dnId = entry.getKey();
+      List<DeletedBlocksTransaction> dnTXs = entry.getValue();
+      DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
+      afterCreate.accept(dnId, command);
+    }
+  }
+
+  @Test
+  public void testNoDuplicateTransactionsForInProcessingSCMCommand()
+      throws Exception {
+    // The SCM will not resend these transactions in the cases below:
+    // - If the command has not been sent;
+    // - The DN does not report the status of the command via heartbeat
+    //   After the command is sent;
+    // - If the DN reports the command status as PENDING;
+    addTransactions(generateData(10), true);
+    int blockLimit = 2 * BLOCKS_PER_TXN * THREE;
+    mockContainerHealthResult(true);
+
+    // If the command has not been sent
+    DatanodeDeletedBlockTransactions transactions1 =
+        deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+    createDeleteBlocksCommandAndAction(transactions1,
+        this::recordScmCommandToStatusManager);
+
+    // - The DN does not report the status of the command via heartbeat
+    //   After the command is sent
+    DatanodeDeletedBlockTransactions transactions2 =
+        deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+    assertNoDuplicateTransactions(transactions1, transactions2);
+    createDeleteBlocksCommandAndAction(transactions2, (dnId, command) -> {
+      recordScmCommandToStatusManager(dnId, command);
+      sendSCMDeleteBlocksCommand(dnId, command);
+    });
+
+    // - If the DN reports the command status as PENDING
+    DatanodeDeletedBlockTransactions transactions3 =
+        deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+    assertNoDuplicateTransactions(transactions1, transactions3);
+    createDeleteBlocksCommandAndAction(transactions3, (dnId, command) -> {
+      recordScmCommandToStatusManager(dnId, command);
+      sendSCMDeleteBlocksCommand(dnId, command);
+      commitSCMCommandStatus(command.getId(), dnId,
+          StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+    });
+    assertNoDuplicateTransactions(transactions3, transactions1);
+    assertNoDuplicateTransactions(transactions3, transactions2);
+
+    DatanodeDeletedBlockTransactions transactions4 =
+        deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+    assertNoDuplicateTransactions(transactions4, transactions1);
+    assertNoDuplicateTransactions(transactions4, transactions2);
+    assertNoDuplicateTransactions(transactions4, transactions3);
+  }
+
+  @Test
+  public void testFailedAndTimeoutSCMCommandCanBeResend() throws Exception {
+    // The SCM will resend these transactions in the cases below:
+    // - Executed failed commands;
+    // - DN does not refresh the PENDING state for more than a period of time;
+    deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
+    addTransactions(generateData(10), true);
+    int blockLimit = 2 * BLOCKS_PER_TXN * THREE;
+    mockContainerHealthResult(true);
+
+    // - DN does not refresh the PENDING state for more than a period of time;
+    DatanodeDeletedBlockTransactions transactions =
+        deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+    createDeleteBlocksCommandAndAction(transactions, (dnId, command) -> {
+      recordScmCommandToStatusManager(dnId, command);
+      sendSCMDeleteBlocksCommand(dnId, command);
+      commitSCMCommandStatus(command.getId(), dnId,
+          StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+    });
+
+    // - Executed failed commands;
+    DatanodeDeletedBlockTransactions transactions2 =
+        deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+    createDeleteBlocksCommandAndAction(transactions2, (dnId, command) -> {
+      recordScmCommandToStatusManager(dnId, command);
+      sendSCMDeleteBlocksCommand(dnId, command);
+      commitSCMCommandStatus(command.getId(), dnId,
+          StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED);
+    });
+
+    deletedBlockLog.setScmCommandTimeoutMs(-1L);
+    DatanodeDeletedBlockTransactions transactions3 =
+        deletedBlockLog.getTransactions(Integer.MAX_VALUE,
+            new HashSet<>(dnList));
+    assertNoDuplicateTransactions(transactions, transactions2);
+    assertContainsAllTransactions(transactions3, transactions);
+    assertContainsAllTransactions(transactions3, transactions2);
+  }
+
   @Test
   public void testDNOnlyOneNodeHealthy() throws Exception {
     Map<Long, List<Long>> deletedBlocks = generateData(50);
@@ -496,9 +678,7 @@ public class TestDeletedBlockLog {
     // For the first 30 txn, deletedBlockLog only has the txn from dn1 and dn2
     // For the rest txn, txn will be got from all dns.
     // Committed txn will be: 1-40. 1-40. 31-40
-    commitTransactions(deletedBlockLog.getTransactions(
-        30 * BLOCKS_PER_TXN * THREE,
-        dnList.stream().collect(Collectors.toSet())));
+    commitTransactions(getTransactions(30 * BLOCKS_PER_TXN * THREE));
 
     // The rest txn shall be: 41-50. 41-50. 41-50
     List<DeletedBlocksTransaction> blocks = getAllTransactions();
@@ -590,6 +770,7 @@ public class TestDeletedBlockLog {
   @Test
   public void testDeletedBlockTransactions()
       throws IOException, TimeoutException {
+    deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
     mockContainerHealthResult(true);
     int txNum = 10;
     List<DeletedBlocksTransaction> blocks;
@@ -622,9 +803,21 @@ public class TestDeletedBlockLog {
     // add two transactions for same container
     containerID = blocks.get(0).getContainerID();
     Map<Long, List<Long>> deletedBlocksMap = new HashMap<>();
-    deletedBlocksMap.put(containerID, new LinkedList<>());
+    Random random = new Random();
+    long localId = random.nextLong();
+    deletedBlocksMap.put(containerID, new LinkedList<>(
+        Collections.singletonList(localId)));
     addTransactions(deletedBlocksMap, true);
+    blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE);
+    // Only newly added Blocks will be sent, as previously sent transactions
+    // that have not yet timed out will not be sent.
+    Assertions.assertEquals(1, blocks.size());
+    Assertions.assertEquals(1, blocks.get(0).getLocalIDCount());
+    Assertions.assertEquals(blocks.get(0).getLocalID(0), localId);
 
+    // Make the SCM's wait for the DN reply time out immediately,
+    // thus allowing the transactions to be resent.
+    deletedBlockLog.setScmCommandTimeoutMs(-1L);
     // get should return two transactions for the same container
     blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE);
     Assertions.assertEquals(2, blocks.size());
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
new file mode 100644
index 0000000000..888cb42fd7
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatusData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * A test for SCMDeleteBlocksCommandStatusManager.
+ */
+public class TestSCMDeleteBlocksCommandStatusManager {
+
+  private SCMDeleteBlocksCommandStatusManager manager;
+  private UUID dnId1;
+  private UUID dnId2;
+  private long scmCmdId1;
+  private long scmCmdId2;
+  private long scmCmdId3;
+  private long scmCmdId4;
+  private Set<Long> deletedBlocksTxIds1;
+  private Set<Long> deletedBlocksTxIds2;
+  private Set<Long> deletedBlocksTxIds3;
+  private Set<Long> deletedBlocksTxIds4;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    manager = new SCMDeleteBlocksCommandStatusManager();
+    // Create test data
+    dnId1 = UUID.randomUUID();
+    dnId2 = UUID.randomUUID();
+    scmCmdId1 = 1L;
+    scmCmdId2 = 2L;
+    scmCmdId3 = 3L;
+    scmCmdId4 = 4L;
+    deletedBlocksTxIds1 = new HashSet<>();
+    deletedBlocksTxIds1.add(100L);
+    deletedBlocksTxIds2 = new HashSet<>();
+    deletedBlocksTxIds2.add(200L);
+    deletedBlocksTxIds3 = new HashSet<>();
+    deletedBlocksTxIds3.add(300L);
+    deletedBlocksTxIds4 = new HashSet<>();
+    deletedBlocksTxIds4.add(400L);
+  }
+
+  @Test
+  public void testRecordScmCommand() {
+    CmdStatusData statusData =
+        SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(
+            dnId1, scmCmdId1, deletedBlocksTxIds1);
+
+    manager.recordScmCommand(statusData);
+
+    assertNotNull(manager.getScmCmdStatusRecord().get(dnId1));
+    assertEquals(1, manager.getScmCmdStatusRecord().get(dnId1).size());
+    CmdStatusData cmdStatusData =
+        manager.getScmCmdStatusRecord().get(dnId1).get(scmCmdId1);
+    assertNotNull(cmdStatusData);
+    assertEquals(dnId1, statusData.getDnId());
+    assertEquals(scmCmdId1, statusData.getScmCmdId());
+    assertEquals(deletedBlocksTxIds1, statusData.getDeletedBlocksTxIds());
+    // The default status is `CmdStatus.TO_BE_SENT`
+    assertEquals(TO_BE_SENT, statusData.getStatus());
+  }
+
+  @Test
+  public void testOnSent() {
+    CmdStatusData statusData =
+        SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(
+            dnId1, scmCmdId1, deletedBlocksTxIds1);
+    manager.recordScmCommand(statusData);
+
+    Map<Long, CmdStatusData> dnStatusRecord =
+        manager.getScmCmdStatusRecord().get(dnId1);
+    // After the Command is sent by SCM, the status of the Command
+    // will change from TO_BE_SENT to SENT
+    assertEquals(TO_BE_SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+    manager.onSent(dnId1, scmCmdId1);
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+  }
+
+  @Test
+  public void testUpdateStatusByDNCommandStatus() {
+    // Test all Status update by Datanode Heartbeat report.
+    //  SENT -> PENDING_EXECUTED: The DeleteBlocksCommand is sent and received
+    //  by the Datanode, but the command is not executed by the Datanode,
+    //  the command is waiting to be executed.
+
+    //  SENT -> NEED_RESEND: The DeleteBlocksCommand is sent and lost before
+    //  it is received by the DN.
+    //  SENT -> EXECUTED: The DeleteBlocksCommand has been sent to Datanode,
+    //  executed by DN, and executed successfully.
+    //
+    //  PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand continues
+    //  to wait to be executed by Datanode.
+    //  PENDING_EXECUTED -> NEED_RESEND: The DeleteBlocksCommand waited for a
+    //  while and was executed, but the execution failed; Or the
+    //  DeleteBlocksCommand was lost while waiting(such as the Datanode 
restart)
+    //
+    //  PENDING_EXECUTED -> EXECUTED: The Command waits for a period of
+    //  time on the DN and is executed successfully.
+
+    recordAndSentCommand(manager, dnId1,
+        Arrays.asList(scmCmdId1, scmCmdId2, scmCmdId3, scmCmdId4),
+        Arrays.asList(deletedBlocksTxIds1, deletedBlocksTxIds2,
+            deletedBlocksTxIds3, deletedBlocksTxIds4));
+
+    Map<Long, CmdStatusData> dnStatusRecord =
+        manager.getScmCmdStatusRecord().get(dnId1);
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId2).getStatus());
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId3).getStatus());
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId4).getStatus());
+
+    // SENT -> PENDING_EXECUTED
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+    // SENT -> EXECUTED
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId2,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED);
+    // SENT -> NEED_RESEND
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId3,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED);
+    // SENT -> PENDING_EXECUTED
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId4,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+    assertNull(dnStatusRecord.get(scmCmdId2));
+    assertNull(dnStatusRecord.get(scmCmdId3));
+    assertEquals(SENT, dnStatusRecord.get(scmCmdId4).getStatus());
+  }
+
+  @Test
+  public void testCleanSCMCommandForDn() {
+    // Transactions in states EXECUTED and NEED_RESEND will be cleaned up
+    // directly, while transactions in states PENDING_EXECUTED and SENT
+    // will be cleaned up after timeout
+    recordAndSentCommand(manager, dnId1,
+        Arrays.asList(scmCmdId1, scmCmdId2, scmCmdId3, scmCmdId4),
+        Arrays.asList(deletedBlocksTxIds1, deletedBlocksTxIds2,
+            deletedBlocksTxIds3, deletedBlocksTxIds4));
+
+    // SENT -> PENDING_EXECUTED
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+    // SENT -> EXECUTED
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId2,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED);
+    // SENT -> NEED_RESEND
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId3,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED);
+
+    Map<Long, CmdStatusData> dnStatusRecord =
+        manager.getScmCmdStatusRecord().get(dnId1);
+    assertNotNull(dnStatusRecord.get(scmCmdId1));
+    assertNull(dnStatusRecord.get(scmCmdId2));
+    assertNull(dnStatusRecord.get(scmCmdId3));
+    assertNotNull(dnStatusRecord.get(scmCmdId4));
+
+    manager.cleanTimeoutSCMCommand(dnId1, Long.MAX_VALUE);
+
+    // scmCmdId1 is PENDING_EXECUTED will be cleaned up after timeout
+    assertNotNull(dnStatusRecord.get(scmCmdId1));
+    assertNull(dnStatusRecord.get(scmCmdId3));
+    assertNull(dnStatusRecord.get(scmCmdId2));
+    // scmCmdId4 is SENT will be cleaned up after timeout
+    assertNotNull(dnStatusRecord.get(scmCmdId4));
+
+    manager.cleanTimeoutSCMCommand(dnId1, -1);
+    assertNull(dnStatusRecord.get(scmCmdId1));
+    assertNull(dnStatusRecord.get(scmCmdId4));
+  }
+
+  @Test
+  public void testCleanAllTimeoutSCMCommand() {
+    // Test All EXECUTED and NEED_RESEND status in the DN will be cleaned up
+
+    // Transactions in states EXECUTED and NEED_RESEND will be cleaned up
+    // directly, while transactions in states PENDING_EXECUTED and SENT
+    // will be cleaned up after timeout
+    recordAndSentCommand(manager, dnId1, Arrays.asList(scmCmdId1),
+        Arrays.asList(deletedBlocksTxIds1));
+    recordAndSentCommand(manager, dnId2, Arrays.asList(scmCmdId2),
+        Arrays.asList(deletedBlocksTxIds2));
+
+    Map<Long, CmdStatusData> dn1StatusRecord =
+        manager.getScmCmdStatusRecord().get(dnId1);
+    Map<Long, CmdStatusData> dn2StatusRecord =
+        manager.getScmCmdStatusRecord().get(dnId2);
+
+    // Only let the scmCmdId1 have a Heartbeat report, its status will be
+    // updated, the scmCmdId2 still in SENT status.
+    // SENT -> PENDING_EXECUTED
+    manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1,
+        StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+
+    manager.cleanAllTimeoutSCMCommand(Long.MAX_VALUE);
+    // scmCmdId1 is PENDING_EXECUTED will be cleaned up after timeout
+    assertNotNull(dn1StatusRecord.get(scmCmdId1));
+    assertNotNull(dn2StatusRecord.get(scmCmdId2));
+
+    // scmCmdId2 is SENT will be cleaned up after timeout
+    manager.cleanAllTimeoutSCMCommand(-1);
+    assertNull(dn1StatusRecord.get(scmCmdId1));
+    assertNull(dn2StatusRecord.get(scmCmdId2));
+
+  }
+
+  private void recordAndSentCommand(
+      SCMDeleteBlocksCommandStatusManager statusManager,
+      UUID dnId, List<Long> scmCmdIds, List<Set<Long>> txIds) {
+    assertEquals(scmCmdIds.size(), txIds.size());
+    for (int i = 0; i < scmCmdIds.size(); i++) {
+      long scmCmdId = scmCmdIds.get(i);
+      Set<Long> deletedBlocksTxIds = txIds.get(i);
+      CmdStatusData statusData =
+          SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(
+              dnId, scmCmdId, deletedBlocksTxIds);
+      statusManager.recordScmCommand(statusData);
+      statusManager.onSent(dnId, scmCmdId);
+    }
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 168fdd11a5..0f65cdb108 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
@@ -90,6 +91,7 @@ public class TestDeadNodeHandler {
   private EventQueue eventQueue;
   private String storageDir;
   private SCMContext scmContext;
+  private DeletedBlockLog deletedBlockLog;
 
   @BeforeEach
   public void setup() throws IOException, AuthenticationException {
@@ -117,8 +119,9 @@ public class TestDeadNodeHandler {
     pipelineManager.setPipelineProvider(RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
+    deletedBlockLog = Mockito.mock(DeletedBlockLog.class);
     deadNodeHandler = new DeadNodeHandler(nodeManager,
-        Mockito.mock(PipelineManager.class), containerManager);
+        Mockito.mock(PipelineManager.class), containerManager, 
deletedBlockLog);
     healthyReadOnlyNodeHandler =
         new HealthyReadOnlyNodeHandler(nodeManager,
             pipelineManager);
@@ -134,6 +137,7 @@ public class TestDeadNodeHandler {
   }
 
   @Test
+  @SuppressWarnings("checkstyle:MethodLength")
   public void testOnMessage() throws Exception {
     //GIVEN
     DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails();
@@ -233,6 +237,9 @@ public class TestDeadNodeHandler {
     Assertions.assertFalse(
         nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
 
+    Mockito.verify(deletedBlockLog, Mockito.times(0))
+        .onDatanodeDead(datanode1.getUuid());
+
     Set<ContainerReplica> container1Replicas = containerManager
         
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
     Assertions.assertEquals(2, container1Replicas.size());
@@ -260,6 +267,9 @@ public class TestDeadNodeHandler {
     Assertions.assertEquals(0, 
         nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType()));
 
+    Mockito.verify(deletedBlockLog, Mockito.times(1))
+        .onDatanodeDead(datanode1.getUuid());
+
     container1Replicas = containerManager
         .getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
     Assertions.assertEquals(1, container1Replicas.size());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index e3da551c3e..a3decb0efb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -954,16 +954,17 @@ public class TestSCMNodeManager {
 
   @Test
   public void testProcessCommandQueueReport()
-      throws IOException, NodeNotFoundException {
+      throws IOException, NodeNotFoundException, AuthenticationException {
     OzoneConfiguration conf = new OzoneConfiguration();
     SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
     when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
     EventPublisher eventPublisher = mock(EventPublisher.class);
     HDDSLayoutVersionManager lvm  =
         new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion());
+    createNodeManager(getConf());
     SCMNodeManager nodeManager  = new SCMNodeManager(conf,
         scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
-        SCMContext.emptyContext(), lvm);
+        scmContext, lvm);
     LayoutVersionProto layoutInfo = toLayoutVersionProto(
         lvm.getMetadataLayoutVersion(), lvm.getSoftwareLayoutVersion());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to