This is an automated email from the ASF dual-hosted git repository. hanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push: new eecd9e7 ZOOKEEPER-3492: Add weights to server side connection throttling eecd9e7 is described below commit eecd9e7ce046083bd40cd6134bdb2b405d01fe67 Author: Jie Huang <jiehu...@fb.com> AuthorDate: Thu Sep 5 14:12:50 2019 -0700 ZOOKEEPER-3492: Add weights to server side connection throttling Author: Jie Huang <jiehu...@fb.com> Reviewers: Michael Han <h...@apache.org>, Enrico Olivelli <eolive...@gmail.com> Closes #1037 from jhuan31/ZOOKEEPER-3492 --- .../src/main/resources/markdown/zookeeperAdmin.md | 33 ++++- .../org/apache/zookeeper/server/BlueThrottle.java | 119 ++++++++++++++++-- .../apache/zookeeper/server/SessionTracker.java | 1 + .../zookeeper/server/SessionTrackerImpl.java | 4 + .../apache/zookeeper/server/ZooKeeperServer.java | 34 ++++-- .../server/quorum/LeaderSessionTracker.java | 1 - .../server/quorum/UpgradeableSessionTracker.java | 5 + .../apache/zookeeper/server/BlueThrottleTest.java | 134 +++++++++++++++++++++ .../zookeeper/server/PrepRequestProcessorTest.java | 4 + 9 files changed, 311 insertions(+), 24 deletions(-) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index c4cc747..5113eaa 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -124,8 +124,8 @@ is no full support. #### Required Software -ZooKeeper runs in Java, release 1.8 or greater -(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported). +ZooKeeper runs in Java, release 1.8 or greater +(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported). It runs as an _ensemble_ of ZooKeeper servers. Three ZooKeeper servers is the minimum recommended size for an ensemble, and we also recommend that they run on separate @@ -822,6 +822,27 @@ property, when available, is noted below. dropping. 
This parameter defines the threshold to decrease the dropping probability. The default is 0. +* *zookeeper.connection_throttle_weight_enabled* : + (Java system property only) + **New in 3.6.0:** + Whether to consider connection weights when throttling. Only useful when connection throttle is enabled, that is, connectionMaxTokens is larger than 0. The default is false. + +* *zookeeper.connection_throttle_global_session_weight* : + (Java system property only) + **New in 3.6.0:** + The weight of a global session. It is the number of tokens required for a global session request to get through the connection throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 3. + +* *zookeeper.connection_throttle_local_session_weight* : + (Java system property only) + **New in 3.6.0:** + The weight of a local session. It is the number of tokens required for a local session request to get through the connection throttler. It has to be a positive integer no larger than the weight of a global session or a renew session. The default is 1. + +* *zookeeper.connection_throttle_renew_session_weight* : + (Java system property only) + **New in 3.6.0:** + The weight of renewing a session. It is also the number of tokens required for a reconnect request to get through the throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 2. + + * *clientPortListenBacklog* : **New in 3.4.14, 3.5.5, 3.6.0:** The socket backlog length for the ZooKeeper server socket. This controls @@ -889,7 +910,7 @@ property, when available, is noted below. * *advancedFlowControlEnabled* : (Java system property: **zookeeper.netty.advancedFlowControl.enabled**) - Using accurate flow control in netty based on the status of ZooKeeper + Using accurate flow control in netty based on the status of ZooKeeper pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in Netty. 
@@ -958,9 +979,9 @@ of servers -- that is, when deploying clusters of servers. * *connectToLearnerMasterLimit* : (Java system property: zookeeper.**connectToLearnerMasterLimit**) Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to - connect to the leader after leader election. Defaults to the value of initLimit. + connect to the leader after leader election. Defaults to the value of initLimit. Use when initLimit is high so connecting to learner master doesn't result in higher timeout. - + * *leaderServes* : (Java system property: zookeeper.**leaderServes**) Leader accepts client connections. Default value is "yes". @@ -1568,7 +1589,7 @@ options are used to configure the [AdminServer](#sc_adminserver). ### Metrics Providers -**New in 3.6.0:** The following options are used to configure metrics. +**New in 3.6.0:** The following options are used to configure metrics. By default ZooKeeper server exposes useful metrics using the [AdminServer](#sc_adminserver). and [Four Letter Words](#sc_4lw) interface. 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java index 3895c2e..9f03e44 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java @@ -20,6 +20,8 @@ package org.apache.zookeeper.server; import java.util.Random; import org.apache.zookeeper.common.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements a token-bucket based rate limiting mechanism with optional @@ -69,6 +71,7 @@ import org.apache.zookeeper.common.Time; **/ public class BlueThrottle { + private static final Logger LOG = LoggerFactory.getLogger(BlueThrottle.class); private int maxTokens; private int fillTime; @@ -86,35 +89,115 @@ public class BlueThrottle { Random rng; public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens"; - public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS; + private static final int DEFAULT_CONNECTION_THROTTLE_TOKENS; public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time"; - public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME; + private static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME; public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count"; - public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT; + private static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT; public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time"; - public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME; + private static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME; public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase"; - public static final double 
DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE; + private static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE; public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease"; - public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE; + private static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE; public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio"; - public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO; + private static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO; + + public static final String WEIGHED_CONNECTION_THROTTLE = "zookeeper.connection_throttle_weight_enabled"; + private static boolean connectionWeightEnabled; + + public static final String GLOBAL_SESSION_WEIGHT = "zookeeper.connection_throttle_global_session_weight"; + private static final int DEFAULT_GLOBAL_SESSION_WEIGHT; + + public static final String LOCAL_SESSION_WEIGHT = "zookeeper.connection_throttle_local_session_weight"; + private static final int DEFAULT_LOCAL_SESSION_WEIGHT; + + public static final String RENEW_SESSION_WEIGHT = "zookeeper.connection_throttle_renew_session_weight"; + private static final int DEFAULT_RENEW_SESSION_WEIGHT; + + // for unit tests only + protected static void setConnectionWeightEnabled(boolean enabled) { + connectionWeightEnabled = enabled; + logWeighedThrottlingSetting(); + } + + private static void logWeighedThrottlingSetting() { + if (connectionWeightEnabled) { + LOG.info("Weighed connection throttling is enabled. 
" + + "But it will only be effective if connection throttling is enabled"); + LOG.info( + "The weights for different session types are: global {} renew {} local {}", + DEFAULT_GLOBAL_SESSION_WEIGHT, + DEFAULT_RENEW_SESSION_WEIGHT, + DEFAULT_LOCAL_SESSION_WEIGHT + ); + } else { + LOG.info("Weighed connection throttling is disabled"); + } + } static { - DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0); - DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1); - DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1); + int tokens = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0); + int fillCount = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1); + + connectionWeightEnabled = Boolean.getBoolean(WEIGHED_CONNECTION_THROTTLE); + + // if not specified, the weights for a global session, a local session, and a renew session + // are 3, 1, 2 respectively. The weight for a global session is 3 because in our connection benchmarking, + // the throughput of global sessions is about one third of that of local sessions. Renewing a session + // is more expensive than establishing a local session and cheaper than creating a global session so + // its default weight is set to 2. + int globalWeight = Integer.getInteger(GLOBAL_SESSION_WEIGHT, 3); + int localWeight = Integer.getInteger(LOCAL_SESSION_WEIGHT, 1); + int renewWeight = Integer.getInteger(RENEW_SESSION_WEIGHT, 2); + + if (globalWeight <= 0) { + LOG.warn("Invalid global session weight {}. It should be larger than 0", globalWeight); + DEFAULT_GLOBAL_SESSION_WEIGHT = 3; + } else if (globalWeight < localWeight) { + LOG.warn("The global session weight {} is less than the local session weight {}. 
Use the local session weight.", + globalWeight, localWeight); + DEFAULT_GLOBAL_SESSION_WEIGHT = localWeight; + } else { + DEFAULT_GLOBAL_SESSION_WEIGHT = globalWeight; + } + if (localWeight <= 0) { + LOG.warn("Invalid local session weight {}. It should be larger than 0", localWeight); + DEFAULT_LOCAL_SESSION_WEIGHT = 1; + } else { + DEFAULT_LOCAL_SESSION_WEIGHT = localWeight; + } + + if (renewWeight <= 0) { + LOG.warn("Invalid renew session weight {}. It should be larger than 0", renewWeight); + DEFAULT_RENEW_SESSION_WEIGHT = 2; + } else if (renewWeight < localWeight) { + LOG.warn("The renew session weight {} is less than the local session weight {}. Use the local session weight.", + renewWeight, localWeight); + DEFAULT_RENEW_SESSION_WEIGHT = localWeight; + } else { + DEFAULT_RENEW_SESSION_WEIGHT = renewWeight; + } + + // This is based on the assumption that tokens set in config are for global sessions + DEFAULT_CONNECTION_THROTTLE_TOKENS = connectionWeightEnabled + ? DEFAULT_GLOBAL_SESSION_WEIGHT * tokens : tokens; + DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1); + DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = connectionWeightEnabled + ? 
DEFAULT_GLOBAL_SESSION_WEIGHT * fillCount : fillCount; DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1); DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02); DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002); DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0); + + logWeighedThrottlingSetting(); } /* Varation of Integer.getInteger for real number properties */ @@ -212,6 +295,22 @@ public class BlueThrottle { return maxTokens - tokens; } + public int getRequiredTokensForGlobal() { + return BlueThrottle.DEFAULT_GLOBAL_SESSION_WEIGHT; + } + + public int getRequiredTokensForLocal() { + return BlueThrottle.DEFAULT_LOCAL_SESSION_WEIGHT; + } + + public int getRequiredTokensForRenew() { + return BlueThrottle.DEFAULT_RENEW_SESSION_WEIGHT; + } + + public boolean isConnectionWeightEnabled() { + return BlueThrottle.connectionWeightEnabled; + } + public synchronized boolean checkLimit(int need) { // A maxTokens setting of zero disables throttling if (maxTokens == 0) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java index 8a3bb1e..9cf4774 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java @@ -137,4 +137,5 @@ public interface SessionTracker { */ long getLocalSessionCount(); + boolean isLocalSessionsEnabled(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java index 07b3fae..755512e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java +++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java @@ -342,4 +342,8 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements Sessi return 0; } + @Override + public boolean isLocalSessionsEnabled() { + return false; + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 01748e6..95aaed3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -148,6 +148,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected String initialConfig; private final RequestPathMetricsCollector requestPathMetricsCollector; + private boolean localSessionEnabled = false; protected enum State { INITIAL, RUNNING, @@ -598,7 +599,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { registerMetrics(); setState(State.RUNNING); + requestPathMetricsCollector.start(); + + localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); notifyAll(); } @@ -1212,12 +1216,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return connThrottle.getDropChance(); } - public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException, ClientCnxnLimitException { - - if (!connThrottle.checkLimit(1)) { - throw new ClientCnxnLimitException(); - } - ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); + @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup") + public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) + throws IOException, ClientCnxnLimitException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest 
connReq = new ConnectRequest(); @@ -1226,7 +1227,27 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress() + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen())); } + long sessionId = connReq.getSessionId(); + int tokensNeeded = 1; + if (connThrottle.isConnectionWeightEnabled()) { + if (sessionId == 0) { + if (localSessionEnabled) { + tokensNeeded = connThrottle.getRequiredTokensForLocal(); + } else { + tokensNeeded = connThrottle.getRequiredTokensForGlobal(); + } + } else { + tokensNeeded = connThrottle.getRequiredTokensForRenew(); + } + } + + if (!connThrottle.checkLimit(tokensNeeded)) { + throw new ClientCnxnLimitException(); + } + ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); + ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); + boolean readOnly = false; try { readOnly = bia.readBool("readOnly"); @@ -1269,7 +1290,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); - long sessionId = connReq.getSessionId(); if (sessionId == 0) { long id = createSession(cnxn, passwd, sessionTimeout); if (LOG.isDebugEnabled()) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java index f4eb92c..5ab732f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java @@ -38,7 +38,6 @@ public class LeaderSessionTracker extends UpgradeableSessionTracker { private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class); - private final boolean localSessionsEnabled; 
private final SessionTrackerImpl globalSessionTracker; /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java index 9edb4f2..bc25e5d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java @@ -55,6 +55,11 @@ public abstract class UpgradeableSessionTracker implements SessionTracker { return localSessionTracker != null && localSessionTracker.isTrackingSession(sessionId); } + @Override + public boolean isLocalSessionsEnabled() { + return localSessionsEnabled; + } + public boolean isUpgradingSession(long sessionId) { return upgradingSessions != null && upgradingSessions.containsKey(sessionId); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java index c3d10bb..8b64c2b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java @@ -22,7 +22,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.Random; +import java.util.concurrent.TimeoutException; import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +35,7 @@ import org.slf4j.LoggerFactory; public class BlueThrottleTest extends ZKTestCase { private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class); + private static 
final int RAPID_TIMEOUT = 10000; class MockRandom extends Random { @@ -162,4 +168,132 @@ public class BlueThrottleTest extends ZKTestCase { assertTrue("Later requests should have a chance", accepted > 0); } + private QuorumUtil quorumUtil = new QuorumUtil(1); + private ClientBase.CountdownWatcher[] watchers; + private ZooKeeper[] zks; + + private int connect(int n) throws Exception { + String connStr = quorumUtil.getConnectionStringForServer(1); + int connected = 0; + + zks = new ZooKeeper[n]; + watchers = new ClientBase.CountdownWatcher[n]; + for (int i = 0; i < n; i++){ + watchers[i] = new ClientBase.CountdownWatcher(); + zks[i] = new ZooKeeper(connStr, 3000, watchers[i]); + try { + watchers[i].waitForConnected(RAPID_TIMEOUT); + connected++; + } catch (TimeoutException e) { + LOG.info("Connection denied by the throttler due to insufficient tokens"); + break; + } + } + + return connected; + } + + private void shutdownQuorum() throws Exception{ + for (ZooKeeper zk : zks) { + if (zk != null) { + zk.close(); + } + } + + quorumUtil.shutdownAll(); + } + + @Test + public void testNoThrottling() throws Exception { + quorumUtil.startAll(); + + //disable throttling + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(0); + + int connected = connect(10); + + Assert.assertEquals(10, connected); + shutdownQuorum(); + } + + @Test + public void testThrottling() throws Exception { + quorumUtil.enableLocalSession(true); + quorumUtil.startAll(); + + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2); + //no refill, makes testing easier + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0); + + + int connected = connect(3); + Assert.assertEquals(2, connected); + shutdownQuorum(); + + quorumUtil.enableLocalSession(false); + quorumUtil.startAll(); + + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2); + //no refill, makes testing easier + 
quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0); + + + connected = connect(3); + Assert.assertEquals(2, connected); + shutdownQuorum(); + } + + @Test + public void testWeighedThrottling() throws Exception { + // this test depends on the session weights set to the default values + // 3 for global session, 2 for renew sessions, 1 for local sessions + BlueThrottle.setConnectionWeightEnabled(true); + + quorumUtil.enableLocalSession(true); + quorumUtil.startAll(); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0); + + //try to create 11 local sessions, 10 created, because we have only 10 tokens + int connected = connect(11); + Assert.assertEquals(10, connected); + shutdownQuorum(); + + quorumUtil.enableLocalSession(false); + quorumUtil.startAll(); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0); + //try to create 11 global sessions, 3 created, because we have 10 tokens and each connection needs 3 + connected = connect(11); + Assert.assertEquals(3, connected); + shutdownQuorum(); + + quorumUtil.startAll(); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0); + connected = connect(2); + Assert.assertEquals(2, connected); + + quorumUtil.shutdown(1); + watchers[0].waitForDisconnected(RAPID_TIMEOUT); + watchers[1].waitForDisconnected(RAPID_TIMEOUT); + + quorumUtil.restart(1); + //client will try to reconnect + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(3); + quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0); + int reconnected = 0; + for (int i = 0; i < 2; i++){ + try { + watchers[i].waitForConnected(RAPID_TIMEOUT); + reconnected++; + } catch (TimeoutException e) { + 
LOG.info("One reconnect fails due to insufficient tokens"); + } + } + //each reconnect takes two tokens, we have 3, so only one reconnects + LOG.info("reconnected {}", reconnected); + Assert.assertEquals(1, reconnected); + shutdownQuorum(); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java index 8aacaac..9724423 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java @@ -278,6 +278,10 @@ public class PrepRequestProcessorTest extends ClientBase { return 0; } + @Override + public boolean isLocalSessionsEnabled() { + return false; + } } }