This is an automated email from the ASF dual-hosted git repository. fangmin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push: new d8db889 ZOOKEEPER-3326: Add session/connection related metrics d8db889 is described below commit d8db88914e7af7c11a2588be202d90b925791d31 Author: Jie Huang <jiehu...@fb.com> AuthorDate: Mon Apr 22 10:15:22 2019 -0700 ZOOKEEPER-3326: Add session/connection related metrics Author: Jie Huang <jiehu...@fb.com> Reviewers: eolive...@apache.org, fang...@apache.org Closes #861 from jhuan31/ZOOKEEPER-3326 --- .../org/apache/zookeeper/server/NIOServerCnxn.java | 2 + .../zookeeper/server/NIOServerCnxnFactory.java | 1 + .../apache/zookeeper/server/NettyServerCnxn.java | 1 + .../org/apache/zookeeper/server/ServerMetrics.java | 18 ++ .../zookeeper/server/SessionTrackerImpl.java | 1 + .../apache/zookeeper/server/ZooKeeperServer.java | 2 + .../zookeeper/server/quorum/LearnerHandler.java | 1 + .../zookeeper/server/ConnectionMetricsTest.java | 239 +++++++++++++++++++++ 8 files changed, 265 insertions(+) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index c47267c..c8ee076 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -155,6 +155,7 @@ public class NIOServerCnxn extends ServerCnxn { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { + ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1); throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) @@ -314,6 +315,7 @@ public class NIOServerCnxn extends ServerCnxn { if (k.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { + ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1); throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java index 23ec3e6..289c7a2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -576,6 +576,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory { continue; } for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) { + ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1); conn.close(); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index 571c88a..50cd674 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -124,6 +124,7 @@ public class NettyServerCnxn extends ServerCnxn { } }); } else { + ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1); channel.eventLoop().execute(this::releaseQueuedBuffer); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index cdd57f0..5fefdfc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -122,6 +122,15 @@ public final class ServerMetrics { OUTSTANDING_CHANGES_REMOVED = metricsContext.getCounter("outstanding_changes_removed"); PREP_PROCESS_TIME = metricsContext.getSummary("prep_process_time", DetailLevel.BASIC); CLOSE_SESSION_PREP_TIME = metricsContext.getSummary("close_session_prep_time", DetailLevel.ADVANCED); + + REVALIDATE_COUNT = metricsContext.getCounter("revalidate_count"); + CONNECTION_DROP_COUNT = metricsContext.getCounter("connection_drop_count"); + CONNECTION_REVALIDATE_COUNT = metricsContext.getCounter("connection_revalidate_count"); + + // Expiry queue stats + SESSIONLESS_CONNECTIONS_EXPIRED = metricsContext.getCounter("sessionless_connections_expired"); + STALE_SESSIONS_EXPIRED = metricsContext.getCounter("stale_sessions_expired"); + } /** @@ -167,6 +176,15 @@ public final class ServerMetrics { public final Counter SNAP_COUNT; public final Counter COMMIT_COUNT; public final Counter CONNECTION_REQUEST_COUNT; + + public final Counter REVALIDATE_COUNT; + public final Counter CONNECTION_DROP_COUNT; + public final Counter CONNECTION_REVALIDATE_COUNT; + + // Expiry queue stats + public final Counter SESSIONLESS_CONNECTIONS_EXPIRED; + public final Counter STALE_SESSIONS_EXPIRED; + // Connection throttling related public final Summary CONNECTION_TOKEN_DEFICIT; public final Counter CONNECTION_REJECTED; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java index 0699620..a984f20 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java @@ -153,6 +153,7 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements } for (SessionImpl s : sessionExpiryQueue.poll()) { + ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1); setSessionClosing(s.sessionId); expirer.expire(s); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 2aec4ec..262df5f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1180,6 +1180,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); + ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 0f953c8..539f579 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -582,6 +582,7 @@ public class LearnerHandler extends ZooKeeperThread { } break; case Leader.REVALIDATE: + ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1); learnerMaster.revalidateSession(qp, this); break; case Leader.REQUEST: diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java new file mode 100644 index 0000000..d6ade36 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.metrics.MetricsUtils; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.zookeeper.server.NIOServerCnxnFactory.ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConnectionMetricsTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(ConnectionMetricsTest.class); + + @Test + public void testRevalidateCount() throws Exception { + ServerMetrics.getMetrics().resetAll(); + QuorumUtil util = new QuorumUtil(1); // create a quorum of 3 servers + // disable local session to make sure we create a global session + util.enableLocalSession(false); + util.startAll(); + + int follower1 = (int)util.getFollowerQuorumPeers().get(0).getId(); + int follower2 = (int)util.getFollowerQuorumPeers().get(1).getId(); + LOG.info("connecting to server: {}", follower1); + ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher(); + // create a connection to follower + ZooKeeper zk = new ZooKeeper(util.getConnectionStringForServer(follower1), ClientBase.CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + LOG.info("connected"); + + // update the connection to allow to connect to the other follower + zk.updateServerList(util.getConnectionStringForServer(follower2)); + + // follower is shut down and zk should be disconnected + util.shutdown(follower1); + watcher.waitForDisconnected(ClientBase.CONNECTION_TIMEOUT); + LOG.info("disconnected"); + // should reconnect to another follower, will ask leader to revalidate + watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + LOG.info("reconnected"); + + Map<String, Object> values = MetricsUtils.currentServerMetrics(); + Assert.assertEquals(1L, values.get("connection_revalidate_count")); + Assert.assertEquals(1L, values.get("revalidate_count")); + + zk.close(); + util.shutdownAll(); + } + + + private class MockNIOServerCnxn extends NIOServerCnxn { + public MockNIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, + SelectionKey sk, NIOServerCnxnFactory factory, + NIOServerCnxnFactory.SelectorThread selectorThread) throws IOException { + super(zk, sock, sk, factory, selectorThread); + } + + @Override + protected boolean isSocketOpen() { + return true; + } + } + + private static class FakeSK extends SelectionKey { + + @Override + public SelectableChannel channel() { + return null; + } + + @Override + public Selector selector() { + return mock(Selector.class); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public void cancel() { + } + + @Override + public int interestOps() { + return ops; + } + + private int ops = OP_WRITE + OP_READ; + + @Override + public SelectionKey interestOps(int ops) { + this.ops = ops; + return this; + } + + @Override + public int readyOps() { + return ops; + } + + } + + private NIOServerCnxn createMockNIOCnxn() throws IOException { + InetSocketAddress socketAddr = new InetSocketAddress(80); + Socket socket = mock(Socket.class); + when(socket.getRemoteSocketAddress()).thenReturn(socketAddr); + SocketChannel sock = mock(SocketChannel.class); + when(sock.socket()).thenReturn(socket); + when(sock.read(any(ByteBuffer.class))).thenReturn(-1); + + return new MockNIOServerCnxn(mock(ZooKeeperServer.class), sock, null, mock(NIOServerCnxnFactory.class), null); + } + + @Test + public void testNIOConnectionDropCount() throws Exception { + ServerMetrics.getMetrics().resetAll(); + + NIOServerCnxn cnxn = createMockNIOCnxn(); + cnxn.doIO(new FakeSK()); + + Map<String, Object> values = MetricsUtils.currentServerMetrics(); + Assert.assertEquals(1L, values.get("connection_drop_count")); + } + + @Test + public void testNettyConnectionDropCount() throws Exception { + InetSocketAddress socketAddr = new InetSocketAddress(80); + Channel channel = mock(Channel.class); + when(channel.isOpen()).thenReturn(false); + when(channel.remoteAddress()).thenReturn(socketAddr); + EventLoop eventLoop = mock(EventLoop.class); + when(channel.eventLoop()).thenReturn(eventLoop); + + ServerMetrics.getMetrics().resetAll(); + + NettyServerCnxnFactory factory = new NettyServerCnxnFactory(); + NettyServerCnxn cnxn = new NettyServerCnxn(channel, mock(ZooKeeperServer.class), factory); + + // pretend it's connected + factory.cnxns.add(cnxn); + cnxn.close(); + + Map<String, Object> values = MetricsUtils.currentServerMetrics(); + Assert.assertEquals(1L, values.get("connection_drop_count")); + } + + @Test + public void testSessionlessConnectionsExpired() throws Exception { + ServerCnxnFactory factory = new NIOServerCnxnFactory(); + factory.configure(new InetSocketAddress(PortAssignment.unique()), 1000); + factory.start(); + int timeout = Integer.getInteger( + ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); + + ServerMetrics.getMetrics().resetAll(); + // add two connections w/o touching them so they will expire + ((NIOServerCnxnFactory) factory).touchCnxn(createMockNIOCnxn()); + ((NIOServerCnxnFactory) factory).touchCnxn(createMockNIOCnxn()); + + Map<String, Object> values = MetricsUtils.currentServerMetrics(); + int sleptTime = 0; + while (values.get("sessionless_connections_expired") == null || sleptTime < 2*timeout){ + Thread.sleep(100); + sleptTime += 100; + values = MetricsUtils.currentServerMetrics(); + } + + Assert.assertEquals(2L, values.get("sessionless_connections_expired")); + + factory.shutdown(); + } + + @Test + public void testStaleSessionsExpired() throws Exception { + int tickTime = 1000; + SessionTrackerImpl tracker = new SessionTrackerImpl(mock(ZooKeeperServer.class), + new ConcurrentHashMap<>(), tickTime, 1L, null); + + tracker.sessionsById.put(1L, mock(SessionTrackerImpl.SessionImpl.class)); + tracker.sessionsById.put(2L, mock(SessionTrackerImpl.SessionImpl.class)); + + tracker.touchSession(1L, tickTime); + tracker.touchSession(2L, tickTime); + + ServerMetrics.getMetrics().resetAll(); + + tracker.start(); + + Map<String, Object> values = MetricsUtils.currentServerMetrics(); + int sleptTime = 0; + while (values.get("stale_sessions_expired") == null || sleptTime < 2*tickTime) { + Thread.sleep(100); + sleptTime += 100; + values = MetricsUtils.currentServerMetrics(); + } + + Assert.assertEquals(2L, values.get("stale_sessions_expired")); + + tracker.shutdown(); + } +}