This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ee6a4ad99e HDDS-9674. Read from non-datanode host does not consider topology (#5610)
ee6a4ad99e is described below
commit ee6a4ad99e212c9809566c9f6a246b5e60007ae4
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Nov 28 15:20:04 2023 +0100
HDDS-9674. Read from non-datanode host does not consider topology (#5610)
---
.../hadoop/hdds/scm/node/SCMNodeManager.java | 62 +++++++++-------------
.../hdds/scm/server/SCMBlockProtocolServer.java | 40 +++++++++++---
.../hdds/scm/server/StorageContainerManager.java | 34 +++++++++++-
.../scm/server/TestSCMBlockProtocolServer.java | 45 +++++++++++-----
4 files changed, 123 insertions(+), 58 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 3103d5a7d4..167b25afd0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -54,9 +54,6 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -65,7 +62,6 @@ import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
@@ -88,6 +84,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTP;
@@ -123,7 +120,7 @@ public class SCMNodeManager implements NodeManager {
private ObjectName nmInfoBean;
private final SCMStorageConfig scmStorageConfig;
private final NetworkTopology clusterMap;
- private final DNSToSwitchMapping dnsToSwitchMapping;
+ private final Function<String, String> nodeResolver;
private final boolean useHostname;
private final Map<String, Set<UUID>> dnsToUuidMap = new ConcurrentHashMap<>();
private final int numPipelinesPerMetadataVolume;
@@ -142,12 +139,25 @@ public class SCMNodeManager implements NodeManager {
/**
* Constructs the SCM node manager.
*/
- public SCMNodeManager(OzoneConfiguration conf,
- SCMStorageConfig scmStorageConfig,
- EventPublisher eventPublisher,
- NetworkTopology networkTopology,
- SCMContext scmContext,
- HDDSLayoutVersionManager layoutVersionManager) {
+ public SCMNodeManager(
+ OzoneConfiguration conf,
+ SCMStorageConfig scmStorageConfig,
+ EventPublisher eventPublisher,
+ NetworkTopology networkTopology,
+ SCMContext scmContext,
+ HDDSLayoutVersionManager layoutVersionManager) {
+ this(conf, scmStorageConfig, eventPublisher, networkTopology, scmContext,
+ layoutVersionManager, hostname -> null);
+ }
+
+ public SCMNodeManager(
+ OzoneConfiguration conf,
+ SCMStorageConfig scmStorageConfig,
+ EventPublisher eventPublisher,
+ NetworkTopology networkTopology,
+ SCMContext scmContext,
+ HDDSLayoutVersionManager layoutVersionManager,
+ Function<String, String> nodeResolver) {
this.scmNodeEventPublisher = eventPublisher;
this.nodeStateManager = new NodeStateManager(conf, eventPublisher,
layoutVersionManager, scmContext);
@@ -159,15 +169,7 @@ public class SCMNodeManager implements NodeManager {
registerMXBean();
this.metrics = SCMNodeMetrics.create(this);
this.clusterMap = networkTopology;
- Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
- conf.getClass(
- DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- TableMapping.class, DNSToSwitchMapping.class);
- DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
- dnsToSwitchMappingClass, conf);
- this.dnsToSwitchMapping =
- ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
- : new CachedDNSToSwitchMapping(newInstance));
+ this.nodeResolver = nodeResolver;
this.useHostname = conf.getBoolean(
DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
@@ -383,7 +385,8 @@ public class SCMNodeManager implements NodeManager {
final String ipAddress = datanodeDetails.getIpAddress();
final String hostName = datanodeDetails.getHostName();
datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
- String networkLocation = nodeResolve(useHostname ? hostName : ipAddress);
+ String networkLocation = nodeResolver.apply(
+ useHostname ? hostName : ipAddress);
if (networkLocation != null) {
datanodeDetails.setNetworkLocation(networkLocation);
}
@@ -1412,23 +1415,6 @@ public class SCMNodeManager implements NodeManager {
}
}
- private String nodeResolve(String hostname) {
- List<String> hosts = new ArrayList<>(1);
- hosts.add(hostname);
- List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
- if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
- String location = resolvedHosts.get(0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Resolve datanode {} return location {}", hostname,
location);
- }
- return location;
- } else {
- LOG.error("Node {} Resolution failed. Please make sure that DNS table " +
- "mapping or configured mapping is functional.", hostname);
- return null;
- }
- }
-
/**
* Test utility to stop heartbeat check process.
*
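The core of the change in SCMNodeManager: it no longer builds a DNSToSwitchMapping itself, it now accepts any Function<String, String> from host name to network location. A minimal stand-alone sketch of that contract (the host names and lookup table are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class NodeResolverSketch {
      public static void main(String[] args) {
        // Hypothetical lookup table standing in for the DNS-to-switch
        // mapping that StorageContainerManager now owns (see below).
        Map<String, String> table = new HashMap<>();
        table.put("dn0.example.com", "/rack0");
        table.put("dn1.example.com", "/rack1");

        // The contract SCMNodeManager expects: map a host to its network
        // location, or return null when the host cannot be resolved.
        Function<String, String> resolver = table::get;

        System.out.println(resolver.apply("dn0.example.com")); // /rack0
        System.out.println(resolver.apply("unknown-host"));    // null
      }
    }

The six-argument constructor keeps existing callers compiling by delegating with hostname -> null; during registration the datanode's default network location is only overridden when the resolver returns non-null.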
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 8d29d5eeb8..d2d8d0a63a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -42,7 +42,9 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -69,6 +71,7 @@ import com.google.protobuf.ProtocolMessageEnum;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.IO_EXCEPTION;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -345,14 +348,13 @@ public class SCMBlockProtocolServer implements
auditMap.put("nodes", String.valueOf(nodes));
try {
NodeManager nodeManager = scm.getScmNodeManager();
- Node client = null;
- List<DatanodeDetails> possibleClients =
- nodeManager.getNodesByAddress(clientMachine);
- if (possibleClients.size() > 0) {
- client = possibleClients.get(0);
+ Node client = getDatanode(clientMachine);
+ // the client is not a datanode; try to resolve its rack instead
+ if (client == null) {
+ client = getOtherNode(clientMachine);
}
- List<Node> nodeList = new ArrayList();
- nodes.stream().forEach(uuid -> {
+ List<Node> nodeList = new ArrayList<>();
+ nodes.forEach(uuid -> {
DatanodeDetails node = nodeManager.getNodeByUuid(uuid);
if (node != null) {
nodeList.add(node);
@@ -377,6 +379,30 @@ public class SCMBlockProtocolServer implements
}
}
+ private Node getDatanode(String clientMachine) {
+ List<DatanodeDetails> datanodes = scm.getScmNodeManager()
+ .getNodesByAddress(clientMachine);
+ return !datanodes.isEmpty() ? datanodes.get(0) : null;
+ }
+
+ private Node getOtherNode(String clientMachine) {
+ try {
+ String clientLocation = scm.resolveNodeLocation(clientMachine);
+ if (clientLocation != null) {
+ Node rack = scm.getClusterMap().getNode(clientLocation);
+ if (rack instanceof InnerNode) {
+ return new NodeImpl(clientMachine, clientLocation,
+ (InnerNode) rack, rack.getLevel() + 1,
+ NODE_COST_DEFAULT);
+ }
+ }
+ } catch (Exception e) {
+ LOG.info("Could not resolve client {}: {}",
+ clientMachine, e.getMessage());
+ }
+ return null;
+ }
+
@Override
public AuditMessage buildAuditMessageForSuccess(
AuditAction op, Map<String, String> auditMap) {
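getOtherNode lets sortDatanodes place a client that is not a datanode (an edge host, for example) on the topology as a synthetic leaf under its resolved rack, so distance-based sorting still applies to it. The effect on ordering can be shown with a stand-alone sketch (the Dn class and rack names are hypothetical, not Ozone types):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class SortByRackSketch {
      // Hypothetical stand-in for DatanodeDetails: a name plus a rack.
      static final class Dn {
        final String name;
        final String rack;
        Dn(String name, String rack) {
          this.name = name;
          this.rack = rack;
        }
      }

      public static void main(String[] args) {
        List<Dn> nodes = new ArrayList<>();
        nodes.add(new Dn("dn0", "/rack0"));
        nodes.add(new Dn("dn1", "/rack1"));
        nodes.add(new Dn("dn2", "/rack0"));

        // A client resolved to "/rack1": nodes sharing the client's rack
        // sort first, which is what representing the client as a leaf
        // under its rack buys the real sortDatanodes.
        String clientRack = "/rack1";
        nodes.sort(Comparator.comparingInt(dn -> dn.rack.equals(clientRack) ? 0 : 1));
        nodes.forEach(dn -> System.out.println(dn.name + " " + dn.rack));
      }
    }

Note that getOtherNode only creates the synthetic node when the resolved location maps to an InnerNode in the cluster map; otherwise the client stays null and sorting proceeds without a client reference, as before.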
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 70636215b2..edba7bb6d3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -25,10 +25,12 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -147,7 +149,10 @@ import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.common.Storage.StorageState;
@@ -160,6 +165,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JvmPauseMonitor;
@@ -319,6 +325,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SecretKeyManagerService secretKeyManagerService;
private Clock systemClock;
+ private DNSToSwitchMapping dnsToSwitchMapping;
/**
* Creates a new StorageContainerManager. Configuration will be
@@ -703,11 +710,22 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
.build();
}
+ Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+ conf.getClass(
+ DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ TableMapping.class, DNSToSwitchMapping.class);
+ DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+ dnsToSwitchMappingClass, conf);
+ dnsToSwitchMapping =
+ ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+ : new CachedDNSToSwitchMapping(newInstance));
+
if (configurator.getScmNodeManager() != null) {
scmNodeManager = configurator.getScmNodeManager();
} else {
scmNodeManager = new SCMNodeManager(conf, scmStorageConfig, eventQueue,
- clusterMap, scmContext, scmLayoutVersionManager);
+ clusterMap, scmContext, scmLayoutVersionManager,
+ this::resolveNodeLocation);
}
placementMetrics = SCMContainerPlacementMetrics.create();
@@ -2190,4 +2208,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
public ReconfigurationHandler getReconfigurationHandler() {
return reconfigurationHandler;
}
+
+ public String resolveNodeLocation(String hostname) {
+ List<String> hosts = Collections.singletonList(hostname);
+ List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
+ if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
+ String location = resolvedHosts.get(0);
+ LOG.debug("Node {} resolved to location {}", hostname, location);
+ return location;
+ } else {
+ LOG.debug("Node resolution did not yield any result for {}", hostname);
+ return null;
+ }
+ }
+
}
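resolveNodeLocation delegates to Hadoop's DNSToSwitchMapping, with TableMapping as the default implementation. Assuming a topology table file exists (the path and host name below are made up), resolution looks roughly like this:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.TableMapping;

    public class TableMappingSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical table file; each line maps "<host> <location>".
        conf.set("net.topology.table.file.name", "/etc/hadoop/topology.table");

        TableMapping mapping = new TableMapping();
        mapping.setConf(conf); // the file is read on first resolve

        List<String> locations =
            mapping.resolve(Collections.singletonList("dn0.example.com"));
        // One entry per queried host, e.g. "/rack0"; hosts missing from
        // the table resolve to the default rack.
        System.out.println(locations.get(0));
      }
    }

Wrapping a non-caching implementation in CachedDNSToSwitchMapping, as the code above does, avoids re-resolving the same host on every sort request.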
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index 4ee186323e..af905f4db5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.server;
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -43,6 +44,7 @@ import org.mockito.Mockito;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -61,6 +63,11 @@ public class TestSCMBlockProtocolServer {
private ScmBlockLocationProtocolServerSideTranslatorPB service;
private static final int NODE_COUNT = 10;
+ private static final Map<String, String> EDGE_NODES = ImmutableMap.of(
+ "edge0", "/rack0",
+ "edge1", "/rack1"
+ );
+
@BeforeEach
void setUp(@TempDir File dir) throws Exception {
config = SCMTestUtils.getConf(dir);
@@ -75,6 +82,7 @@ public class TestSCMBlockProtocolServer {
nodeMapping.add(dn.getIpAddress() + "=" + rack);
datanodes.add(dn);
}
+ EDGE_NODES.forEach((n, r) -> nodeMapping.add(n + "=" + r));
config.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING,
String.join(",", nodeMapping));
@@ -113,18 +121,31 @@ public class TestSCMBlockProtocolServer {
Assertions.assertEquals(dn, sorted.get(0),
"Source node should be sorted very first");
- for (int i = 1; i < NODE_COUNT / 2; i++) {
- DatanodeDetails item = sorted.get(i);
- Assertions.assertEquals(dn.getNetworkLocation(),
- item.getNetworkLocation(),
- "Nodes in the same rack should be sorted first");
- }
- for (int i = NODE_COUNT / 2; i < NODE_COUNT; i++) {
- DatanodeDetails item = sorted.get(i);
- Assertions.assertNotEquals(dn.getNetworkLocation(),
- item.getNetworkLocation(),
- "Nodes in the other rack should be sorted last");
- }
+ assertRackOrder(dn.getNetworkLocation(), sorted);
+ }
+ }
+
+ @Test
+ void sortDatanodesRelativeToNonDatanode() {
+ List<String> datanodes = getNetworkNames();
+
+ for (Map.Entry<String, String> entry : EDGE_NODES.entrySet()) {
+ assertRackOrder(entry.getValue(),
+ server.sortDatanodes(datanodes, entry.getKey()));
+ }
+ }
+
+ private static void assertRackOrder(String rack, List<DatanodeDetails> list) {
+ int size = list.size();
+
+ for (int i = 0; i < size / 2; i++) {
+ Assertions.assertEquals(rack, list.get(i).getNetworkLocation(),
+ "Nodes in the same rack should be sorted first");
+ }
+
+ for (int i = size / 2; i < size; i++) {
+ Assertions.assertNotEquals(rack, list.get(i).getNetworkLocation(),
+ "Nodes in the other rack should be sorted last");
}
}
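The test wires both the generated datanode addresses and the two edge hosts into Hadoop's StaticMapping. A condensed sketch of how that mapping resolves (rack values mirror EDGE_NODES above):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.StaticMapping;

    public class StaticMappingSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same comma-separated "host=rack" format the test assembles.
        conf.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING,
            "edge0=/rack0,edge1=/rack1");

        StaticMapping mapping = new StaticMapping();
        mapping.setConf(conf);

        List<String> racks = mapping.resolve(Arrays.asList("edge0", "edge1"));
        System.out.println(racks); // [/rack0, /rack1]
      }
    }

With that mapping in place, sortDatanodesRelativeToNonDatanode expects the datanodes in each edge host's rack to come first, which assertRackOrder verifies.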