This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 43c9565d9d HDDS-8888. Consider Datanode queue capacity when sending
DeleteBlocks command (#4939)
43c9565d9d is described below
commit 43c9565d9d0250af5694d10c02f7634b79206ab6
Author: XiChen <[email protected]>
AuthorDate: Sun Jan 7 05:38:30 2024 +0800
HDDS-8888. Consider Datanode queue capacity when sending DeleteBlocks
command (#4939)
---
.../common/statemachine/DatanodeConfiguration.java | 6 +-
.../hdds/scm/block/SCMBlockDeletingService.java | 41 ++++-
.../scm/block/TestSCMBlockDeletingService.java | 177 +++++++++++++++++++++
3 files changed, 215 insertions(+), 9 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 3272aedb66..12ac40cabc 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -185,7 +185,11 @@ public class DatanodeConfiguration extends
ReconfigurableConfig {
defaultValue = "5",
tags = {DATANODE},
description = "The maximum number of block delete commands queued on " +
- " a datanode"
+ " a datanode, This configuration is also used by the SCM to " +
+ "control whether to send delete commands to the DN. If the DN" +
+ " has more commands waiting in the queue than this value, " +
+ "the SCM will not send any new block delete commands. until the " +
+ "DN has processed some commands and the queue length is reduced."
)
private int blockDeleteQueueLimit = 5;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 8677baf33b..7271d9dcba 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -42,11 +42,13 @@ import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.util.Time;
@@ -90,6 +92,7 @@ public class SCMBlockDeletingService extends BackgroundService
private long safemodeExitMillis = 0;
private final long safemodeExitRunDelayMillis;
+ private final long deleteBlocksPendingCommandLimit;
private final Clock clock;
@SuppressWarnings("parameternumber")
@@ -110,6 +113,9 @@ public class SCMBlockDeletingService extends
BackgroundService
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
+ DatanodeConfiguration dnConf =
+ conf.getObject(DatanodeConfiguration.class);
+ this.deleteBlocksPendingCommandLimit = dnConf.getBlockDeleteQueueLimit();
this.clock = clock;
this.deletedBlockLog = deletedBlockLog;
this.nodeManager = nodeManager;
@@ -155,13 +161,12 @@ public class SCMBlockDeletingService extends
BackgroundService
List<DatanodeDetails> datanodes =
nodeManager.getNodes(NodeStatus.inServiceHealthy());
if (datanodes != null) {
- // When DN node is healthy and in-service, and previous commands
- // are handled for deleteBlocks Type, then it will be considered
- // in this iteration
- final Set<DatanodeDetails> included = datanodes.stream().filter(
- dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
- Type.deleteBlocksCommand) == 0).collect(Collectors.toSet());
try {
+ // When DN node is healthy and in-service, and their number of
+ // 'deleteBlocks' type commands is below the limit.
+ // These nodes will be considered for this iteration.
+ final Set<DatanodeDetails> included =
+ getDatanodesWithinCommandLimit(datanodes);
DatanodeDeletedBlockTransactions transactions =
deletedBlockLog.getTransactions(getBlockDeleteTXNum(), included);
@@ -205,7 +210,8 @@ public class SCMBlockDeletingService extends
BackgroundService
deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs));
} catch (NotLeaderException nle) {
LOG.warn("Skip current run, since not leader any more.", nle);
- return EmptyTaskResult.newResult();
+ } catch (NodeNotFoundException e) {
+ LOG.error("Datanode not found in NodeManager. Should not happen", e);
} catch (IOException e) {
// We may tolerate a number of failures for sometime
// but if it continues to fail, at some point we need to raise
@@ -213,7 +219,6 @@ public class SCMBlockDeletingService extends
BackgroundService
// continues to retry the scanning.
LOG.error("Failed to get block deletion transactions from delTX log",
e);
- return EmptyTaskResult.newResult();
}
}
@@ -283,4 +288,24 @@ public class SCMBlockDeletingService extends
BackgroundService
public ScmBlockDeletingServiceMetrics getMetrics() {
return this.metrics;
}
+
+  /**
+   * Picks the datanodes that may still receive block delete commands in the
+   * current iteration.
+   * <p>
+   * A datanode is kept only when both of the following hold:
+   * <ul>
+   *   <li>the {@code deleteBlocksCommand} count returned by
+   *   {@code nodeManager.getTotalDatanodeCommandCount} is below
+   *   {@code deleteBlocksPendingCommandLimit};</li>
+   *   <li>{@code nodeManager.getCommandQueueCount} returns fewer than two
+   *   {@code deleteBlocksCommand} entries for the datanode's UUID.</li>
+   * </ul>
+   * The second lookup is only performed when the first condition holds.
+   *
+   * @param datanodes the candidate datanodes to filter
+   * @return the subset of {@code datanodes} that satisfies both conditions
+   * @throws NodeNotFoundException propagated from the node manager lookup
+   */
+  @VisibleForTesting
+  protected Set<DatanodeDetails> getDatanodesWithinCommandLimit(
+      List<DatanodeDetails> datanodes) throws NodeNotFoundException {
+    final Set<DatanodeDetails> eligible = new HashSet<>();
+    for (DatanodeDetails candidate : datanodes) {
+      // Skip datanodes whose total pending count has reached the limit.
+      if (nodeManager.getTotalDatanodeCommandCount(candidate,
+          Type.deleteBlocksCommand) >= deleteBlocksPendingCommandLimit) {
+        continue;
+      }
+      // Skip datanodes that already have two or more commands queued.
+      if (nodeManager.getCommandQueueCount(candidate.getUuid(),
+          Type.deleteBlocksCommand) >= 2) {
+        continue;
+      }
+      eligible.add(candidate);
+    }
+    return eligible;
+  }
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
new file mode 100644
index 0000000000..3bd7ad00f6
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link SCMBlockDeletingService}.
+ * <p>
+ * The service under test is a Mockito spy wired to a mocked NodeManager,
+ * EventPublisher and DeletedBlockLog. Three random datanodes are returned
+ * by the mocked NodeManager as in-service and healthy, and the mocked log
+ * hands out one delete transaction for each of them.
+ */
+public class TestSCMBlockDeletingService {
+ private SCMBlockDeletingService service;
+ private EventPublisher eventPublisher;
+ private List<DatanodeDetails> datanodeDetails;
+ private OzoneConfiguration conf;
+ private NodeManager nodeManager;
+ private ScmBlockDeletingServiceMetrics metrics;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ nodeManager = mock(NodeManager.class);
+ eventPublisher = mock(EventPublisher.class);
+ conf = new OzoneConfiguration();
+ metrics = ScmBlockDeletingServiceMetrics.create();
+ when(nodeManager.getTotalDatanodeCommandCount(any(),
+ any())).thenReturn(0);
+ SCMServiceManager scmServiceManager = mock(SCMServiceManager.class);
+ SCMContext scmContext = mock(SCMContext.class);
+
+ DatanodeDeletedBlockTransactions ddbt =
+ new DatanodeDeletedBlockTransactions();
+ DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails datanode2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails datanode3 = MockDatanodeDetails.randomDatanodeDetails();
+ datanodeDetails = Arrays.asList(datanode1, datanode2, datanode3);
+ when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn(
+ datanodeDetails);
+ DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L),
1);
+ ddbt.addTransactionToDN(datanode1.getUuid(), tx1);
+ ddbt.addTransactionToDN(datanode2.getUuid(), tx1);
+ ddbt.addTransactionToDN(datanode3.getUuid(), tx1);
+ DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class);
+ when(mockDeletedBlockLog.getTransactions(
+ anyInt(), anySet())).thenReturn(ddbt);
+
+ service = spy(new SCMBlockDeletingService(
+ mockDeletedBlockLog, nodeManager, eventPublisher, scmContext,
+ scmServiceManager, conf, conf.getObject(ScmConfig.class), metrics,
Clock.system(
+ ZoneOffset.UTC), mock(ReconfigurationHandler.class)));
+ when(service.shouldRun()).thenReturn(true);
+ }
+
+ @AfterEach
+ public void stop() {
+ service.stop();
+ ScmBlockDeletingServiceMetrics.unRegister();
+ }
+
+ @Test
+ public void testCall() throws Exception {
+ callDeletedBlockTransactionScanner();
+
+ ArgumentCaptor<CommandForDatanode> argumentCaptor =
+ ArgumentCaptor.forClass(CommandForDatanode.class);
+
+ // All three datanodes are healthy and in-service with no pending commands,
+ // so the transaction is expected to be sent to each of them.
+ verify(eventPublisher, times(3)).fireEvent(
+ eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture());
+ List<CommandForDatanode> actualCommands = argumentCaptor.getAllValues();
+ List<UUID> actualDnIds = actualCommands.stream()
+ .map(CommandForDatanode::getDatanodeId)
+ .collect(Collectors.toList());
+ Set<UUID> expectedDnIdsSet = datanodeDetails.stream()
+ .map(DatanodeDetails::getUuid).collect(Collectors.toSet());
+
+ assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds));
+ assertEquals(datanodeDetails.size(),
+ metrics.getNumBlockDeletionCommandSent());
+ // Each command carries exactly one transaction.
+ assertEquals(datanodeDetails.size() * 1,
+ metrics.getNumBlockDeletionTransactionSent());
+ }
+
+ private void callDeletedBlockTransactionScanner() throws Exception {
+ service.getTasks().poll().call();
+ }
+
+ @Test
+ public void testLimitCommandSending() throws Exception {
+ DatanodeConfiguration dnConf =
+ conf.getObject(DatanodeConfiguration.class);
+ int pendingCommandLimit = dnConf.getBlockDeleteQueueLimit();
+
+ // Every datanode has reached the pending command limit: none is selected.
+ when(nodeManager.getTotalDatanodeCommandCount(any(),
+ any())).thenReturn(pendingCommandLimit);
+ assertEquals(0,
+ service.getDatanodesWithinCommandLimit(datanodeDetails).size());
+
+ // No datanode has pending commands: all of them are selected.
+ when(nodeManager.getTotalDatanodeCommandCount(any(),
+ any())).thenReturn(0);
+ assertEquals(datanodeDetails.size(),
+ service.getDatanodesWithinCommandLimit(datanodeDetails).size());
+
+ // Only the first datanode has reached the limit: it alone is excluded.
+ DatanodeDetails fullDatanode = datanodeDetails.get(0);
+ when(nodeManager.getTotalDatanodeCommandCount(fullDatanode,
+ Type.deleteBlocksCommand)).thenReturn(pendingCommandLimit);
+ Set<DatanodeDetails> includeNodes =
+ service.getDatanodesWithinCommandLimit(datanodeDetails);
+ assertEquals(datanodeDetails.size() - 1,
+ includeNodes.size());
+ assertFalse(includeNodes.contains(fullDatanode));
+ }
+
+ private DeletedBlocksTransaction createTestDeleteTxn(
+ long txnID, List<Long> blocks, long containerID) {
+ return DeletedBlocksTransaction.newBuilder().setTxID(txnID)
+ .setContainerID(containerID).addAllLocalID(blocks).setCount(0).build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]