HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy 
Battula.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bd373555
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bd373555
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bd373555

Branch: refs/heads/HADOOP-13345
Commit: bd3735554fa5c3bc064c57ec78f4308430b14b48
Parents: b2d4b7b
Author: Kihwal Lee <kih...@apache.org>
Authored: Thu Nov 17 12:16:38 2016 -0600
Committer: Kihwal Lee <kih...@apache.org>
Committed: Thu Nov 17 12:16:38 2016 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Server.java | 61 +++++++++++++++++++-
 .../apache/hadoop/ipc/metrics/RpcMetrics.java   |  5 ++
 .../java/org/apache/hadoop/ipc/TestRPC.java     | 29 +++++++++-
 .../org/apache/hadoop/test/MetricsAsserts.java  |  7 +++
 4 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 1c7e76a..8f1956e 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -122,6 +122,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -2151,6 +2152,9 @@ public abstract class Server {
       authorizeConnection();
       // don't set until after authz because connection isn't established
       connectionContextRead = true;
+      if (user != null) {
+        connectionManager.incrUserConnections(user.getShortUserName());
+      }
     }
     
     /**
@@ -3019,7 +3023,20 @@ public abstract class Server {
   public int getNumOpenConnections() {
     return connectionManager.size();
   }
-  
+
+  /**
+   * Get the NumOpenConnections/User.
+   */
+  public String getNumOpenConnectionsPerUser() {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper
+          .writeValueAsString(connectionManager.getUserToConnectionsMap());
+    } catch (IOException ignored) {
+    }
+    return null;
+  }
+
   /**
    * The number of rpc calls in the queue.
    * @return The number of rpc calls in the queue.
@@ -3139,6 +3156,9 @@ public abstract class Server {
   private class ConnectionManager {
     final private AtomicInteger count = new AtomicInteger();    
     final private Set<Connection> connections;
+    /* Map to maintain the statistics per User */
+    final private Map<String, Integer> userToConnectionsMap;
+    final private Object userToConnectionsMapLock = new Object();
 
     final private Timer idleScanTimer;
     final private int idleScanThreshold;
@@ -3170,6 +3190,7 @@ public abstract class Server {
       this.connections = Collections.newSetFromMap(
           new ConcurrentHashMap<Connection,Boolean>(
               maxQueueSize, 0.75f, readThreads+2));
+      this.userToConnectionsMap = new ConcurrentHashMap<>();
     }
 
     private boolean add(Connection connection) {
@@ -3187,7 +3208,39 @@ public abstract class Server {
       }
       return removed;
     }
-    
+
+    void incrUserConnections(String user) {
+      synchronized (userToConnectionsMapLock) {
+        Integer count = userToConnectionsMap.get(user);
+        if (count == null) {
+          count = 1;
+        } else {
+          count++;
+        }
+        userToConnectionsMap.put(user, count);
+      }
+    }
+
+    void decrUserConnections(String user) {
+      synchronized (userToConnectionsMapLock) {
+        Integer count = userToConnectionsMap.get(user);
+        if (count == null) {
+          return;
+        } else {
+          count--;
+        }
+        if (count == 0) {
+          userToConnectionsMap.remove(user);
+        } else {
+          userToConnectionsMap.put(user, count);
+        }
+      }
+    }
+
+    Map<String, Integer> getUserToConnectionsMap() {
+      return userToConnectionsMap;
+    }
+
     int size() {
       return count.get();
     }
@@ -3226,6 +3279,10 @@ public abstract class Server {
         // only close if actually removed to avoid double-closing due
         // to possible races
         connection.close();
+        // Remove authorized users only
+        if (connection.user != null && connection.connectionContextRead) {
+          decrUserConnections(connection.user.getShortUserName());
+        }
       }
       return exists;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 5373f95..ef43618 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -104,6 +104,11 @@ public class RpcMetrics {
     return server.getNumOpenConnections();
   }
 
+  @Metric("Number of open connections per user")
+  public String numOpenConnectionsPerUser() {
+    return server.getNumOpenConnectionsPerUser();
+  }
+
   @Metric("Length of the call queue") public int callQueueLength() {
     return server.getCallQueueLen();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 72b603a..f0d883b 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -64,6 +64,7 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1015,7 +1016,7 @@ public class TestRPC extends TestRpcBase {
 
   @Test
   public void testRpcMetrics() throws Exception {
-    Server server;
+    final Server server;
     TestRpcService proxy = null;
 
     final int interval = 1;
@@ -1025,7 +1026,21 @@ public class TestRPC extends TestRpcBase {
         RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
 
     server = setupTestServer(conf, 5);
-
+    String testUser = "testUser";
+    UserGroupInformation anotherUser =
+        UserGroupInformation.createRemoteUser(testUser);
+    TestRpcService proxy2 =
+        anotherUser.doAs(new PrivilegedAction<TestRpcService>() {
+          public TestRpcService run() {
+            try {
+              return RPC.getProxy(TestRpcService.class, 0,
+                  server.getListenerAddress(), conf);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            return null;
+          }
+        });
     try {
       proxy = getClient(addr, conf);
 
@@ -1033,6 +1048,7 @@ public class TestRPC extends TestRpcBase {
         proxy.ping(null, newEmptyRequest());
 
         proxy.echo(null, newEchoRequest("" + i));
+        proxy2.echo(null, newEchoRequest("" + i));
       }
       MetricsRecordBuilder rpcMetrics =
           getMetrics(server.getRpcMetrics().name());
@@ -1044,7 +1060,16 @@ public class TestRPC extends TestRpcBase {
           rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
           rpcMetrics);
+      String actualUserVsCon = MetricsAsserts
+          .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
+      String proxyUser =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
+      assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
     } finally {
+      if (proxy2 != null) {
+        RPC.stopProxy(proxy2);
+      }
       stop(server, proxy);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
index 982481e..5d87b07 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
@@ -236,6 +236,13 @@ public class MetricsAsserts {
     return captor.getValue();
   }
 
+  public static String getStringMetric(String name, MetricsRecordBuilder rb) {
+    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+    verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
+    checkCaptured(captor, name);
+    return captor.getValue();
+  }
+
    /**
    * Assert a float gauge metric as expected
    * @param name  of the metric


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to