This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch HDFS-13891 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 0ffeac3ae0c37acd1679ab91335a0746081b5cb7 Author: Yiqun Lin <yq...@apache.org> AuthorDate: Tue Dec 4 19:58:38 2018 +0800 HDFS-14114. RBF: MIN_ACTIVE_RATIO should be configurable. Contributed by Fei Hui. --- .../federation/router/ConnectionManager.java | 20 +++++---- .../server/federation/router/ConnectionPool.java | 14 +++++- .../server/federation/router/RBFConfigKeys.java | 5 +++ .../src/main/resources/hdfs-rbf-default.xml | 8 ++++ .../federation/router/TestConnectionManager.java | 51 +++++++++++++++++++--- 5 files changed, 83 insertions(+), 15 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index fa2bf94..74bbbb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -49,10 +49,6 @@ public class ConnectionManager { private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class); - /** Minimum amount of active connections: 50%. */ - protected static final float MIN_ACTIVE_RATIO = 0.5f; - - /** Configuration for the connection manager, pool and sockets. */ private final Configuration conf; @@ -60,6 +56,8 @@ public class ConnectionManager { private final int minSize = 1; /** Max number of connections per user + nn. */ private final int maxSize; + /** Min ratio of active connections per user + nn. */ + private final float minActiveRatio; /** How often we close a pool for a particular user + nn. */ private final long poolCleanupPeriodMs; @@ -96,10 +94,13 @@ public class ConnectionManager { public ConnectionManager(Configuration config) { this.conf = config; - // Configure minimum and maximum connection pools + // Configure minimum, maximum and active connection pools this.maxSize = this.conf.getInt( RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT); + this.minActiveRatio = this.conf.getFloat( + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT); // Map with the connections indexed by UGI and Namenode this.pools = new HashMap<>(); @@ -203,7 +204,8 @@ public class ConnectionManager { pool = this.pools.get(connectionId); if (pool == null) { pool = new ConnectionPool( - this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol); + this.conf, nnAddress, ugi, this.minSize, this.maxSize, + this.minActiveRatio, protocol); this.pools.put(connectionId, pool); } } finally { @@ -326,8 +328,9 @@ public class ConnectionManager { long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); int total = pool.getNumConnections(); int active = pool.getNumActiveConnections(); + float poolMinActiveRatio = pool.getMinActiveRatio(); if (timeSinceLastActive > connectionCleanupPeriodMs || - active < MIN_ACTIVE_RATIO * total) { + active < poolMinActiveRatio * total) { // Remove and close 1 connection List<ConnectionContext> conns = pool.removeConnections(1); for (ConnectionContext conn : conns) { @@ -412,8 +415,9 @@ public class ConnectionManager { try { int total = pool.getNumConnections(); int active = pool.getNumActiveConnections(); + float poolMinActiveRatio = pool.getMinActiveRatio(); if (pool.getNumConnections() < pool.getMaxSize() && - active >= MIN_ACTIVE_RATIO * total) { + active >= poolMinActiveRatio * total) { ConnectionContext conn = pool.newConnection(); pool.addConnection(conn); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index fab3b81..f868521 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -91,6 +91,8 @@ public class ConnectionPool { private final int minSize; /** Max number of connections per user. */ private final int maxSize; + /** Min ratio of active connections per user. */ + private final float minActiveRatio; /** The last time a connection was active. */ private volatile long lastActiveTime = 0; @@ -98,7 +100,7 @@ public class ConnectionPool { protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, - Class<?> proto) throws IOException { + float minActiveRatio, Class<?> proto) throws IOException { this.conf = config; @@ -112,6 +114,7 @@ public class ConnectionPool { // Set configuration parameters for the pool this.minSize = minPoolSize; this.maxSize = maxPoolSize; + this.minActiveRatio = minActiveRatio; // Add minimum connections to the pool for (int i=0; i<this.minSize; i++) { @@ -141,6 +144,15 @@ public class ConnectionPool { } /** + * Get the minimum ratio of active connections in this pool. + * + * @return Minimum ratio of active connections. + */ + protected float getMinActiveRatio() { + return this.minActiveRatio; + } + + /** * Get the connection pool identifier. * * @return Connection pool identifier. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 10018fe..0070de7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -102,6 +102,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "connection.creator.queue-size"; public static final int DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT = 100; + public static final String + DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO = + FEDERATION_ROUTER_PREFIX + "connection.min-active-ratio"; + public static final float + DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT = 0.5f; public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = FEDERATION_ROUTER_PREFIX + "connection.pool-size"; public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 09050bb..afb3c32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -118,6 +118,14 @@ </property> <property> + <name>dfs.federation.router.connection.min-active-ratio</name> + <value>0.5f</value> + <description> + Minimum active ratio of connections from the router to namenodes. + </description> + </property> + + <property> <name>dfs.federation.router.connection.clean.ms</name> <value>10000</value> <description> diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 765f6c8..6c1e448 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -80,14 +80,14 @@ public class TestConnectionManager { Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools(); ConnectionPool pool1 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); addConnectionsToPool(pool1, 9, 4); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool1); ConnectionPool pool2 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class); addConnectionsToPool(pool2, 10, 10); poolMap.put( new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), @@ -110,7 +110,7 @@ public class TestConnectionManager { // Make sure the number of connections doesn't go below minSize ConnectionPool pool3 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class); addConnectionsToPool(pool3, 8, 0); poolMap.put( new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), @@ -171,7 +171,7 @@ public class TestConnectionManager { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), @@ -196,7 +196,7 @@ public class TestConnectionManager { @Test public void testValidClientIndex() throws Exception { ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class); for(int i = -3; i <= 3; i++) { pool.getClientIndex().set(i); ConnectionContext conn = pool.getConnection(); @@ -212,7 +212,7 @@ public class TestConnectionManager { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId( @@ -262,4 +262,43 @@ public class TestConnectionManager { } } + @Test + public void testConfigureConnectionActiveRatio() throws IOException { + final int totalConns = 10; + int activeConns = 7; + + Configuration tmpConf = new Configuration(); + // Set dfs.federation.router.connection.min-active-ratio 0.8f + tmpConf.setFloat( + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, 0.8f); + ConnectionManager tmpConnManager = new ConnectionManager(tmpConf); + tmpConnManager.start(); + + // Create one new connection pool + tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, + NamenodeProtocol.class); + + Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools(); + ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, + TEST_NN_ADDRESS, NamenodeProtocol.class); + ConnectionPool pool = poolMap.get(connectionPoolId); + + // Test min active ratio is 0.8f + assertEquals(0.8f, pool.getMinActiveRatio(), 0.001f); + + pool.getConnection().getClient(); + // Test there is one active connection in pool + assertEquals(1, pool.getNumActiveConnections()); + + // Add other 6 active/9 total connections to pool + addConnectionsToPool(pool, totalConns - 1, activeConns - 1); + + // There are 7 active connections. + // The active number is less than totalConns(10) * minActiveRatio(0.8f). + // We can cleanup the pool + tmpConnManager.cleanup(pool); + assertEquals(totalConns - 1, pool.getNumConnections()); + + tmpConnManager.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org