This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 741bdd636b4 HDFS-17721. RBF: Allow routers to declare IP for admin addr (#7342) Contributed by Felix Nguyen. 741bdd636b4 is described below commit 741bdd636b4e24d6c6ea44b77661315bff474776 Author: Felix Nguyen <174066851+kokon...@users.noreply.github.com> AuthorDate: Mon Feb 3 21:28:37 2025 +0800 HDFS-17721. RBF: Allow routers to declare IP for admin addr (#7342) Contributed by Felix Nguyen. Reviewed-by: Haiyang Hu <huhaiy...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../router/MountTableRefresherService.java | 16 ++++++++--- .../server/federation/router/RBFConfigKeys.java | 3 ++ .../federation/router/RouterHeartbeatService.java | 9 ++++-- .../server/federation/store/StateStoreUtils.java | 16 +++++++++++ .../src/main/resources/hdfs-rbf-default.xml | 8 ++++++ .../router/TestRouterMountTableCacheRefresh.java | 32 ++++++++++++++++------ 6 files changed, 69 insertions(+), 15 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java index 1f8debf0ec2..1579ccbccd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java @@ -75,7 +75,7 @@ public class MountTableRefresherService extends AbstractService { /** * All router admin clients cached. So no need to create the client again and - * again. Router admin address(host:port) is used as key to cache RouterClient + * again. Router admin address(host:port or ip:port) is used as key to cache RouterClient * objects. 
*/ private LoadingCache<String, RouterClient> routerClientsCache; @@ -102,8 +102,13 @@ protected void serviceInit(Configuration conf) throws Exception { this.mountTableStore = getMountTableStore(); // Attach this service to mount table store. this.mountTableStore.setRefreshService(this); - this.localAdminAddress = - StateStoreUtils.getHostPortString(router.getAdminServerAddress()); + if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, + RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) { + this.localAdminAddress = StateStoreUtils.getIpPortString(router.getAdminServerAddress()); + } else { + this.localAdminAddress = StateStoreUtils.getHostPortString(router.getAdminServerAddress()); + } + LOG.info("Initialized MountTableRefresherService with addr: {}", this.localAdminAddress); this.cacheUpdateTimeout = conf.getTimeDuration( RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT, @@ -220,7 +225,7 @@ public void refresh() throws StateStoreUnavailableException { List<MountTableRefresherThread> refreshThreads = new ArrayList<>(); for (RouterState routerState : cachedRecords) { String adminAddress = routerState.getAdminAddress(); - if (adminAddress == null || adminAddress.length() == 0) { + if (adminAddress == null || adminAddress.isEmpty()) { // this router has not enabled router admin. continue; } @@ -237,11 +242,13 @@ public void refresh() throws StateStoreUnavailableException { * RouterClient */ refreshThreads.add(getLocalRefresher(adminAddress)); + LOG.debug("Added local refresher for {}", adminAddress); } else { try { RouterClient client = routerClientsCache.get(adminAddress); refreshThreads.add(new MountTableRefresherThread( client.getMountTableManager(), adminAddress)); + LOG.debug("Added remote refresher for {}", adminAddress); } catch (ExecutionException execExcep) { // Can not connect, seems router is stopped now. 
LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep); @@ -296,6 +303,7 @@ private void logResult(List<MountTableRefresherThread> refreshThreads) { if (mountTableRefreshThread.isSuccess()) { successCount++; } else { + LOG.debug("Failed to refresh {}", mountTableRefreshThread.getAdminAddress()); failureCount++; // remove RouterClient from cache so that new client is created removeFromCache(mountTableRefreshThread.getAdminAddress()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 64f27bd3ba3..ee23fe8ced0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -288,6 +288,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "safemode.checkperiod"; public static final long DFS_ROUTER_SAFEMODE_CHECKPERIOD_MS_DEFAULT = TimeUnit.SECONDS.toMillis(5); + public static final String DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE = + FEDERATION_ROUTER_PREFIX + "heartbeat.with.ip.enable"; + public static final boolean DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT = false; // HDFS Router-based federation mount table entries /** Maximum number of cache entries to have. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java index 19d7442acb6..5607ab8109d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java @@ -88,9 +88,12 @@ synchronized void updateStateStore() { getStateStoreVersion(MountTableStore.class)); record.setStateStoreVersion(stateStoreVersion); // if admin server not started then hostPort will be empty - String hostPort = - StateStoreUtils.getHostPortString(router.getAdminServerAddress()); - record.setAdminAddress(hostPort); + if (router.getConfig().getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, + RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) { + record.setAdminAddress(StateStoreUtils.getIpPortString(router.getAdminServerAddress())); + } else { + record.setAdminAddress(StateStoreUtils.getHostPortString(router.getAdminServerAddress())); + } RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(record); RouterHeartbeatResponse response = routerStore.routerHeartbeat(request); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java index 4b932d6d939..7f755ddbd52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -25,6 +25,7 @@ import 
org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.net.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,4 +137,19 @@ public static String getHostPortString(InetSocketAddress address) { return hostName + ":" + address.getPort(); } + /** + * Returns address in form of ip:port, empty string if address is null. + * + * @param address address + * @return ip:port, or an empty string if address is null + */ + public static String getIpPortString(InetSocketAddress address) { + if (null == address) { + return ""; + } + address = NetUtils.getConnectAddress(address); + InetAddress inet = address.getAddress(); + return inet.getHostAddress() + ":" + address.getPort(); + } + } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 26b89ce0313..408a6ef25f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -974,4 +974,12 @@ </description> </property> + <property> + <name>dfs.federation.router.heartbeat.with.ip.enable</name> + <description> + If true, the router reports its admin address to the router State Store in its heartbeats as ip:port instead of host:port.
+ </description> + <value>false</value> + </property> + </configuration> diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java index 139236d24e1..1995939297b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -53,23 +55,32 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * This test class verifies that mount table cache is updated on all the routers * when MountTableRefreshService is enabled and there is a change in mount table * entries. 
*/ +@RunWith(Parameterized.class) public class TestRouterMountTableCacheRefresh { private static TestingServer curatorTestingServer; private static MiniRouterDFSCluster cluster; private static RouterContext routerContext; private static MountTableManager mountTableManager; - @BeforeClass - public static void setUp() throws Exception { + @Parameterized.Parameters + public static Collection<Object> data() { + return Arrays.asList(new Object[] {true, false}); + } + + public TestRouterMountTableCacheRefresh(boolean useIpForHeartbeats) throws Exception { + // Initialize only once per parameter + if (curatorTestingServer != null) { + return; + } curatorTestingServer = new TestingServer(); curatorTestingServer.start(); final String connectString = curatorTestingServer.getConnectString(); @@ -82,6 +93,7 @@ public static void setUp() throws Exception { FileSubclusterResolver.class); conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString); conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, useIpForHeartbeats); cluster.addRouterOverrides(conf); cluster.startCluster(); cluster.startRouters(); @@ -95,11 +107,15 @@ public static void setUp() throws Exception { numNameservices, 60000); } - @AfterClass - public static void destory() { + @Parameterized.AfterParam + public static void destroy() { try { - curatorTestingServer.close(); - cluster.shutdown(); + if (curatorTestingServer != null) { + curatorTestingServer.close(); + } + if (cluster != null) { + cluster.shutdown(); + } } catch (IOException e) { // do nothing } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org