Repository: zookeeper Updated Branches: refs/heads/master fe25fed93 -> db074423f
ZOOKEEPER-3177: Refactor request throttle logic in NIO and Netty to keep the same behavior and make the code easier to maintain Author: Fangmin Lyu <allen...@fb.com> Reviewers: Enrico Olivelli <eolive...@gmail.com>, Venkateswarlu Tumati <tuma...@gmail.com>, Michael Han <h...@apache.org> Closes #673 from lvfangmin/ZOOKEEPER-3177 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/db074423 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/db074423 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/db074423 Branch: refs/heads/master Commit: db074423f09026446640242ecfcf26310467b1fa Parents: fe25fed Author: Fangmin Lyu <allen...@fb.com> Authored: Sat Nov 17 09:33:40 2018 -0800 Committer: Michael Han <h...@apache.org> Committed: Sat Nov 17 09:33:40 2018 -0800 ---------------------------------------------------------------------- .../apache/zookeeper/server/DumbWatcher.java | 3 +- .../apache/zookeeper/server/NIOServerCnxn.java | 47 +++----------------- .../zookeeper/server/NettyServerCnxn.java | 35 +++------------ .../org/apache/zookeeper/server/ServerCnxn.java | 44 +++++++++++++++--- .../zookeeper/server/ZooKeeperServer.java | 32 +++++++------ .../server/quorum/FollowerZooKeeperServer.java | 10 ++--- .../server/quorum/LeaderZooKeeperServer.java | 10 ++--- .../apache/zookeeper/server/MockServerCnxn.java | 6 ++- 8 files changed, 85 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java index ff17181..f384d7c 100644 --- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java @@ -41,6 +41,7 @@ public class DumbWatcher extends ServerCnxn { } public DumbWatcher(long sessionId) { + super(null); this.sessionId = sessionId; } @@ -75,7 +76,7 @@ public class DumbWatcher extends ServerCnxn { void enableRecv() { } @Override - void disableRecv() { } + void disableRecv(boolean waitDisableRecv) { } @Override protected ServerStats serverStats() { return null; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index c344c65..b48eb3d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -77,25 +77,16 @@ public class NIOServerCnxn extends ServerCnxn { private int sessionTimeout; - private final ZooKeeperServer zkServer; - - /** - * The number of requests that have been submitted but not yet responded to. - */ - private final AtomicInteger outstandingRequests = new AtomicInteger(0); - /** * This is the id that uniquely identifies the session of a client. Once * this session is no longer active, the ephemeral nodes will go away. 
*/ private long sessionId; - private final int outstandingLimit; - public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException { - this.zkServer = zk; + super(zk); this.sock = sock; this.sk = sk; this.factory = factory; @@ -103,11 +94,6 @@ public class NIOServerCnxn extends ServerCnxn { if (this.factory.login != null) { this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login); } - if (zk != null) { - outstandingLimit = zk.getGlobalOutstandingLimit(); - } else { - outstandingLimit = 1; - } sock.socket().setTcpNoDelay(true); /* set socket linger to false, so that socket close does not block */ sock.socket().setSoLinger(false, -1); @@ -380,21 +366,6 @@ public class NIOServerCnxn extends ServerCnxn { zkServer.processPacket(this, incomingBuffer); } - // Only called as callback from zkServer.processPacket() - protected void incrOutstandingRequests(RequestHeader h) { - if (h.getXid() >= 0) { - outstandingRequests.incrementAndGet(); - // check throttling - int inProcess = zkServer.getInProcess(); - if (inProcess > outstandingLimit) { - if (LOG.isDebugEnabled()) { - LOG.debug("Throttling recv " + inProcess); - } - disableRecv(); - } - } - } - // returns whether we are interested in writing, which is determined // by whether we have any pending buffers on the output queue or not private boolean getWriteInterest() { @@ -411,7 +382,9 @@ public class NIOServerCnxn extends ServerCnxn { // Throttle acceptance of new requests. If this entailed a state change, // register an interest op update request with the selector. 
- public void disableRecv() { + // + // NIO doesn't support waiting for recv to be disabled; waitDisableRecv is ignored + public void disableRecv(boolean waitDisableRecv) { if (throttled.compareAndSet(false, true)) { requestInterestOpsUpdate(); } @@ -566,10 +539,6 @@ public class NIOServerCnxn extends ServerCnxn { return zkServer != null && zkServer.isRunning(); } - public long getOutstandingRequests() { - return outstandingRequests.get(); - } - /* * (non-Javadoc) * @@ -689,13 +658,7 @@ public class NIOServerCnxn extends ServerCnxn { public void sendResponse(ReplyHeader h, Record r, String tag) { try { super.sendResponse(h, r, tag); - if (h.getXid() > 0) { - // check throttling - if (outstandingRequests.decrementAndGet() < 1 || - zkServer.getInProcess() < outstandingLimit) { - enableRecv(); - } - } + decrOutstandingAndCheckThrottle(h); } catch(Exception e) { LOG.warn("Unexpected exception. Destruction averted.", e); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index ab3fd09..f0a8f7f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -61,23 +61,16 @@ public class NettyServerCnxn extends ServerCnxn { ByteBuffer bbLen = ByteBuffer.allocate(4); long sessionId; int sessionTimeout; - AtomicLong outstandingCount = new AtomicLong(); Certificate[] clientChain; volatile boolean closingChannel; - /** The ZooKeeperServer for this connection. May be null if the server - * is not currently serving requests (for example if the server is not - * an active quorum participant.
- */ - private volatile ZooKeeperServer zkServer; - NettyServerCnxnFactory factory; boolean initialized; NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { + super(zks); this.channel = channel; this.closingChannel = false; - this.zkServer = zks; this.factory = factory; if (this.factory.login != null) { this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login); @@ -189,12 +182,7 @@ public class NettyServerCnxn extends ServerCnxn { return; } super.sendResponse(h, r, tag); - if (h.getXid() > 0) { - // zks cannot be null otherwise we would not have gotten here! - if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { - enableRecv(); - } - } + decrOutstandingAndCheckThrottle(h); } @Override @@ -355,10 +343,6 @@ public class NettyServerCnxn extends ServerCnxn { } if (initialized) { zks.processPacket(this, bb); - - if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { - disableRecvNoWait(); - } } else { LOG.debug("got conn req request from " + getRemoteSocketAddress()); @@ -420,21 +404,16 @@ public class NettyServerCnxn extends ServerCnxn { } @Override - public void disableRecv() { - disableRecvNoWait().awaitUninterruptibly(); - } - - private ChannelFuture disableRecvNoWait() { + public void disableRecv(boolean waitDisableRecv) { throttled = true; if (LOG.isDebugEnabled()) { LOG.debug("Throttling - disabling recv " + this); } - return channel.setReadable(false); - } + ChannelFuture cf = channel.setReadable(false); - @Override - public long getOutstandingRequests() { - return outstandingCount.longValue(); + if (waitDisableRecv) { + cf.awaitUninterruptibly(); + } } @Override http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java index 0822f19..8e145cb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java @@ -68,8 +68,39 @@ public abstract class ServerCnxn implements Stats, Watcher { private volatile boolean stale = false; + AtomicLong outstandingCount = new AtomicLong(); + + /** The ZooKeeperServer for this connection. May be null if the server + * is not currently serving requests (for example if the server is not + * an active quorum participant). + */ + final ZooKeeperServer zkServer; + + public ServerCnxn(final ZooKeeperServer zkServer) { + this.zkServer = zkServer; + } + abstract int getSessionTimeout(); + public void incrOutstandingAndCheckThrottle(RequestHeader h) { + if (h.getXid() <= 0) { + return; + } + if (zkServer.shouldThrottle(outstandingCount.incrementAndGet())) { + disableRecv(false); + } + } + + // called by the subclasses' sendResponse() after a reply has been sent + public void decrOutstandingAndCheckThrottle(ReplyHeader h) { + if (h.getXid() <= 0) { + return; + } + if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { + enableRecv(); + } + } + abstract void close(); public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { @@ -119,8 +150,12 @@ public abstract class ServerCnxn implements Stats, Watcher { abstract void enableRecv(); - abstract void disableRecv(); + void disableRecv() { + disableRecv(true); + } + abstract void disableRecv(boolean waitDisableRecv); + abstract void setSessionTimeout(int sessionTimeout); protected ZooKeeperSaslServer zooKeeperSaslServer = null; @@ -207,9 +242,6 @@ public abstract class ServerCnxn implements Stats, Watcher { return packetsReceived.incrementAndGet(); } - protected void incrOutstandingRequests(RequestHeader h) { - } - protected long incrPacketsSent() { return packetsSent.incrementAndGet(); } @@ -241,7 +273,9 @@ public abstract
class ServerCnxn implements Stats, Watcher { return (Date)established.clone(); } - public abstract long getOutstandingRequests(); + public long getOutstandingRequests() { + return outstandingCount.longValue(); + } public long getPacketsReceived() { return packetsReceived.longValue(); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 02df585..3ab81e7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -86,10 +86,16 @@ import org.slf4j.LoggerFactory; public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected static final Logger LOG; + public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; + protected static int globalOutstandingLimit = 1000; + static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); Environment.logEnv("Server environment:", LOG); + + globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000); + LOG.info("{} = {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); } protected ZooKeeperServerBean jmxServerBean; @@ -858,17 +864,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } } - public int getGlobalOutstandingLimit() { - String sc = System.getProperty("zookeeper.globalOutstandingLimit"); - int limit; - try { - limit = Integer.parseInt(sc); - } catch (Exception e) { - limit = 1000; - } - return limit; - } - public void setServerCnxnFactory(ServerCnxnFactory factory) { serverCnxnFactory = factory; } @@ -1095,7 +1090,7 @@ public class ZooKeeperServer implements 
SessionExpirer, ServerStats.Provider { } public boolean shouldThrottle(long outStandingCount) { - if (getGlobalOutstandingLimit() < getInProcess()) { + if (globalOutstandingLimit < getInProcess()) { return outStandingCount > 0; } return false; @@ -1107,6 +1102,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); + + // Increase the outstanding request count first; otherwise there + // could be a race where recv is re-enabled after the request is + // processed and then disabled again by the throttle check. + // + // Be aware that the throttle check actually sees the global outstanding + // request count from before this request. + // + // It's fine if an IOException is thrown before the count in cnxn is + // decreased, since the cnxn will be closed anyway. + cnxn.incrOutstandingAndCheckThrottle(h); + // Through the magic of byte buffers, txn will not be // pointing // to the start of the txn @@ -1157,7 +1164,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return; } else { - cnxn.incrOutstandingRequests(h); Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 35ef055..ec529de 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -60,6 +60,10 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self); this.pendingSyncs = new ConcurrentLinkedQueue<Request>(); + + int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; + globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor; + LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); } public Follower getFollower(){ @@ -123,12 +127,6 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { } @Override - public int getGlobalOutstandingLimit() { - int divisor = self.getQuorumSize() > 2 ? 
self.getQuorumSize() - 1 : 1; - return super.getGlobalOutstandingLimit() / divisor; - } - - @Override public String getState() { return "follower"; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 4f8c095..c6f60e1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -55,6 +55,10 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { */ LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self); + + int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; + globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor; + LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); } public Leader getLeader(){ @@ -103,12 +107,6 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { } @Override - public int getGlobalOutstandingLimit() { - int divisor = self.getQuorumSize() > 2 ? 
self.getQuorumSize() - 1 : 1; - return super.getGlobalOutstandingLimit() / divisor; - } - - @Override public void createSessionTracker() { sessionTracker = new LeaderSessionTracker( this, getZKDatabase().getSessionWithTimeOuts(), http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java index 7ae0004..20cf36d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java @@ -29,6 +29,10 @@ public class MockServerCnxn extends ServerCnxn { public Certificate[] clientChain; public boolean secure; + public MockServerCnxn() { + super(null); + } + @Override int getSessionTimeout() { return 0; @@ -84,7 +88,7 @@ public class MockServerCnxn extends ServerCnxn { } @Override - void disableRecv() { + void disableRecv(boolean waitDisableRecv) { } @Override