This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ee6a4ad99e HDDS-9674. Read from non-datanode host does not consider topology (#5610)
ee6a4ad99e is described below

commit ee6a4ad99e212c9809566c9f6a246b5e60007ae4
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Nov 28 15:20:04 2023 +0100

    HDDS-9674. Read from non-datanode host does not consider topology (#5610)
---
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 62 +++++++++-------------
 .../hdds/scm/server/SCMBlockProtocolServer.java    | 40 +++++++++++---
 .../hdds/scm/server/StorageContainerManager.java   | 34 +++++++++++-
 .../scm/server/TestSCMBlockProtocolServer.java     | 45 +++++++++++-----
 4 files changed, 123 insertions(+), 58 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 3103d5a7d4..167b25afd0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -54,9 +54,6 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -65,7 +62,6 @@ import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
@@ -88,6 +84,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTP;
@@ -123,7 +120,7 @@ public class SCMNodeManager implements NodeManager {
   private ObjectName nmInfoBean;
   private final SCMStorageConfig scmStorageConfig;
   private final NetworkTopology clusterMap;
-  private final DNSToSwitchMapping dnsToSwitchMapping;
+  private final Function<String, String> nodeResolver;
   private final boolean useHostname;
   private final Map<String, Set<UUID>> dnsToUuidMap = new ConcurrentHashMap<>();
   private final int numPipelinesPerMetadataVolume;
@@ -142,12 +139,25 @@ public class SCMNodeManager implements NodeManager {
   /**
    * Constructs SCM machine Manager.
    */
-  public SCMNodeManager(OzoneConfiguration conf,
-                        SCMStorageConfig scmStorageConfig,
-                        EventPublisher eventPublisher,
-                        NetworkTopology networkTopology,
-                        SCMContext scmContext,
-                        HDDSLayoutVersionManager layoutVersionManager) {
+  public SCMNodeManager(
+      OzoneConfiguration conf,
+      SCMStorageConfig scmStorageConfig,
+      EventPublisher eventPublisher,
+      NetworkTopology networkTopology,
+      SCMContext scmContext,
+      HDDSLayoutVersionManager layoutVersionManager) {
+    this(conf, scmStorageConfig, eventPublisher, networkTopology, scmContext,
+        layoutVersionManager, hostname -> null);
+  }
+
+  public SCMNodeManager(
+      OzoneConfiguration conf,
+      SCMStorageConfig scmStorageConfig,
+      EventPublisher eventPublisher,
+      NetworkTopology networkTopology,
+      SCMContext scmContext,
+      HDDSLayoutVersionManager layoutVersionManager,
+      Function<String, String> nodeResolver) {
     this.scmNodeEventPublisher = eventPublisher;
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher,
         layoutVersionManager, scmContext);
@@ -159,15 +169,7 @@ public class SCMNodeManager implements NodeManager {
     registerMXBean();
     this.metrics = SCMNodeMetrics.create(this);
     this.clusterMap = networkTopology;
-    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
-        conf.getClass(
-            DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-            TableMapping.class, DNSToSwitchMapping.class);
-    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
-        dnsToSwitchMappingClass, conf);
-    this.dnsToSwitchMapping =
-        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
-            : new CachedDNSToSwitchMapping(newInstance));
+    this.nodeResolver = nodeResolver;
     this.useHostname = conf.getBoolean(
         DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
@@ -383,7 +385,8 @@ public class SCMNodeManager implements NodeManager {
     final String ipAddress = datanodeDetails.getIpAddress();
     final String hostName = datanodeDetails.getHostName();
     datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
-    String networkLocation = nodeResolve(useHostname ? hostName : ipAddress);
+    String networkLocation = nodeResolver.apply(
+        useHostname ? hostName : ipAddress);
     if (networkLocation != null) {
       datanodeDetails.setNetworkLocation(networkLocation);
     }
@@ -1412,23 +1415,6 @@ public class SCMNodeManager implements NodeManager {
     }
   }
 
-  private String nodeResolve(String hostname) {
-    List<String> hosts = new ArrayList<>(1);
-    hosts.add(hostname);
-    List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
-    if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
-      String location = resolvedHosts.get(0);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Resolve datanode {} return location {}", hostname, location);
-      }
-      return location;
-    } else {
-      LOG.error("Node {} Resolution failed. Please make sure that DNS table " +
-          "mapping or configured mapping is functional.", hostname);
-      return null;
-    }
-  }
-
   /**
    * Test utility to stop heartbeat check process.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 8d29d5eeb8..d2d8d0a63a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -42,7 +42,9 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
 import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -69,6 +71,7 @@ import com.google.protobuf.ProtocolMessageEnum;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.IO_EXCEPTION;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -345,14 +348,13 @@ public class SCMBlockProtocolServer implements
     auditMap.put("nodes", String.valueOf(nodes));
     try {
       NodeManager nodeManager = scm.getScmNodeManager();
-      Node client = null;
-      List<DatanodeDetails> possibleClients =
-          nodeManager.getNodesByAddress(clientMachine);
-      if (possibleClients.size() > 0) {
-        client = possibleClients.get(0);
+      Node client = getDatanode(clientMachine);
+      // not datanode
+      if (client == null) {
+        client = getOtherNode(clientMachine);
       }
-      List<Node> nodeList = new ArrayList();
-      nodes.stream().forEach(uuid -> {
+      List<Node> nodeList = new ArrayList<>();
+      nodes.forEach(uuid -> {
         DatanodeDetails node = nodeManager.getNodeByUuid(uuid);
         if (node != null) {
           nodeList.add(node);
@@ -377,6 +379,30 @@ public class SCMBlockProtocolServer implements
     }
   }
 
+  private Node getDatanode(String clientMachine) {
+    List<DatanodeDetails> datanodes = scm.getScmNodeManager()
+        .getNodesByAddress(clientMachine);
+    return !datanodes.isEmpty() ? datanodes.get(0) : null;
+  }
+
+  private Node getOtherNode(String clientMachine) {
+    try {
+      String clientLocation = scm.resolveNodeLocation(clientMachine);
+      if (clientLocation != null) {
+        Node rack = scm.getClusterMap().getNode(clientLocation);
+        if (rack instanceof InnerNode) {
+          return new NodeImpl(clientMachine, clientLocation,
+              (InnerNode) rack, rack.getLevel() + 1,
+              NODE_COST_DEFAULT);
+        }
+      }
+    } catch (Exception e) {
+      LOG.info("Could not resolve client {}: {}",
+          clientMachine, e.getMessage());
+    }
+    return null;
+  }
+
   @Override
   public AuditMessage buildAuditMessageForSuccess(
       AuditAction op, Map<String, String> auditMap) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 70636215b2..edba7bb6d3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -25,10 +25,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -147,7 +149,10 @@ import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
@@ -160,6 +165,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JvmPauseMonitor;
@@ -319,6 +325,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private final SecretKeyManagerService secretKeyManagerService;
 
   private Clock systemClock;
+  private DNSToSwitchMapping dnsToSwitchMapping;
 
   /**
    * Creates a new StorageContainerManager. Configuration will be
@@ -703,11 +710,22 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           .build();
     }
 
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+        conf.getClass(
+            DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            TableMapping.class, DNSToSwitchMapping.class);
+    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+        dnsToSwitchMappingClass, conf);
+    dnsToSwitchMapping =
+        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+            : new CachedDNSToSwitchMapping(newInstance));
+
     if (configurator.getScmNodeManager() != null) {
       scmNodeManager = configurator.getScmNodeManager();
     } else {
       scmNodeManager = new SCMNodeManager(conf, scmStorageConfig, eventQueue,
-          clusterMap, scmContext, scmLayoutVersionManager);
+          clusterMap, scmContext, scmLayoutVersionManager,
+          this::resolveNodeLocation);
     }
 
     placementMetrics = SCMContainerPlacementMetrics.create();
@@ -2190,4 +2208,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   public ReconfigurationHandler getReconfigurationHandler() {
     return reconfigurationHandler;
   }
+
+  public String resolveNodeLocation(String hostname) {
+    List<String> hosts = Collections.singletonList(hostname);
+    List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
+    if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
+      String location = resolvedHosts.get(0);
+      LOG.debug("Node {} resolved to location {}", hostname, location);
+      return location;
+    } else {
+      LOG.debug("Node resolution did not yield any result for {}", hostname);
+      return null;
+    }
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index 4ee186323e..af905f4db5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.server;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -43,6 +44,7 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -61,6 +63,11 @@ public class TestSCMBlockProtocolServer {
   private ScmBlockLocationProtocolServerSideTranslatorPB service;
   private static final int NODE_COUNT = 10;
 
+  private static final Map<String, String> EDGE_NODES = ImmutableMap.of(
+      "edge0", "/rack0",
+      "edge1", "/rack1"
+  );
+
   @BeforeEach
   void setUp(@TempDir File dir) throws Exception {
     config = SCMTestUtils.getConf(dir);
@@ -75,6 +82,7 @@ public class TestSCMBlockProtocolServer {
       nodeMapping.add(dn.getIpAddress() + "=" + rack);
       datanodes.add(dn);
     }
+    EDGE_NODES.forEach((n, r) -> nodeMapping.add(n + "=" + r));
     config.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING,
         String.join(",", nodeMapping));
 
@@ -113,18 +121,31 @@ public class TestSCMBlockProtocolServer {
       Assertions.assertEquals(dn, sorted.get(0),
           "Source node should be sorted very first");
 
-      for (int i = 1; i < NODE_COUNT / 2; i++) {
-        DatanodeDetails item = sorted.get(i);
-        Assertions.assertEquals(dn.getNetworkLocation(),
-            item.getNetworkLocation(),
-            "Nodes in the same rack should be sorted first");
-      }
-      for (int i = NODE_COUNT / 2; i < NODE_COUNT; i++) {
-        DatanodeDetails item = sorted.get(i);
-        Assertions.assertNotEquals(dn.getNetworkLocation(),
-            item.getNetworkLocation(),
-            "Nodes in the other rack should be sorted last");
-      }
+      assertRackOrder(dn.getNetworkLocation(), sorted);
+    }
+  }
+
+  @Test
+  void sortDatanodesRelativeToNonDatanode() {
+    List<String> datanodes = getNetworkNames();
+
+    for (Map.Entry<String, String> entry : EDGE_NODES.entrySet()) {
+      assertRackOrder(entry.getValue(),
+          server.sortDatanodes(datanodes, entry.getKey()));
+    }
+  }
+
+  private static void assertRackOrder(String rack, List<DatanodeDetails> list) {
+    int size = list.size();
+
+    for (int i = 0; i < size / 2; i++) {
+      Assertions.assertEquals(rack, list.get(i).getNetworkLocation(),
+          "Nodes in the same rack should be sorted first");
+    }
+
+    for (int i = size / 2; i < size; i++) {
+      Assertions.assertNotEquals(rack, list.get(i).getNetworkLocation(),
+          "Nodes in the other rack should be sorted last");
     }
   }
 


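In short, the patch moves ownership of the DNSToSwitchMapping (TableMapping by default) from SCMNodeManager into StorageContainerManager, hands it to the node manager as a plain Function<String, String>, and lets SCMBlockProtocolServer reuse the same resolver to place a non-datanode client under its rack before sorting replicas. A minimal, self-contained sketch of that wiring, using a hypothetical in-memory table in place of the configured mapping and made-up host names:

import java.util.Map;
import java.util.function.Function;

// Illustrative sketch only: a Map stands in for the configured
// DNSToSwitchMapping; host names and racks here are hypothetical.
public final class NodeResolverSketch {
  public static void main(String[] args) {
    Map<String, String> topologyTable = Map.of(
        "dn0.example.com", "/rack0",     // a datanode host
        "edge0.example.com", "/rack1");  // a client host that is not a datanode

    // SCM exposes resolution as a function (scm::resolveNodeLocation in the
    // patch); unknown hosts resolve to null, like the default resolver.
    Function<String, String> nodeResolver = topologyTable::get;

    // SCMNodeManager applies it when a datanode registers ...
    String datanodeLocation = nodeResolver.apply("dn0.example.com");    // "/rack0"

    // ... and SCMBlockProtocolServer now applies it to a non-datanode client,
    // so sortDatanodes can still prefer replicas on the client's rack.
    String clientLocation = nodeResolver.apply("edge0.example.com");    // "/rack1"

    System.out.println(datanodeLocation + " " + clientLocation);
  }
}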
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
