This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 741bdd636b4 HDFS-17721. RBF: Allow routers to declare IP for admin 
addr (#7342) Contributed by Felix Nguyen.
741bdd636b4 is described below

commit 741bdd636b4e24d6c6ea44b77661315bff474776
Author: Felix Nguyen <174066851+kokon...@users.noreply.github.com>
AuthorDate: Mon Feb 3 21:28:37 2025 +0800

    HDFS-17721. RBF: Allow routers to declare IP for admin addr (#7342) 
Contributed by Felix Nguyen.
    
    Reviewed-by: Haiyang Hu <huhaiy...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../router/MountTableRefresherService.java         | 16 ++++++++---
 .../server/federation/router/RBFConfigKeys.java    |  3 ++
 .../federation/router/RouterHeartbeatService.java  |  9 ++++--
 .../server/federation/store/StateStoreUtils.java   | 16 +++++++++++
 .../src/main/resources/hdfs-rbf-default.xml        |  8 ++++++
 .../router/TestRouterMountTableCacheRefresh.java   | 32 ++++++++++++++++------
 6 files changed, 69 insertions(+), 15 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java
index 1f8debf0ec2..1579ccbccd1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java
@@ -75,7 +75,7 @@ public class MountTableRefresherService extends 
AbstractService {
 
   /**
    * All router admin clients cached. So no need to create the client again and
-   * again. Router admin address(host:port) is used as key to cache 
RouterClient
+   * again. Router admin address(host:port or ip:port) is used as key to cache 
RouterClient
    * objects.
    */
   private LoadingCache<String, RouterClient> routerClientsCache;
@@ -102,8 +102,13 @@ protected void serviceInit(Configuration conf) throws 
Exception {
     this.mountTableStore = getMountTableStore();
     // Attach this service to mount table store.
     this.mountTableStore.setRefreshService(this);
-    this.localAdminAddress =
-        StateStoreUtils.getHostPortString(router.getAdminServerAddress());
+    if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
+      this.localAdminAddress = 
StateStoreUtils.getIpPortString(router.getAdminServerAddress());
+    } else {
+      this.localAdminAddress = 
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
+    }
+    LOG.info("Initialized MountTableRefresherService with addr: {}", 
this.localAdminAddress);
     this.cacheUpdateTimeout = conf.getTimeDuration(
         RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
         RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
@@ -220,7 +225,7 @@ public void refresh() throws StateStoreUnavailableException 
{
     List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
     for (RouterState routerState : cachedRecords) {
       String adminAddress = routerState.getAdminAddress();
-      if (adminAddress == null || adminAddress.length() == 0) {
+      if (adminAddress == null || adminAddress.isEmpty()) {
         // this router has not enabled router admin.
         continue;
       }
@@ -237,11 +242,13 @@ public void refresh() throws 
StateStoreUnavailableException {
          * RouterClient
          */
         refreshThreads.add(getLocalRefresher(adminAddress));
+        LOG.debug("Added local refresher for {}", adminAddress);
       } else {
         try {
           RouterClient client = routerClientsCache.get(adminAddress);
           refreshThreads.add(new MountTableRefresherThread(
               client.getMountTableManager(), adminAddress));
+          LOG.debug("Added remote refresher for {}", adminAddress);
         } catch (ExecutionException execExcep) {
           // Can not connect, seems router is stopped now.
           LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
@@ -296,6 +303,7 @@ private void logResult(List<MountTableRefresherThread> 
refreshThreads) {
       if (mountTableRefreshThread.isSuccess()) {
         successCount++;
       } else {
+        LOG.debug("Failed to refresh {}", 
mountTableRefreshThread.getAdminAddress());
         failureCount++;
         // remove RouterClient from cache so that new client is created
         removeFromCache(mountTableRefreshThread.getAdminAddress());
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 64f27bd3ba3..ee23fe8ced0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -288,6 +288,9 @@ public class RBFConfigKeys extends 
CommonConfigurationKeysPublic {
       FEDERATION_ROUTER_PREFIX + "safemode.checkperiod";
   public static final long DFS_ROUTER_SAFEMODE_CHECKPERIOD_MS_DEFAULT =
       TimeUnit.SECONDS.toMillis(5);
+  public static final String DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "heartbeat.with.ip.enable";
+  public static final boolean DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT = 
false;
 
   // HDFS Router-based federation mount table entries
   /** Maximum number of cache entries to have. */
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
index 19d7442acb6..5607ab8109d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -88,9 +88,12 @@ synchronized void updateStateStore() {
             getStateStoreVersion(MountTableStore.class));
         record.setStateStoreVersion(stateStoreVersion);
         // if admin server not started then hostPort will be empty
-        String hostPort =
-            StateStoreUtils.getHostPortString(router.getAdminServerAddress());
-        record.setAdminAddress(hostPort);
+        if 
(router.getConfig().getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
+            RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
+          
record.setAdminAddress(StateStoreUtils.getIpPortString(router.getAdminServerAddress()));
+        } else {
+          
record.setAdminAddress(StateStoreUtils.getHostPortString(router.getAdminServerAddress()));
+        }
         RouterHeartbeatRequest request =
             RouterHeartbeatRequest.newInstance(record);
         RouterHeartbeatResponse response = 
routerStore.routerHeartbeat(request);
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
index 4b932d6d939..7f755ddbd52 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.net.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,4 +137,19 @@ public static String getHostPortString(InetSocketAddress 
address) {
     return hostName + ":" + address.getPort();
   }
 
+  /**
+   * Returns address in form of ip:port, empty string if address is null.
+   *
+   * @param address address
+   * @return host:port
+   */
+  public static String getIpPortString(InetSocketAddress address) {
+    if (null == address) {
+      return "";
+    }
+    address = NetUtils.getConnectAddress(address);
+    InetAddress inet = address.getAddress();
+    return inet.getHostAddress() + ":" + address.getPort();
+  }
+
 }
\ No newline at end of file
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 26b89ce0313..408a6ef25f6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -974,4 +974,12 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.heartbeat.with.ip.enable</name>
+    <description>
+      Make router use IP instead of host when communicating with router state 
state store.
+    </description>
+    <value>false</value>
+  </property>
+
 </configuration>
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
index 139236d24e1..1995939297b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
@@ -25,6 +25,8 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -53,23 +55,32 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * This test class verifies that mount table cache is updated on all the 
routers
  * when MountTableRefreshService is enabled and there is a change in mount 
table
  * entries.
  */
+@RunWith(Parameterized.class)
 public class TestRouterMountTableCacheRefresh {
   private static TestingServer curatorTestingServer;
   private static MiniRouterDFSCluster cluster;
   private static RouterContext routerContext;
   private static MountTableManager mountTableManager;
 
-  @BeforeClass
-  public static void setUp() throws Exception {
+  @Parameterized.Parameters
+  public static Collection<Object> data() {
+    return Arrays.asList(new Object[] {true, false});
+  }
+
+  public TestRouterMountTableCacheRefresh(boolean useIpForHeartbeats) throws 
Exception {
+    // Initialize only once per parameter
+    if (curatorTestingServer != null) {
+      return;
+    }
     curatorTestingServer = new TestingServer();
     curatorTestingServer.start();
     final String connectString = curatorTestingServer.getConnectString();
@@ -82,6 +93,7 @@ public static void setUp() throws Exception {
         FileSubclusterResolver.class);
     conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
     conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, 
useIpForHeartbeats);
     cluster.addRouterOverrides(conf);
     cluster.startCluster();
     cluster.startRouters();
@@ -95,11 +107,15 @@ public static void setUp() throws Exception {
         numNameservices, 60000);
   }
 
-  @AfterClass
-  public static void destory() {
+  @Parameterized.AfterParam
+  public static void destroy() {
     try {
-      curatorTestingServer.close();
-      cluster.shutdown();
+      if (curatorTestingServer != null) {
+        curatorTestingServer.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     } catch (IOException e) {
       // do nothing
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to