This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new f18bbdd HDFS-14927. RBF: Add metrics for async callers thread pool. Contributed by Leon Gao. f18bbdd is described below commit f18bbdd9d84cc1a23d33524f5cb61321cdb1b926 Author: Inigo Goiri <inigo...@apache.org> AuthorDate: Fri Nov 1 10:14:31 2019 -0700 HDFS-14927. RBF: Add metrics for async callers thread pool. Contributed by Leon Gao. --- .../federation/metrics/FederationRPCMBean.java | 6 ++ .../federation/metrics/FederationRPCMetrics.java | 5 ++ .../server/federation/router/RouterRpcClient.java | 14 +++++ .../router/TestRouterClientRejectOverload.java | 70 ++++++++++++++++++++++ 4 files changed, 95 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 76b3ca6..f57e310 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -93,4 +93,10 @@ public interface FederationRPCMBean { * @return JSON string representation. */ String getRpcClientConnections(); + + /** + * Get the JSON representation of the async caller thread pool. + * @return JSON string representation of the async caller thread pool. + */ + String getAsyncCallerPool(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 8e57c6b..b16a6c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -220,6 +220,11 @@ public class FederationRPCMetrics implements FederationRPCMBean { return rpcServer.getRPCClient().getJSON(); } + @Override + public String getAsyncCallerPool() { + return rpcServer.getRPCClient().getAsyncCallerPoolJson(); + } + /** * Add the time to proxy an operation from the moment the Router sends it to * the Namenode until it replied. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 563f3d0..1c17c29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -69,6 +69,7 @@ import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; +import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -264,6 +265,19 @@ public class RouterRpcClient { } /** + * JSON representation of the async caller thread pool. + * + * @return String representation of the JSON. + */ + public String getAsyncCallerPoolJson() { + final Map<String, Integer> info = new LinkedHashMap<>(); + info.put("active", executorService.getActiveCount()); + info.put("total", executorService.getPoolSize()); + info.put("max", executorService.getMaximumPoolSize()); + return JSON.toString(info); + } + + /** * Get ClientProtocol proxy client for a NameNode. Each combination of user + * NN must use a unique proxy client. Previously created clients are cached * and stored in a connection pool by the ConnectionManager. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java index a4611f2..cc7f5a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java @@ -31,6 +31,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,6 +49,8 @@ import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.test.GenericTestUtils; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -356,4 +359,71 @@ public class TestRouterClientRejectOverload { // Router 0 failures do not change assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes()); } + + @Test + public void testAsyncCallerPoolMetrics() throws Exception { + setupCluster(true, false); + simulateSlowNamenode(cluster.getCluster().getNameNode(0), 2); + final ObjectMapper objectMapper = new ObjectMapper(); + + // Set only one router to make test easier + cluster.getRouters().remove(1); + FederationRPCMetrics metrics = cluster.getRouters().get(0).getRouter() + .getRpcServer().getRPCMetrics(); + + // No active connection initially + Map<String, Integer> result = objectMapper + .readValue(metrics.getAsyncCallerPool(), Map.class); + assertEquals(0, result.get("active").intValue()); + assertEquals(0, result.get("total").intValue()); + assertEquals(4, result.get("max").intValue()); + + ExecutorService exec = Executors.newSingleThreadExecutor(); + + try { + // Run a client request to create an active connection + exec.submit(() -> { + DFSClient routerClient = null; + try { + routerClient = new DFSClient(new URI("hdfs://fed"), + cluster.getRouterClientConf()); + String clientName = routerClient.getClientName(); + ClientProtocol routerProto = routerClient.getNamenode(); + routerProto.renewLease(clientName); + } catch (Exception e) { + fail("Client request failed: " + e); + } finally { + if (routerClient != null) { + try { + routerClient.close(); + } catch (IOException e) { + LOG.error("Cannot close the client"); + } + } + } + }); + + // Wait for client request to be active + GenericTestUtils.waitFor(() -> { + try { + Map<String, Integer> newResult = objectMapper.readValue( + metrics.getAsyncCallerPool(), Map.class); + if (newResult.get("active") != 1) { + return false; + } + if (newResult.get("max") != 4) { + return false; + } + int total = newResult.get("total"); + // "total" is dynamic + return total >= 1 && total <= 4; + } catch (Exception e) { + LOG.error("Not able to parse metrics result: " + e); + } + return false; + }, 100, 2000); + } finally { + exec.shutdown(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org