This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 88e18e3ecc HDDS-8882. Manage status of DeleteBlocksCommand in SCM to
avoid sending duplicates to Datanode (#4988)
88e18e3ecc is described below
commit 88e18e3ecc7f6e1d9b4ecacf269e72fdd192ca95
Author: XiChen <[email protected]>
AuthorDate: Mon Dec 18 21:01:24 2023 +0800
HDDS-8882. Manage status of DeleteBlocksCommand in SCM to avoid sending
duplicates to Datanode (#4988)
---
.../report/CommandStatusReportPublisher.java | 2 +-
.../common/report/TestReportPublisher.java | 4 +-
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 37 +-
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 238 ++++-----
.../hdds/scm/block/SCMBlockDeletingService.java | 10 +-
.../SCMDeletedBlockTransactionStatusManager.java | 581 +++++++++++++++++++++
.../scm/command/CommandStatusReportHandler.java | 36 +-
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 20 +
.../apache/hadoop/hdds/scm/node/NodeManager.java | 13 +
.../hadoop/hdds/scm/node/SCMNodeManager.java | 19 +
.../hdds/scm/server/StorageContainerManager.java | 4 +
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 221 +++++++-
.../TestSCMDeleteBlocksCommandStatusManager.java | 256 +++++++++
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 12 +-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 5 +-
15 files changed, 1268 insertions(+), 190 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 19fde71468..ee08c9f79d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -78,9 +78,9 @@ public class CommandStatusReportPublisher extends
// If status is still pending then don't remove it from map as
// CommandHandler will change its status when it works on this command.
if (!cmdStatus.getStatus().equals(Status.PENDING)) {
- builder.addCmdStatus(cmdStatus.getProtoBufMessage());
map.remove(key);
}
+ builder.addCmdStatus(cmdStatus.getProtoBufMessage());
});
return builder.getCmdStatusCount() > 0 ? builder.build() : null;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 9fb9c7251c..42529cabc7 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -173,8 +173,8 @@ public class TestReportPublisher {
.build();
cmdStatusMap.put(obj1.getCmdId(), obj1);
cmdStatusMap.put(obj2.getCmdId(), obj2);
- // We are not sending the commands whose status is PENDING.
- Assertions.assertEquals(1,
+ // We will send the commands whose status is PENDING and EXECUTED
+ Assertions.assertEquals(2,
((CommandStatusReportPublisher) publisher).getReport()
.getCmdStatusCount(),
"Should publish report with 2 status objects");
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index cb9cf603b1..45d53c0ef2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.hdds.scm.block;
import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
- .DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.io.Closeable;
import java.io.IOException;
@@ -88,14 +86,33 @@ public interface DeletedBlockLog extends Closeable {
int resetCount(List<Long> txIDs) throws IOException;
/**
- * Commits a transaction means to delete all footprints of a transaction
- * from the log. This method doesn't guarantee all transactions can be
- * successfully deleted, it tolerate failures and tries best efforts to.
- * @param transactionResults - delete block transaction results.
- * @param dnID - ID of datanode which acknowledges the delete block command.
+ * Records the creation of a transaction for a DataNode.
+ *
+ * @param dnId The identifier of the DataNode.
+ * @param scmCmdId The ID of the SCM command.
+ * @param dnTxSet Set of transaction IDs for the DataNode.
+ */
+ void recordTransactionCreated(
+ UUID dnId, long scmCmdId, Set<Long> dnTxSet);
+
+ /**
+ * Handles the cleanup process when a DataNode is reported dead. This method
+ * is responsible for updating or cleaning up the transaction records
+ * associated with the dead DataNode.
+ *
+ * @param dnId The identifier of the dead DataNode.
+ */
+ void onDatanodeDead(UUID dnId);
+
+ /**
+ * Records the event of sending a block deletion command to a DataNode. This
+ * method is called when a command is successfully dispatched to a DataNode,
+ * and it helps in tracking the status of the command.
+ *
+ * @param dnId Details of the DataNode.
+ * @param scmCommand The block deletion command sent.
*/
- void commitTransactions(List<DeleteBlockTransactionResult>
transactionResults,
- UUID dnID);
+ void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand);
/**
* Creates block deletion transactions for a set of containers,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8e2d014916..ac64f6e973 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -18,25 +18,25 @@
package org.apache.hadoop.hdds.scm.block;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.Set;
import java.util.Map;
-import java.util.LinkedHashSet;
import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import
org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -55,11 +55,12 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import com.google.common.collect.Lists;
-import static java.lang.Math.min;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.DEL_TXN_ID;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,16 +83,15 @@ public class DeletedBlockLogImpl
private final int maxRetry;
private final ContainerManager containerManager;
private final Lock lock;
- // Maps txId to set of DNs which are successful in committing the transaction
- private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
- // Maps txId to its retry counts;
- private final Map<Long, Integer> transactionToRetryCountMap;
// The access to DeletedBlocksTXTable is protected by
// DeletedBlockLogStateManager.
private final DeletedBlockLogStateManager deletedBlockLogStateManager;
private final SCMContext scmContext;
private final SequenceIdGenerator sequenceIdGen;
private final ScmBlockDeletingServiceMetrics metrics;
+ private final SCMDeletedBlockTransactionStatusManager
+ transactionStatusManager;
+ private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();
private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
@@ -109,12 +109,6 @@ public class DeletedBlockLogImpl
this.containerManager = containerManager;
this.lock = new ReentrantLock();
- // transactionToDNsCommitMap is updated only when
- // transaction is added to the log and when it is removed.
-
- // maps transaction to dns which have committed it.
- transactionToDNsCommitMap = new ConcurrentHashMap<>();
- transactionToRetryCountMap = new ConcurrentHashMap<>();
this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl
.newBuilder()
.setConfiguration(conf)
@@ -126,6 +120,9 @@ public class DeletedBlockLogImpl
this.scmContext = scmContext;
this.sequenceIdGen = sequenceIdGen;
this.metrics = metrics;
+ this.transactionStatusManager =
+ new
SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager,
+ containerManager, scmContext, metrics, scmCommandTimeoutMs);
}
@Override
@@ -170,25 +167,7 @@ public class DeletedBlockLogImpl
throws IOException {
lock.lock();
try {
- ArrayList<Long> txIDsToUpdate = new ArrayList<>();
- for (Long txID : txIDs) {
- int currentCount =
- transactionToRetryCountMap.getOrDefault(txID, 0);
- if (currentCount > maxRetry) {
- continue;
- } else {
- currentCount += 1;
- if (currentCount > maxRetry) {
- txIDsToUpdate.add(txID);
- }
- transactionToRetryCountMap.put(txID, currentCount);
- }
- }
-
- if (!txIDsToUpdate.isEmpty()) {
- deletedBlockLogStateManager
- .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
- }
+ transactionStatusManager.incrementRetryCount(txIDs, maxRetry);
} finally {
lock.unlock();
}
@@ -207,9 +186,7 @@ public class DeletedBlockLogImpl
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());
}
- for (Long txID: txIDs) {
- transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
- }
+ transactionStatusManager.resetRetryCount(txIDs);
return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(new HashSet<>(txIDs)));
} finally {
@@ -227,89 +204,6 @@ public class DeletedBlockLogImpl
.build();
}
- /**
- * {@inheritDoc}
- *
- * @param transactionResults - transaction IDs.
- * @param dnID - Id of Datanode which has acknowledged
- * a delete block command.
- * @throws IOException
- */
- @Override
- public void commitTransactions(
- List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
- lock.lock();
- try {
- ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
- Set<UUID> dnsWithCommittedTxn;
- for (DeleteBlockTransactionResult transactionResult :
- transactionResults) {
- if (isTransactionFailed(transactionResult)) {
- metrics.incrBlockDeletionTransactionFailure();
- continue;
- }
- try {
- metrics.incrBlockDeletionTransactionSuccess();
- long txID = transactionResult.getTxID();
- // set of dns which have successfully committed transaction txId.
- dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
- final ContainerID containerId = ContainerID.valueOf(
- transactionResult.getContainerID());
- if (dnsWithCommittedTxn == null) {
- // Mostly likely it's a retried delete command response.
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Transaction txId={} commit by dnId={} for containerID={}"
- + " failed. Corresponding entry not found.", txID, dnID,
- containerId);
- }
- continue;
- }
-
- dnsWithCommittedTxn.add(dnID);
- final ContainerInfo container =
- containerManager.getContainer(containerId);
- final Set<ContainerReplica> replicas =
- containerManager.getContainerReplicas(containerId);
- // The delete entry can be safely removed from the log if all the
- // corresponding nodes commit the txn. It is required to check that
- // the nodes returned in the pipeline match the replication factor.
- if (min(replicas.size(), dnsWithCommittedTxn.size())
- >= container.getReplicationConfig().getRequiredNodes()) {
- List<UUID> containerDns = replicas.stream()
- .map(ContainerReplica::getDatanodeDetails)
- .map(DatanodeDetails::getUuid)
- .collect(Collectors.toList());
- if (dnsWithCommittedTxn.containsAll(containerDns)) {
- transactionToDNsCommitMap.remove(txID);
- transactionToRetryCountMap.remove(txID);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Purging txId={} from block deletion log", txID);
- }
- txIDsToBeDeleted.add(txID);
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
- txID, containerId, dnID);
- }
- } catch (IOException e) {
- LOG.warn("Could not commit delete block transaction: " +
- transactionResult.getTxID(), e);
- }
- }
- try {
- deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
- metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
- } catch (IOException e) {
- LOG.warn("Could not commit delete block transactions: "
- + txIDsToBeDeleted, e);
- }
- } finally {
- lock.unlock();
- }
- }
-
private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -348,7 +242,7 @@ public class DeletedBlockLogImpl
@Override
public void reinitialize(
Table<Long, DeletedBlocksTransaction> deletedTable) {
- // we don't need handle transactionToDNsCommitMap and
+ // we don't need to handle SCMDeletedBlockTransactionStatusManager and
// deletedBlockLogStateManager, since they will be cleared
// when becoming leader.
deletedBlockLogStateManager.reinitialize(deletedTable);
@@ -359,8 +253,7 @@ public class DeletedBlockLogImpl
* leader.
*/
public void onBecomeLeader() {
- transactionToDNsCommitMap.clear();
- transactionToRetryCountMap.clear();
+ transactionStatusManager.clear();
}
/**
@@ -404,23 +297,21 @@ public class DeletedBlockLogImpl
private void getTransaction(DeletedBlocksTransaction tx,
DatanodeDeletedBlockTransactions transactions,
- Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas) {
+ Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
+ Map<UUID, Map<Long, CmdStatus>> commandStatus) {
DeletedBlocksTransaction updatedTxn =
DeletedBlocksTransaction.newBuilder(tx)
- .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
+ .setCount(transactionStatusManager.getOrDefaultRetryCount(
+ tx.getTxID(), 0))
.build();
for (ContainerReplica replica : replicas) {
- UUID dnID = replica.getDatanodeDetails().getUuid();
- if (!dnList.contains(replica.getDatanodeDetails())) {
+ DatanodeDetails details = replica.getDatanodeDetails();
+ if (!dnList.contains(details)) {
continue;
}
- Set<UUID> dnsWithTransactionCommitted =
- transactionToDNsCommitMap.get(updatedTxn.getTxID());
- if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
- .contains(dnID)) {
- // Transaction need not be sent to dns which have
- // already committed it
- transactions.addTransactionToDN(dnID, updatedTxn);
+ if (!transactionStatusManager.isDuplication(
+ details, updatedTxn.getTxID(), commandStatus)) {
+ transactions.addTransactionToDN(details.getUuid(), updatedTxn);
}
}
}
@@ -442,15 +333,26 @@ public class DeletedBlockLogImpl
throws IOException {
lock.lock();
try {
+ // Here we can clean up timed-out commands for Datanodes that no
+ // longer report heartbeats
+ getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(
+ scmCommandTimeoutMs);
DatanodeDeletedBlockTransactions transactions =
new DatanodeDeletedBlockTransactions();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
+ // Aggregate the CmdStatus per Datanode up front, so that the current
+ // status of a given transaction can be looked up faster
+ Map<UUID, Map<Long, CmdStatus>> commandStatus =
+ getSCMDeletedBlockTransactionStatusManager()
+ .getCommandStatusByTxId(dnList.stream().
+ map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
// Here takes block replica count as the threshold to avoid the case
// that part of replicas committed the TXN and recorded in the
- // transactionToDNsCommitMap, while they are counted in the threshold.
+ // SCMDeletedBlockTransactionStatusManager, while they are counted
+ // in the threshold.
while (iter.hasNext() &&
transactions.getBlocksDeleted() < blockDeletionLimit) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue =
iter.next();
@@ -471,9 +373,8 @@ public class DeletedBlockLogImpl
if (checkInadequateReplica(replicas, txn)) {
continue;
}
- getTransaction(txn, transactions, dnList, replicas);
- transactionToDNsCommitMap
- .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
+ getTransaction(
+ txn, transactions, dnList, replicas, commandStatus);
}
} catch (ContainerNotFoundException ex) {
LOG.warn("Container: " + id + " was not found for the transaction:
"
@@ -492,6 +393,33 @@ public class DeletedBlockLogImpl
}
}
+ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
+ this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+ }
+
+ @VisibleForTesting
+ public SCMDeletedBlockTransactionStatusManager
+ getSCMDeletedBlockTransactionStatusManager() {
+ return transactionStatusManager;
+ }
+
+ @Override
+ public void recordTransactionCreated(UUID dnId, long scmCmdId,
+ Set<Long> dnTxSet) {
+ getSCMDeletedBlockTransactionStatusManager()
+ .recordTransactionCreated(dnId, scmCmdId, dnTxSet);
+ }
+
+ @Override
+ public void onDatanodeDead(UUID dnId) {
+ getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
+ }
+
+ @Override
+ public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
+ getSCMDeletedBlockTransactionStatusManager().onSent(dnId, scmCommand);
+ }
+
@Override
public void onMessage(
DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
@@ -500,18 +428,30 @@ public class DeletedBlockLogImpl
return;
}
- CommandStatus.Status status = deleteBlockStatus.getCmdStatus().getStatus();
- if (status == CommandStatus.Status.EXECUTED) {
- ContainerBlocksDeletionACKProto ackProto =
- deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
- commitTransactions(ackProto.getResultsList(),
- UUID.fromString(ackProto.getDnId()));
- metrics.incrBlockDeletionCommandSuccess();
- } else if (status == CommandStatus.Status.FAILED) {
- metrics.incrBlockDeletionCommandFailure();
- } else {
- LOG.error("Delete Block Command is not executed yet.");
- return;
+ DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
+ UUID dnId = details.getUuid();
+ for (CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
+ CommandStatus.Status status = commandStatus.getStatus();
+ lock.lock();
+ try {
+ if (status == CommandStatus.Status.EXECUTED) {
+ ContainerBlocksDeletionACKProto ackProto =
+ commandStatus.getBlockDeletionAck();
+ getSCMDeletedBlockTransactionStatusManager()
+ .commitTransactions(ackProto.getResultsList(), dnId);
+ metrics.incrBlockDeletionCommandSuccess();
+ } else if (status == CommandStatus.Status.FAILED) {
+ metrics.incrBlockDeletionCommandFailure();
+ } else {
+ LOG.debug("Delete Block Command {} is not executed on the Datanode" +
+ " {}.", commandStatus.getCmdId(), dnId);
+ }
+
+ getSCMDeletedBlockTransactionStatusManager()
+ .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
+ } finally {
+ lock.unlock();
+ }
}
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 5fd889b758..24e950a599 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -176,11 +175,14 @@ public class SCMBlockDeletingService extends
BackgroundService
UUID dnId = entry.getKey();
List<DeletedBlocksTransaction> dnTXs = entry.getValue();
if (!dnTXs.isEmpty()) {
- processedTxIDs.addAll(dnTXs.stream()
+ Set<Long> dnTxSet = dnTXs.stream()
.map(DeletedBlocksTransaction::getTxID)
- .collect(Collectors.toSet()));
- SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
+ .collect(Collectors.toSet());
+ processedTxIDs.addAll(dnTxSet);
+ DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
command.setTerm(scmContext.getTermOfLeader());
+ deletedBlockLog.recordTransactionCreated(dnId, command.getId(),
+ dnTxSet);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dnId, command));
metrics.incrBlockDeletionCommandSent();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
new file mode 100644
index 0000000000..b43e91e059
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * This is a class to manage the status of DeletedBlockTransaction,
+ * the purpose of this class is to reduce the number of duplicate
+ * DeletedBlockTransaction sent to the DN.
+ */
+public class SCMDeletedBlockTransactionStatusManager {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+ // Maps txId to set of DNs which are successful in committing the transaction
+ private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+ // Maps txId to its retry counts;
+ private final Map<Long, Integer> transactionToRetryCountMap;
+ // The access to DeletedBlocksTXTable is protected by
+ // DeletedBlockLogStateManager.
+ private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+ private final ContainerManager containerManager;
+ private final ScmBlockDeletingServiceMetrics metrics;
+ private final SCMContext scmContext;
+ private final long scmCommandTimeoutMs;
+
+ /**
+ * Before the DeletedBlockTransaction is executed on DN and reported to
+ * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+ * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+ * committed on the SCM, it is managed by
+ * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+ */
+ private final SCMDeleteBlocksCommandStatusManager
+ scmDeleteBlocksCommandStatusManager;
+
+ public SCMDeletedBlockTransactionStatusManager(
+ DeletedBlockLogStateManager deletedBlockLogStateManager,
+ ContainerManager containerManager, SCMContext scmContext,
+ ScmBlockDeletingServiceMetrics metrics, long scmCommandTimeoutMs) {
+ // maps transaction to dns which have committed it.
+ this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+ this.metrics = metrics;
+ this.containerManager = containerManager;
+ this.scmContext = scmContext;
+ this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+ this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+ this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+ this.scmDeleteBlocksCommandStatusManager =
+ new SCMDeleteBlocksCommandStatusManager();
+ }
+
+ /**
+ * A class that manages the status of a DeletedBlockTransaction based
+ * on DeleteBlocksCommand.
+ */
+ protected static class SCMDeleteBlocksCommandStatusManager {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+ private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+ private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+ private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+ new HashSet<>(Arrays.asList(SENT));
+
+ public SCMDeleteBlocksCommandStatusManager() {
+ this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Status of SCMDeleteBlocksCommand.
+ */
+ public enum CmdStatus {
+ // The DeleteBlocksCommand has not yet been sent.
+ // This is the initial status of the command after it's created.
+ TO_BE_SENT,
+ // If the DeleteBlocksCommand has been sent but has not been executed
+ // completely by DN, the DeleteBlocksCommand's state will be SENT.
+ // Note that the state of SENT includes the following possibilities.
+ // - The command was sent but not received
+ // - The command was sent and received by the DN,
+ // and is waiting to be executed.
+ // - The Command sent and being executed by DN
+ SENT
+ }
+
+ protected static final class CmdStatusData {
+ private final UUID dnId;
+ private final long scmCmdId;
+ private final Set<Long> deletedBlocksTxIds;
+ private Instant updateTime;
+ private CmdStatus status;
+
+ private CmdStatusData(
+ UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+ this.dnId = dnId;
+ this.scmCmdId = scmTxID;
+ this.deletedBlocksTxIds = deletedBlocksTxIds;
+ setStatus(DEFAULT_STATUS);
+ }
+
+ public Set<Long> getDeletedBlocksTxIds() {
+ return Collections.unmodifiableSet(deletedBlocksTxIds);
+ }
+
+ public UUID getDnId() {
+ return dnId;
+ }
+
+ public long getScmCmdId() {
+ return scmCmdId;
+ }
+
+ public CmdStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(CmdStatus status) {
+ this.updateTime = Instant.now();
+ this.status = status;
+ }
+
+ public Instant getUpdateTime() {
+ return updateTime;
+ }
+
+ @Override
+ public String toString() {
+ return "ScmTxStateMachine" +
+ "{dnId=" + dnId +
+ ", scmTxID=" + scmCmdId +
+ ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+ ", updateTime=" + updateTime +
+ ", status=" + status +
+ '}';
+ }
+ }
+
+ protected static CmdStatusData createScmCmdStatusData(
+ UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+ return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+ }
+
+ protected void recordScmCommand(CmdStatusData statusData) {
+ LOG.debug("Record ScmCommand: {}", statusData);
+ scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+ new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+ }
+
+ protected void onSent(UUID dnId, long scmCmdId) {
+ updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+ }
+
+ protected void onDatanodeDead(UUID dnId) {
+ LOG.info("Clean SCMCommand record for DN: {}", dnId);
+ scmCmdStatusRecord.remove(dnId);
+ }
+
+ protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+ CommandStatus.Status newState) {
+ updateStatus(dnId, scmCmdId, newState);
+ }
+
+ protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+ for (UUID dnId : scmCmdStatusRecord.keySet()) {
+ for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+ removeTimeoutScmCommand(
+ dnId, getScmCommandIds(dnId, status), timeoutMs);
+ }
+ }
+ }
+
+ public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+ for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+ removeTimeoutScmCommand(
+ dnId, getScmCommandIds(dnId, status), timeoutMs);
+ }
+ }
+
+ private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+ Set<Long> scmCmdIds = new HashSet<>();
+ Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+ if (record == null) {
+ return scmCmdIds;
+ }
+ for (CmdStatusData statusData : record.values()) {
+ if (statusData.getStatus().equals(status)) {
+ scmCmdIds.add(statusData.getScmCmdId());
+ }
+ }
+ return scmCmdIds;
+ }
+
+ private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+ Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+ if (record == null || record.get(scmCmdId) == null) {
+ return null;
+ }
+ return record.get(scmCmdId).getUpdateTime();
+ }
+
+ private void updateStatus(UUID dnId, long scmCmdId,
+ CommandStatus.Status newStatus) {
+ Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+ if (recordForDn == null) {
+ LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+ dnId, scmCmdId, newStatus);
+ return;
+ }
+ if (recordForDn.get(scmCmdId) == null) {
+ LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+ scmCmdId, dnId, newStatus);
+ return;
+ }
+
+ boolean changed = false;
+ CmdStatusData statusData = recordForDn.get(scmCmdId);
+ CmdStatus oldStatus = statusData.getStatus();
+ switch (newStatus) {
+ case PENDING:
+ if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+ // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+ // The follow-up status has not been updated by Datanode.
+
+ // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+ // executed by Datanode.
+ statusData.setStatus(SENT);
+ changed = true;
+ }
+ break;
+ case EXECUTED:
+ case FAILED:
+ if (oldStatus == SENT) {
+ // Once the DN executes DeleteBlocksCommands, regardless of whether
+ // DeleteBlocksCommands is executed successfully or not,
+ // it will be deleted from record.
+ // Successful DeleteBlocksCommands are recorded in
+ // `transactionToDNsCommitMap`.
+ removeScmCommand(dnId, scmCmdId);
+ changed = true;
+ }
+ if (oldStatus == TO_BE_SENT) {
+ // SCM receives a reply to an unsent transaction,
+ // which should not normally occur.
+ LOG.error("Received {} status for a command marked TO_BE_SENT. " +
+ "This indicates a potential issue in command handling. " +
+ "SCM Command ID: {}, Datanode ID: {}, Current Status: {}",
+ newStatus, scmCmdId, dnId, oldStatus);
+ removeScmCommand(dnId, scmCmdId);
+ changed = true;
+ }
+ break;
+ default:
+ LOG.error("Can not update to Unknown new Status: {}", newStatus);
+ break;
+ }
+ if (!changed) {
+ LOG.warn("Cannot update illegal status for DN: {} ScmCommandId {} " +
+ "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+ } else {
+ LOG.debug("Successful update DN: {} ScmCommandId {} Status From {} to"
+
+ " {}", dnId, scmCmdId, oldStatus, newStatus);
+ }
+ }
+
+ private void removeTimeoutScmCommand(UUID dnId,
+ Set<Long> scmCmdIds, long timeoutMs) {
+ Instant now = Instant.now();
+ for (Long scmCmdId : scmCmdIds) {
+ Instant updateTime = getUpdateTime(dnId, scmCmdId);
+ if (updateTime != null &&
+ Duration.between(updateTime, now).toMillis() > timeoutMs) {
+ CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+ LOG.warn("Remove Timeout SCM BlockDeletionCommand {} for DN {} " +
+ "after without update {}ms}", state, dnId, timeoutMs);
+ } else {
+ LOG.warn("Timeout SCM scmCmdIds {} for DN {} " +
+ "after without update {}ms}", scmCmdIds, dnId, timeoutMs);
+ }
+ }
+ }
+
+ private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+ Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+ if (record == null || record.get(scmCmdId) == null) {
+ return null;
+ }
+ CmdStatusData statusData = record.remove(scmCmdId);
+ LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+ return statusData;
+ }
+
+ public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+ Set<UUID> dnIds) {
+ Map<UUID, Map<Long, CmdStatus>> result =
+ new HashMap<>(scmCmdStatusRecord.size());
+
+ for (UUID dnId : dnIds) {
+ Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+ if (record == null) {
+ continue;
+ }
+ Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+ for (CmdStatusData statusData : record.values()) {
+ CmdStatus status = statusData.getStatus();
+ for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+ dnStatusMap.put(deletedBlocksTxId, status);
+ }
+ }
+ result.put(dnId, dnStatusMap);
+ }
+
+ return result;
+ }
+
  // Drops the entire per-Datanode command status record.
  private void clear() {
    scmCmdStatusRecord.clear();
  }
+
  // Exposes the raw, mutable record map; intended for tests only.
  @VisibleForTesting
  Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
    return scmCmdStatusRecord;
  }
+ }
+
+ public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+ throws IOException {
+ ArrayList<Long> txIDsToUpdate = new ArrayList<>();
+ for (Long txID : txIDs) {
+ int currentCount =
+ transactionToRetryCountMap.getOrDefault(txID, 0);
+ if (currentCount > maxRetry) {
+ continue;
+ } else {
+ currentCount += 1;
+ if (currentCount > maxRetry) {
+ txIDsToUpdate.add(txID);
+ }
+ transactionToRetryCountMap.put(txID, currentCount);
+ }
+ }
+
+ if (!txIDsToUpdate.isEmpty()) {
+ deletedBlockLogStateManager
+ .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
+ }
+ }
+
+ public void resetRetryCount(List<Long> txIDs) throws IOException {
+ for (Long txID: txIDs) {
+ transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
+ }
+ }
+
  /**
   * Returns the in-memory retry count of {@code txID}, or
   * {@code defaultValue} if the transaction is not tracked.
   */
  public int getOrDefaultRetryCount(long txID, int defaultValue) {
    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
  }
+
  /**
   * Notification hook invoked when {@code scmCommand} is sent to Datanode
   * {@code dnId}; forwards to the command status manager.
   */
  public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
    scmDeleteBlocksCommandStatusManager.onSent(
        dnId.getUuid(), scmCommand.getId());
  }
+
  /**
   * Per-Datanode map from deletedBlocks transaction id to command status,
   * delegated to the command status manager.
   */
  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
      Set<UUID> dnIds) {
    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
  }
+
+ public void recordTransactionCreated(
+ UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
+ scmDeleteBlocksCommandStatusManager.recordScmCommand(
+ SCMDeleteBlocksCommandStatusManager
+ .createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
+ dnTxSet.forEach(txId -> transactionToDNsCommitMap
+ .putIfAbsent(txId, new LinkedHashSet<>()));
+ }
+
  /**
   * Clears all in-memory tracking state: retry counts, command status
   * records, and the transaction commit map.
   */
  public void clear() {
    transactionToRetryCountMap.clear();
    scmDeleteBlocksCommandStatusManager.clear();
    transactionToDNsCommitMap.clear();
  }
+
  /**
   * Removes stale command records for all Datanodes; delegated to the
   * command status manager.
   */
  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
    scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
  }
+
  /**
   * Drops all command records for a dead Datanode; delegated to the
   * command status manager.
   */
  public void onDatanodeDead(UUID dnId) {
    scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
  }
+
+ public boolean isDuplication(DatanodeDetails dnDetail, long tx,
+ Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+ if (alreadyExecuted(dnDetail.getUuid(), tx)) {
+ return true;
+ }
+ return inProcessing(dnDetail.getUuid(), tx, commandStatus);
+ }
+
+ public boolean alreadyExecuted(UUID dnId, long txId) {
+ Set<UUID> dnsWithTransactionCommitted =
+ transactionToDNsCommitMap.get(txId);
+ return dnsWithTransactionCommitted != null && dnsWithTransactionCommitted
+ .contains(dnId);
+ }
+
  /**
   * Commits transactions: deletes all footprints of each acknowledged
   * transaction from the log. This method does not guarantee that every
   * transaction can be deleted; it tolerates failures and makes a best
   * effort. A transaction is purged only once every replica-holding
   * Datanode of its container has committed it.
   * @param transactionResults - delete block transaction results.
   * @param dnId - ID of datanode which acknowledges the delete block command.
   */
  @VisibleForTesting
  public void commitTransactions(
      List<DeleteBlockTransactionResult> transactionResults, UUID dnId) {

    ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
    Set<UUID> dnsWithCommittedTxn;
    for (DeleteBlockTransactionResult transactionResult :
        transactionResults) {
      // Failed results only bump the failure metric; the transaction stays
      // in the log for a later retry.
      if (isTransactionFailed(transactionResult)) {
        metrics.incrBlockDeletionTransactionFailure();
        continue;
      }
      try {
        metrics.incrBlockDeletionTransactionSuccess();
        long txID = transactionResult.getTxID();
        // set of dns which have successfully committed transaction txId.
        dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
        final ContainerID containerId = ContainerID.valueOf(
            transactionResult.getContainerID());
        if (dnsWithCommittedTxn == null) {
          // Mostly likely it's a retried delete command response.
          if (LOG.isDebugEnabled()) {
            LOG.debug(
                "Transaction txId={} commit by dnId={} for containerID={}"
                    + " failed. Corresponding entry not found.", txID, dnId,
                containerId);
          }
          continue;
        }

        dnsWithCommittedTxn.add(dnId);
        final ContainerInfo container =
            containerManager.getContainer(containerId);
        final Set<ContainerReplica> replicas =
            containerManager.getContainerReplicas(containerId);
        // The delete entry can be safely removed from the log if all the
        // corresponding nodes commit the txn. It is required to check that
        // the nodes returned in the pipeline match the replication factor.
        if (min(replicas.size(), dnsWithCommittedTxn.size())
            >= container.getReplicationConfig().getRequiredNodes()) {
          List<UUID> containerDns = replicas.stream()
              .map(ContainerReplica::getDatanodeDetails)
              .map(DatanodeDetails::getUuid)
              .collect(Collectors.toList());
          if (dnsWithCommittedTxn.containsAll(containerDns)) {
            // Every replica holder has committed: drop the in-memory
            // tracking and queue the txn for DB removal below.
            transactionToDNsCommitMap.remove(txID);
            transactionToRetryCountMap.remove(txID);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Purging txId={} from block deletion log", txID);
            }
            txIDsToBeDeleted.add(txID);
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
              txID, containerId, dnId);
        }
      } catch (IOException e) {
        LOG.warn("Could not commit delete block transaction: " +
            transactionResult.getTxID(), e);
      }
    }
    try {
      // Batch-remove the fully committed transactions from the backing DB.
      deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
      metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
    } catch (IOException e) {
      LOG.warn("Could not commit delete block transactions: "
          + txIDsToBeDeleted, e);
    }
  }
+
+ @VisibleForTesting
+ public void commitSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
+ UUID dnId) {
+ processSCMCommandStatus(deleteBlockStatus, dnId);
+ scmDeleteBlocksCommandStatusManager.
+ cleanTimeoutSCMCommand(dnId, scmCommandTimeoutMs);
+ }
+
+ private boolean inProcessing(UUID dnId, long deletedBlocksTxId,
+ Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+ Map<Long, CmdStatus> deletedBlocksTxStatus = commandStatus.get(dnId);
+ return deletedBlocksTxStatus != null &&
+ deletedBlocksTxStatus.get(deletedBlocksTxId) != null;
+ }
+
+ private void processSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
+ UUID dnID) {
+ Map<Long, CommandStatus> lastStatus = new HashMap<>();
+ Map<Long, CommandStatus.Status> summary = new HashMap<>();
+
+ // The CommandStatus is ordered in the report. So we can focus only on the
+ // last status in the command report.
+ deleteBlockStatus.forEach(cmdStatus -> {
+ lastStatus.put(cmdStatus.getCmdId(), cmdStatus);
+ summary.put(cmdStatus.getCmdId(), cmdStatus.getStatus());
+ });
+ LOG.debug("CommandStatus {} from Datanode {} ", summary, dnID);
+ for (Map.Entry<Long, CommandStatus> entry : lastStatus.entrySet()) {
+ CommandStatus.Status status = entry.getValue().getStatus();
+ scmDeleteBlocksCommandStatusManager.updateStatusByDNCommandStatus(
+ dnID, entry.getKey(), status);
+ }
+ }
+
+ private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
+ result.getTxID(), result.getSuccess());
+ }
+ if (!result.getSuccess()) {
+ LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+ + "TX in next interval", result.getTxID());
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index d43311265d..5d737659dd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.scm.command;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -31,6 +33,7 @@ import
org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -54,32 +57,43 @@ public class CommandStatusReportHandler implements
}
// Route command status to its watchers.
+ List<CommandStatus> deleteBlocksCommandStatus = new ArrayList<>();
cmdStatusList.forEach(cmdStatus -> {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
.getCmdId(), cmdStatus.getType());
}
if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
- publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
- new DeleteBlockStatus(cmdStatus));
+ deleteBlocksCommandStatus.add(cmdStatus);
} else {
LOGGER.debug("CommandStatus of type:{} not handled in " +
"CommandStatusReportHandler.", cmdStatus.getType());
}
});
+
+      /**
+       * All delete-block CommandStatus entries are aggregated into a single
+       * event to reduce thread switching: when the Datanode queue holds a
+       * large number of commands, the report can contain many entries in
+       * {@link CommandStatus.Status#PENDING} status.
+       */
+ if (!deleteBlocksCommandStatus.isEmpty()) {
+ publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new DeleteBlockStatus(
+ deleteBlocksCommandStatus, report.getDatanodeDetails()));
+ }
}
/**
* Wrapper event for CommandStatus.
*/
public static class CommandStatusEvent implements IdentifiableEventPayload {
- private CommandStatus cmdStatus;
+ private final List<CommandStatus> cmdStatus;
- CommandStatusEvent(CommandStatus cmdStatus) {
+ CommandStatusEvent(List<CommandStatus> cmdStatus) {
this.cmdStatus = cmdStatus;
}
- public CommandStatus getCmdStatus() {
+ public List<CommandStatus> getCmdStatus() {
return cmdStatus;
}
@@ -90,7 +104,7 @@ public class CommandStatusReportHandler implements
@Override
public long getId() {
- return cmdStatus.getCmdId();
+ return HddsIdFactory.getLongId();
}
}
@@ -98,8 +112,16 @@ public class CommandStatusReportHandler implements
* Wrapper event for DeleteBlock Command.
*/
public static class DeleteBlockStatus extends CommandStatusEvent {
- public DeleteBlockStatus(CommandStatus cmdStatus) {
+ private final DatanodeDetails datanodeDetails;
+
+ public DeleteBlockStatus(List<CommandStatus> cmdStatus,
+ DatanodeDetails datanodeDetails) {
super(cmdStatus);
+ this.datanodeDetails = datanodeDetails;
+ }
+
+ public DatanodeDetails getDatanodeDetails() {
+ return datanodeDetails;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index f05eb761d9..3c40437d7f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.container.ContainerException;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -41,6 +42,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
/**
@@ -51,6 +54,8 @@ public class DeadNodeHandler implements
EventHandler<DatanodeDetails> {
private final NodeManager nodeManager;
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
+ @Nullable
+ private final DeletedBlockLog deletedBlockLog;
private static final Logger LOG =
LoggerFactory.getLogger(DeadNodeHandler.class);
@@ -58,9 +63,17 @@ public class DeadNodeHandler implements
EventHandler<DatanodeDetails> {
public DeadNodeHandler(final NodeManager nodeManager,
final PipelineManager pipelineManager,
final ContainerManager containerManager) {
+ this(nodeManager, pipelineManager, containerManager, null);
+ }
+
+ public DeadNodeHandler(final NodeManager nodeManager,
+ final PipelineManager pipelineManager,
+ final ContainerManager containerManager,
+ @Nullable final DeletedBlockLog deletedBlockLog) {
this.nodeManager = nodeManager;
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
+ this.deletedBlockLog = deletedBlockLog;
}
@Override
@@ -95,6 +108,13 @@ public class DeadNodeHandler implements
EventHandler<DatanodeDetails> {
LOG.info("Clearing command queue of size {} for DN {}",
cmdList.size(), datanodeDetails);
+ // remove DeleteBlocksCommand associated with the dead node unless it
+ // is IN_MAINTENANCE
+ if (deletedBlockLog != null &&
+ !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+ deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
+ }
+
//move dead datanode out of ClusterNetworkTopology
NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
if (nt.contains(datanodeDetails)) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 011b361d62..399a7ef952 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Collection;
+import java.util.function.BiConsumer;
/**
* A node manager supports a simple interface for managing a datanode.
@@ -90,6 +91,18 @@ public interface NodeManager extends
StorageContainerNodeProtocol,
defaultLayoutVersionProto());
}
+ /**
+ * Register a SendCommandNotify handler for a specific type of SCMCommand.
+ * @param type The type of the SCMCommand.
+ * @param scmCommand A BiConsumer that takes a DatanodeDetails and a
+ * SCMCommand object and performs the necessary actions.
+   * The default implementation is a no-op; implementations may override it
+   * to track sent commands.
+ */
+ default void registerSendCommandNotify(SCMCommandProto.Type type,
+ BiConsumer<DatanodeDetails, SCMCommand<?>> scmCommand) {
+ }
+
/**
* Gets all Live Datanodes that are currently communicating with SCM.
* @param nodeStatus - Status of the node to return
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 167b25afd0..972c061d5d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -84,6 +84,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -128,6 +129,8 @@ public class SCMNodeManager implements NodeManager {
private final HDDSLayoutVersionManager scmLayoutVersionManager;
private final EventPublisher scmNodeEventPublisher;
private final SCMContext scmContext;
+ private final Map<SCMCommandProto.Type,
+ BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
/**
* Lock used to synchronize some operation in Node manager to ensure a
@@ -179,6 +182,13 @@ public class SCMNodeManager implements NodeManager {
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.scmContext = scmContext;
+ this.sendCommandNotifyMap = new HashMap<>();
+ }
+
+ @Override
+ public void registerSendCommandNotify(SCMCommandProto.Type type,
+ BiConsumer<DatanodeDetails, SCMCommand<?>> scmCommand) {
+ this.sendCommandNotifyMap.put(type, scmCommand);
}
private void registerMXBean() {
@@ -521,6 +531,15 @@ public class SCMNodeManager implements NodeManager {
commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
List<SCMCommand> commands =
commandQueue.getCommand(datanodeDetails.getUuid());
+
+ // Update the SCMCommand of deleteBlocksCommand Status
+ for (SCMCommand<?> command : commands) {
+ if (sendCommandNotifyMap.get(command.getType()) != null) {
+ sendCommandNotifyMap.get(command.getType())
+ .accept(datanodeDetails, command);
+ }
+ }
+
if (queueReport != null) {
processNodeCommandQueueReport(datanodeDetails, queueReport, summary);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 722244d4c1..c000514ed3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -577,6 +578,9 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
+ scmNodeManager.registerSendCommandNotify(
+ SCMCommandProto.Type.deleteBlocksCommand,
+ scmBlockManager.getDeletedBlockLog()::onSent);
}
private void initializeCertificateClient() throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index e1d15146d0..987b1ddbb9 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -49,6 +51,9 @@ import org.apache.hadoop.hdds.protocol.proto
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -62,6 +67,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -71,6 +77,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
@@ -255,7 +262,7 @@ public class TestDeletedBlockLog {
List<DeleteBlockTransactionResult> transactionResults,
DatanodeDetails... dns) throws IOException {
for (DatanodeDetails dnDetails : dns) {
- deletedBlockLog
+ deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
.commitTransactions(transactionResults, dnDetails.getUuid());
}
scmHADBTransactionBuffer.flush();
@@ -284,15 +291,6 @@ public class TestDeletedBlockLog {
.collect(Collectors.toList()));
}
- private void commitTransactions(DatanodeDeletedBlockTransactions
- transactions) {
- transactions.getDatanodeTransactionMap().forEach((uuid,
- deletedBlocksTransactions) -> deletedBlockLog
- .commitTransactions(deletedBlocksTransactions.stream()
- .map(this::createDeleteBlockTransactionResult)
- .collect(Collectors.toList()), uuid));
- }
-
private DeleteBlockTransactionResult createDeleteBlockTransactionResult(
DeletedBlocksTransaction transaction) {
return DeleteBlockTransactionResult.newBuilder()
@@ -315,6 +313,13 @@ public class TestDeletedBlockLog {
transactions.getDatanodeTransactionMap().get(dn.getUuid()))
.orElseGet(LinkedList::new));
}
+ // Simulated transactions are sent
+ for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ transactions.getDatanodeTransactionMap().entrySet()) {
+ DeleteBlocksCommand command = new DeleteBlocksCommand(entry.getValue());
+ recordScmCommandToStatusManager(entry.getKey(), command);
+ sendSCMDeleteBlocksCommand(entry.getKey(), command);
+ }
return txns;
}
@@ -431,6 +436,9 @@ public class TestDeletedBlockLog {
}
// Increment for the reset transactions.
+ // Lets the SCM delete the transaction and wait for the DN reply
+ // to timeout, thus allowing the transaction to resend the
+ deletedBlockLog.setScmCommandTimeoutMs(-1L);
incrementCount(txIDs);
blocks = getAllTransactions();
for (DeletedBlocksTransaction block : blocks) {
@@ -442,6 +450,7 @@ public class TestDeletedBlockLog {
@Test
public void testCommitTransactions() throws Exception {
+ deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
addTransactions(generateData(50), true);
mockContainerHealthResult(true);
List<DeletedBlocksTransaction> blocks =
@@ -458,6 +467,12 @@ public class TestDeletedBlockLog {
DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
.build());
+ blocks = getTransactions(50 * BLOCKS_PER_TXN * THREE);
+ // SCM will not repeat a transaction until it has timed out.
+ Assertions.assertEquals(0, blocks.size());
+    // Lets the SCM delete the transaction and wait for the DN reply
+    // to time out, thus allowing the transaction to be resent.
+ deletedBlockLog.setScmCommandTimeoutMs(-1L);
blocks = getTransactions(50 * BLOCKS_PER_TXN * THREE);
// only uncommitted dn have transactions
Assertions.assertEquals(30, blocks.size());
@@ -467,6 +482,173 @@ public class TestDeletedBlockLog {
Assertions.assertEquals(0, blocks.size());
}
+ private void recordScmCommandToStatusManager(
+ UUID dnId, DeleteBlocksCommand command) {
+ Set<Long> dnTxSet = command.blocksTobeDeleted()
+ .stream().map(DeletedBlocksTransaction::getTxID)
+ .collect(Collectors.toSet());
+ deletedBlockLog.recordTransactionCreated(dnId, command.getId(), dnTxSet);
+ }
+
+ private void sendSCMDeleteBlocksCommand(UUID dnId, SCMCommand<?> scmCommand)
{
+ deletedBlockLog.onSent(
+ DatanodeDetails.newBuilder().setUuid(dnId).build(), scmCommand);
+ }
+
+ private void assertNoDuplicateTransactions(
+ DatanodeDeletedBlockTransactions transactions1,
+ DatanodeDeletedBlockTransactions transactions2) {
+ Map<UUID, List<DeletedBlocksTransaction>> map1 =
+ transactions1.getDatanodeTransactionMap();
+ Map<UUID, List<DeletedBlocksTransaction>> map2 =
+ transactions2.getDatanodeTransactionMap();
+
+ for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ map1.entrySet()) {
+ UUID dnId = entry.getKey();
+ Set<DeletedBlocksTransaction> txSet1 = new HashSet<>(entry.getValue());
+ Set<DeletedBlocksTransaction> txSet2 = new HashSet<>(map2.get(dnId));
+
+ txSet1.retainAll(txSet2);
+ Assertions.assertEquals(0, txSet1.size(),
+ String.format("Duplicate Transactions found first transactions %s " +
+ "second transactions %s for Dn %s", txSet1, txSet2, dnId));
+ }
+ }
+
+
+ private void assertContainsAllTransactions(
+ DatanodeDeletedBlockTransactions transactions1,
+ DatanodeDeletedBlockTransactions transactions2) {
+ Map<UUID, List<DeletedBlocksTransaction>> map1 =
+ transactions1.getDatanodeTransactionMap();
+ Map<UUID, List<DeletedBlocksTransaction>> map2 =
+ transactions2.getDatanodeTransactionMap();
+
+ for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ map1.entrySet()) {
+ UUID dnId = entry.getKey();
+ Set<DeletedBlocksTransaction> txSet1 = new HashSet<>(entry.getValue());
+ Set<DeletedBlocksTransaction> txSet2 = new HashSet<>(map2.get(dnId));
+
+ Assertions.assertTrue(txSet1.containsAll(txSet2));
+ }
+ }
+
+ private void commitSCMCommandStatus(Long scmCmdId, UUID dnID,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status status) {
+ List<StorageContainerDatanodeProtocolProtos
+ .CommandStatus> deleteBlockStatus = new ArrayList<>();
+ deleteBlockStatus.add(CommandStatus.CommandStatusBuilder.newBuilder()
+ .setCmdId(scmCmdId)
+ .setType(Type.deleteBlocksCommand)
+ .setStatus(status)
+ .build()
+ .getProtoBufMessage());
+
+ deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
+ .commitSCMCommandStatus(deleteBlockStatus, dnID);
+ }
+
+ private void createDeleteBlocksCommandAndAction(
+ DatanodeDeletedBlockTransactions transactions,
+ BiConsumer<UUID, DeleteBlocksCommand> afterCreate) {
+ for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+ transactions.getDatanodeTransactionMap().entrySet()) {
+ UUID dnId = entry.getKey();
+ List<DeletedBlocksTransaction> dnTXs = entry.getValue();
+ DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
+ afterCreate.accept(dnId, command);
+ }
+ }
+
+ @Test
+ public void testNoDuplicateTransactionsForInProcessingSCMCommand()
+ throws Exception {
+    // The SCM will not resend these transactions in the cases below:
+ // - If the command has not been sent;
+ // - The DN does not report the status of the command via heartbeat
+ // After the command is sent;
+ // - If the DN reports the command status as PENDING;
+ addTransactions(generateData(10), true);
+ int blockLimit = 2 * BLOCKS_PER_TXN * THREE;
+ mockContainerHealthResult(true);
+
+ // If the command has not been sent
+ DatanodeDeletedBlockTransactions transactions1 =
+ deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+ createDeleteBlocksCommandAndAction(transactions1,
+ this::recordScmCommandToStatusManager);
+
+ // - The DN does not report the status of the command via heartbeat
+ // After the command is sent
+ DatanodeDeletedBlockTransactions transactions2 =
+ deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+ assertNoDuplicateTransactions(transactions1, transactions2);
+ createDeleteBlocksCommandAndAction(transactions2, (dnId, command) -> {
+ recordScmCommandToStatusManager(dnId, command);
+ sendSCMDeleteBlocksCommand(dnId, command);
+ });
+
+ // - If the DN reports the command status as PENDING
+ DatanodeDeletedBlockTransactions transactions3 =
+ deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+ assertNoDuplicateTransactions(transactions1, transactions3);
+ createDeleteBlocksCommandAndAction(transactions3, (dnId, command) -> {
+ recordScmCommandToStatusManager(dnId, command);
+ sendSCMDeleteBlocksCommand(dnId, command);
+ commitSCMCommandStatus(command.getId(), dnId,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+ });
+ assertNoDuplicateTransactions(transactions3, transactions1);
+ assertNoDuplicateTransactions(transactions3, transactions2);
+
+ DatanodeDeletedBlockTransactions transactions4 =
+ deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+ assertNoDuplicateTransactions(transactions4, transactions1);
+ assertNoDuplicateTransactions(transactions4, transactions2);
+ assertNoDuplicateTransactions(transactions4, transactions3);
+ }
+
+ @Test
+ public void testFailedAndTimeoutSCMCommandCanBeResend() throws Exception {
+    // The SCM will resend these transactions in the cases below:
+ // - Executed failed commands;
+ // - DN does not refresh the PENDING state for more than a period of time;
+ deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
+ addTransactions(generateData(10), true);
+ int blockLimit = 2 * BLOCKS_PER_TXN * THREE;
+ mockContainerHealthResult(true);
+
+ // - DN does not refresh the PENDING state for more than a period of time;
+ DatanodeDeletedBlockTransactions transactions =
+ deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+ createDeleteBlocksCommandAndAction(transactions, (dnId, command) -> {
+ recordScmCommandToStatusManager(dnId, command);
+ sendSCMDeleteBlocksCommand(dnId, command);
+ commitSCMCommandStatus(command.getId(), dnId,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+ });
+
+ // - Executed failed commands;
+ DatanodeDeletedBlockTransactions transactions2 =
+ deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList));
+ createDeleteBlocksCommandAndAction(transactions2, (dnId, command) -> {
+ recordScmCommandToStatusManager(dnId, command);
+ sendSCMDeleteBlocksCommand(dnId, command);
+ commitSCMCommandStatus(command.getId(), dnId,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED);
+ });
+
+ deletedBlockLog.setScmCommandTimeoutMs(-1L);
+ DatanodeDeletedBlockTransactions transactions3 =
+ deletedBlockLog.getTransactions(Integer.MAX_VALUE,
+ new HashSet<>(dnList));
+ assertNoDuplicateTransactions(transactions, transactions2);
+ assertContainsAllTransactions(transactions3, transactions);
+ assertContainsAllTransactions(transactions3, transactions2);
+ }
+
@Test
public void testDNOnlyOneNodeHealthy() throws Exception {
Map<Long, List<Long>> deletedBlocks = generateData(50);
@@ -496,9 +678,7 @@ public class TestDeletedBlockLog {
// For the first 30 txn, deletedBlockLog only has the txn from dn1 and dn2
// For the rest txn, txn will be got from all dns.
// Committed txn will be: 1-40. 1-40. 31-40
- commitTransactions(deletedBlockLog.getTransactions(
- 30 * BLOCKS_PER_TXN * THREE,
- dnList.stream().collect(Collectors.toSet())));
+ commitTransactions(getTransactions(30 * BLOCKS_PER_TXN * THREE));
// The rest txn shall be: 41-50. 41-50. 41-50
List<DeletedBlocksTransaction> blocks = getAllTransactions();
@@ -590,6 +770,7 @@ public class TestDeletedBlockLog {
@Test
public void testDeletedBlockTransactions()
throws IOException, TimeoutException {
+ deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
mockContainerHealthResult(true);
int txNum = 10;
List<DeletedBlocksTransaction> blocks;
@@ -622,9 +803,21 @@ public class TestDeletedBlockLog {
// add two transactions for same container
containerID = blocks.get(0).getContainerID();
Map<Long, List<Long>> deletedBlocksMap = new HashMap<>();
- deletedBlocksMap.put(containerID, new LinkedList<>());
+ Random random = new Random();
+ long localId = random.nextLong();
+ deletedBlocksMap.put(containerID, new LinkedList<>(
+ Collections.singletonList(localId)));
addTransactions(deletedBlocksMap, true);
+ blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE);
+ // Only newly added Blocks will be sent, as previously sent transactions
+ // that have not yet timed out will not be sent.
+ Assertions.assertEquals(1, blocks.size());
+ Assertions.assertEquals(1, blocks.get(0).getLocalIDCount());
+ Assertions.assertEquals(blocks.get(0).getLocalID(0), localId);
+ // Let the SCM command time out while waiting for the DN reply,
+ // thus allowing the transaction to be resent.
+ deletedBlockLog.setScmCommandTimeoutMs(-1L);
// get should return two transactions for the same container
blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE);
Assertions.assertEquals(2, blocks.size());
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
new file mode 100644
index 0000000000..888cb42fd7
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+import static
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatusData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * A test for SCMDeleteBlocksCommandStatusManager.
+ */
+public class TestSCMDeleteBlocksCommandStatusManager {
+
+ private SCMDeleteBlocksCommandStatusManager manager;
+ private UUID dnId1;
+ private UUID dnId2;
+ private long scmCmdId1;
+ private long scmCmdId2;
+ private long scmCmdId3;
+ private long scmCmdId4;
+ private Set<Long> deletedBlocksTxIds1;
+ private Set<Long> deletedBlocksTxIds2;
+ private Set<Long> deletedBlocksTxIds3;
+ private Set<Long> deletedBlocksTxIds4;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ manager = new SCMDeleteBlocksCommandStatusManager();
+ // Create test data
+ dnId1 = UUID.randomUUID();
+ dnId2 = UUID.randomUUID();
+ scmCmdId1 = 1L;
+ scmCmdId2 = 2L;
+ scmCmdId3 = 3L;
+ scmCmdId4 = 4L;
+ deletedBlocksTxIds1 = new HashSet<>();
+ deletedBlocksTxIds1.add(100L);
+ deletedBlocksTxIds2 = new HashSet<>();
+ deletedBlocksTxIds2.add(200L);
+ deletedBlocksTxIds3 = new HashSet<>();
+ deletedBlocksTxIds3.add(300L);
+ deletedBlocksTxIds4 = new HashSet<>();
+ deletedBlocksTxIds4.add(400L);
+ }
+
+ @Test
+ public void testRecordScmCommand() {
+ CmdStatusData statusData =
+ SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(
+ dnId1, scmCmdId1, deletedBlocksTxIds1);
+
+ manager.recordScmCommand(statusData);
+
+ assertNotNull(manager.getScmCmdStatusRecord().get(dnId1));
+ assertEquals(1, manager.getScmCmdStatusRecord().get(dnId1).size());
+ CmdStatusData cmdStatusData =
+ manager.getScmCmdStatusRecord().get(dnId1).get(scmCmdId1);
+ assertNotNull(cmdStatusData);
+ assertEquals(dnId1, statusData.getDnId());
+ assertEquals(scmCmdId1, statusData.getScmCmdId());
+ assertEquals(deletedBlocksTxIds1, statusData.getDeletedBlocksTxIds());
+ // The default status is `CmdStatus.TO_BE_SENT`
+ assertEquals(TO_BE_SENT, statusData.getStatus());
+ }
+
+ @Test
+ public void testOnSent() {
+ CmdStatusData statusData =
+ SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(
+ dnId1, scmCmdId1, deletedBlocksTxIds1);
+ manager.recordScmCommand(statusData);
+
+ Map<Long, CmdStatusData> dnStatusRecord =
+ manager.getScmCmdStatusRecord().get(dnId1);
+ // After the Command is sent by SCM, the status of the Command
+ // will change from TO_BE_SENT to SENT
+ assertEquals(TO_BE_SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+ manager.onSent(dnId1, scmCmdId1);
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+ }
+
+ @Test
+ public void testUpdateStatusByDNCommandStatus() {
+ // Test all status updates driven by the Datanode heartbeat report.
+ // SENT -> PENDING_EXECUTED: The DeleteBlocksCommand is sent and received
+ // by the Datanode, but the command is not executed by the Datanode,
+ // the command is waiting to be executed.
+
+ // SENT -> NEED_RESEND: The DeleteBlocksCommand is sent and lost before
+ // it is received by the DN.
+ // SENT -> EXECUTED: The DeleteBlocksCommand has been sent to Datanode,
+ // executed by DN, and executed successfully.
+ //
+ // PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand continues
+ // to wait to be executed by Datanode.
+ // PENDING_EXECUTED -> NEED_RESEND: The DeleteBlocksCommand waited for a
+ // while and was executed, but the execution failed; Or the
+ // DeleteBlocksCommand was lost while waiting(such as the Datanode
restart)
+ //
+ // PENDING_EXECUTED -> EXECUTED: The Command waits for a period of
+ // time on the DN and is executed successfully.
+
+ recordAndSentCommand(manager, dnId1,
+ Arrays.asList(scmCmdId1, scmCmdId2, scmCmdId3, scmCmdId4),
+ Arrays.asList(deletedBlocksTxIds1, deletedBlocksTxIds2,
+ deletedBlocksTxIds3, deletedBlocksTxIds4));
+
+ Map<Long, CmdStatusData> dnStatusRecord =
+ manager.getScmCmdStatusRecord().get(dnId1);
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId2).getStatus());
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId3).getStatus());
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId4).getStatus());
+
+ // SENT -> PENDING_EXECUTED
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+ // SENT -> EXECUTED
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId2,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED);
+ // SENT -> NEED_RESEND
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId3,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED);
+ // SENT -> PENDING_EXECUTED
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId4,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus());
+ assertNull(dnStatusRecord.get(scmCmdId2));
+ assertNull(dnStatusRecord.get(scmCmdId3));
+ assertEquals(SENT, dnStatusRecord.get(scmCmdId4).getStatus());
+ }
+
+ @Test
+ public void testCleanSCMCommandForDn() {
+ // Transactions in states EXECUTED and NEED_RESEND will be cleaned up
+ // directly, while transactions in states PENDING_EXECUTED and SENT
+ // will be cleaned up after timeout
+ recordAndSentCommand(manager, dnId1,
+ Arrays.asList(scmCmdId1, scmCmdId2, scmCmdId3, scmCmdId4),
+ Arrays.asList(deletedBlocksTxIds1, deletedBlocksTxIds2,
+ deletedBlocksTxIds3, deletedBlocksTxIds4));
+
+ // SENT -> PENDING_EXECUTED
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+ // SENT -> EXECUTED
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId2,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED);
+ // SENT -> NEED_RESEND
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId3,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED);
+
+ Map<Long, CmdStatusData> dnStatusRecord =
+ manager.getScmCmdStatusRecord().get(dnId1);
+ assertNotNull(dnStatusRecord.get(scmCmdId1));
+ assertNull(dnStatusRecord.get(scmCmdId2));
+ assertNull(dnStatusRecord.get(scmCmdId3));
+ assertNotNull(dnStatusRecord.get(scmCmdId4));
+
+ manager.cleanTimeoutSCMCommand(dnId1, Long.MAX_VALUE);
+
+ // scmCmdId1 is PENDING_EXECUTED, so it is only cleaned up after timeout
+ assertNotNull(dnStatusRecord.get(scmCmdId1));
+ assertNull(dnStatusRecord.get(scmCmdId3));
+ assertNull(dnStatusRecord.get(scmCmdId2));
+ // scmCmdId4 is SENT, so it is only cleaned up after timeout
+ assertNotNull(dnStatusRecord.get(scmCmdId4));
+
+ manager.cleanTimeoutSCMCommand(dnId1, -1);
+ assertNull(dnStatusRecord.get(scmCmdId1));
+ assertNull(dnStatusRecord.get(scmCmdId4));
+ }
+
+ @Test
+ public void testCleanAllTimeoutSCMCommand() {
+ // Test All EXECUTED and NEED_RESEND status in the DN will be cleaned up
+
+ // Transactions in states EXECUTED and NEED_RESEND will be cleaned up
+ // directly, while transactions in states PENDING_EXECUTED and SENT
+ // will be cleaned up after timeout
+ recordAndSentCommand(manager, dnId1, Arrays.asList(scmCmdId1),
+ Arrays.asList(deletedBlocksTxIds1));
+ recordAndSentCommand(manager, dnId2, Arrays.asList(scmCmdId2),
+ Arrays.asList(deletedBlocksTxIds2));
+
+ Map<Long, CmdStatusData> dn1StatusRecord =
+ manager.getScmCmdStatusRecord().get(dnId1);
+ Map<Long, CmdStatusData> dn2StatusRecord =
+ manager.getScmCmdStatusRecord().get(dnId2);
+
+ // Only let the scmCmdId1 have a Heartbeat report, its status will be
+ // updated, the scmCmdId2 still in SENT status.
+ // SENT -> PENDING_EXECUTED
+ manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1,
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
+
+ manager.cleanAllTimeoutSCMCommand(Long.MAX_VALUE);
+ // scmCmdId1 is PENDING_EXECUTED will be cleaned up after timeout
+ assertNotNull(dn1StatusRecord.get(scmCmdId1));
+ assertNotNull(dn2StatusRecord.get(scmCmdId2));
+
+ // scmCmdId2 is SENT will be cleaned up after timeout
+ manager.cleanAllTimeoutSCMCommand(-1);
+ assertNull(dn1StatusRecord.get(scmCmdId1));
+ assertNull(dn2StatusRecord.get(scmCmdId2));
+
+ }
+
+ private void recordAndSentCommand(
+ SCMDeleteBlocksCommandStatusManager statusManager,
+ UUID dnId, List<Long> scmCmdIds, List<Set<Long>> txIds) {
+ assertEquals(scmCmdIds.size(), txIds.size());
+ for (int i = 0; i < scmCmdIds.size(); i++) {
+ long scmCmdId = scmCmdIds.get(i);
+ Set<Long> deletedBlocksTxIds = txIds.get(i);
+ CmdStatusData statusData =
+ SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(
+ dnId, scmCmdId, deletedBlocksTxIds);
+ statusManager.recordScmCommand(statusData);
+ statusManager.onSent(dnId, scmCmdId);
+ }
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 168fdd11a5..0f65cdb108 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
@@ -90,6 +91,7 @@ public class TestDeadNodeHandler {
private EventQueue eventQueue;
private String storageDir;
private SCMContext scmContext;
+ private DeletedBlockLog deletedBlockLog;
@BeforeEach
public void setup() throws IOException, AuthenticationException {
@@ -117,8 +119,9 @@ public class TestDeadNodeHandler {
pipelineManager.setPipelineProvider(RATIS,
mockRatisProvider);
containerManager = scm.getContainerManager();
+ deletedBlockLog = Mockito.mock(DeletedBlockLog.class);
deadNodeHandler = new DeadNodeHandler(nodeManager,
- Mockito.mock(PipelineManager.class), containerManager);
+ Mockito.mock(PipelineManager.class), containerManager,
deletedBlockLog);
healthyReadOnlyNodeHandler =
new HealthyReadOnlyNodeHandler(nodeManager,
pipelineManager);
@@ -134,6 +137,7 @@ public class TestDeadNodeHandler {
}
@Test
+ @SuppressWarnings("checkstyle:MethodLength")
public void testOnMessage() throws Exception {
//GIVEN
DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails();
@@ -233,6 +237,9 @@ public class TestDeadNodeHandler {
Assertions.assertFalse(
nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
+ Mockito.verify(deletedBlockLog, Mockito.times(0))
+ .onDatanodeDead(datanode1.getUuid());
+
Set<ContainerReplica> container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
Assertions.assertEquals(2, container1Replicas.size());
@@ -260,6 +267,9 @@ public class TestDeadNodeHandler {
Assertions.assertEquals(0,
nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType()));
+ Mockito.verify(deletedBlockLog, Mockito.times(1))
+ .onDatanodeDead(datanode1.getUuid());
+
container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
Assertions.assertEquals(1, container1Replicas.size());
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index e3da551c3e..a3decb0efb 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -954,16 +954,17 @@ public class TestSCMNodeManager {
@Test
public void testProcessCommandQueueReport()
- throws IOException, NodeNotFoundException {
+ throws IOException, NodeNotFoundException, AuthenticationException {
OzoneConfiguration conf = new OzoneConfiguration();
SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
EventPublisher eventPublisher = mock(EventPublisher.class);
HDDSLayoutVersionManager lvm =
new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion());
+ createNodeManager(getConf());
SCMNodeManager nodeManager = new SCMNodeManager(conf,
scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
- SCMContext.emptyContext(), lvm);
+ scmContext, lvm);
LayoutVersionProto layoutInfo = toLayoutVersionProto(
lvm.getMetadataLayoutVersion(), lvm.getSoftwareLayoutVersion());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]