Repository: zookeeper Updated Branches: refs/heads/master b0df8fe1e -> cd209456b
ZOOKEEPER-2926: Fix potential data consistency issue due to the session management bug Author: Fangmin Lyu <allen...@fb.com> Reviewers: Michael Han <h...@apache.org>, Andor Molnar <an...@cloudera.com> Closes #447 from lvfangmin/ZOOKEEPER-2926 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/cd209456 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/cd209456 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/cd209456 Branch: refs/heads/master Commit: cd209456b67cde5aba771b1a240ebe5607398459 Parents: b0df8fe Author: Fangmin Lyu <allen...@fb.com> Authored: Tue Aug 7 21:21:37 2018 -0700 Committer: Michael Han <h...@apache.org> Committed: Tue Aug 7 21:21:37 2018 -0700 ---------------------------------------------------------------------- .../zookeeper/server/PrepRequestProcessor.java | 71 ++--- .../org/apache/zookeeper/server/ServerCnxn.java | 2 +- .../apache/zookeeper/server/SessionTracker.java | 11 +- .../zookeeper/server/SessionTrackerImpl.java | 17 +- .../zookeeper/server/ZooKeeperServer.java | 9 +- .../server/quorum/LeaderSessionTracker.java | 54 ++-- .../server/quorum/LearnerSessionTracker.java | 54 ++-- .../server/quorum/LocalSessionTracker.java | 6 +- .../quorum/UpgradeableSessionTracker.java | 31 +- .../apache/zookeeper/server/MockServerCnxn.java | 4 +- .../server/PrepRequestProcessorTest.java | 8 +- .../zookeeper/server/SessionTrackerTest.java | 6 +- .../server/quorum/SessionUpgradeQuorumTest.java | 298 +++++++++++++++++++ .../org/apache/zookeeper/test/ClientBase.java | 10 +- .../org/apache/zookeeper/test/QuorumBase.java | 60 ++-- .../zookeeper/test/SessionTrackerCheckTest.java | 15 +- 16 files changed, 497 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java 
---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java index b70ad18..eebc86b 100644 --- a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -436,7 +436,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements } zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); - ReconfigRequest reconfigRequest = (ReconfigRequest)record; + ReconfigRequest reconfigRequest = (ReconfigRequest)record; LeaderZooKeeperServer lzks; try { lzks = (LeaderZooKeeperServer)zks; @@ -444,13 +444,13 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements // standalone mode - reconfiguration currently not supported throw new KeeperException.UnimplementedException(); } - QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier(); + QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier(); // check that there's no reconfig in progress if (lastSeenQV.getVersion()!=lzks.self.getQuorumVerifier().getVersion()) { - throw new KeeperException.ReconfigInProgress(); + throw new KeeperException.ReconfigInProgress(); } long configId = reconfigRequest.getCurConfigId(); - + if (configId != -1 && configId!=lzks.self.getLastSeenQuorumVerifier().getVersion()){ String msg = "Reconfiguration from version " + configId + " failed -- last seen version is " + lzks.self.getLastSeenQuorumVerifier().getVersion(); @@ -458,54 +458,54 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements } String newMembers = reconfigRequest.getNewMembers(); - - if (newMembers != null) { //non-incremental membership change + + if (newMembers != null) { //non-incremental membership change LOG.info("Non-incremental reconfig"); - + // Input may be delimited by either commas or 
newlines so convert to common newline separated format newMembers = newMembers.replaceAll(",", "\n"); - + try{ - Properties props = new Properties(); + Properties props = new Properties(); props.load(new StringReader(newMembers)); request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false); request.qv.setVersion(request.getHdr().getZxid()); } catch (IOException | ConfigException e) { throw new KeeperException.BadArgumentsException(e.getMessage()); } - } else { //incremental change - must be a majority quorum system + } else { //incremental change - must be a majority quorum system LOG.info("Incremental reconfig"); - - List<String> joiningServers = null; + + List<String> joiningServers = null; String joiningServersString = reconfigRequest.getJoiningServers(); if (joiningServersString != null) { joiningServers = StringUtils.split(joiningServersString,","); } - + List<String> leavingServers = null; String leavingServersString = reconfigRequest.getLeavingServers(); if (leavingServersString != null) { leavingServers = StringUtils.split(leavingServersString, ","); } - + if (!(lastSeenQV instanceof QuorumMaj)) { String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system"; LOG.warn(msg); - throw new KeeperException.BadArgumentsException(msg); + throw new KeeperException.BadArgumentsException(msg); } Map<Long, QuorumServer> nextServers = new HashMap<Long, QuorumServer>(lastSeenQV.getAllMembers()); - try { + try { if (leavingServers != null) { for (String leaving: leavingServers){ long sid = Long.parseLong(leaving); nextServers.remove(sid); - } + } } if (joiningServers != null) { for (String joiner: joiningServers){ - // joiner should have the following format: server.x = server_spec;client_spec + // joiner should have the following format: server.x = server_spec;client_spec String[] parts = StringUtils.split(joiner, "=").toArray(new String[0]); if (parts.length != 2) { throw new 
KeeperException.BadArgumentsException("Wrong format of server string"); @@ -514,7 +514,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1)); QuorumServer qs = new QuorumServer(sid, parts[1]); if (qs.clientAddr == null || qs.electionAddr == null || qs.addr == null) { - throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have 3 ports specified"); + throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have 3 ports specified"); } // check duplication of addresses and ports @@ -527,7 +527,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements nextServers.remove(qs.id); nextServers.put(qs.id, qs); - } + } } } catch (ConfigException e){ throw new KeeperException.BadArgumentsException("Reconfiguration failed"); @@ -543,21 +543,21 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements String msg = "Reconfig failed - new configuration must include at least 1 follower"; LOG.warn(msg); throw new KeeperException.BadArgumentsException(msg); - } - + } + if (!lzks.getLeader().isQuorumSynced(request.qv)) { String msg2 = "Reconfig failed - there must be a connected and synced quorum in new configuration"; - LOG.warn(msg2); + LOG.warn(msg2); throw new KeeperException.NewConfigNoQuorum(); } - - nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE); + + nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE); checkACL(zks, request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, null, null); - request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1)); + request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1)); nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid()); - nodeRecord.stat.setVersion(-1); + nodeRecord.stat.setVersion(-1); addChangeRecord(nodeRecord); - 
break; + break; case OpCode.setACL: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetACLRequest setAclRequest = (SetACLRequest)record; @@ -579,13 +579,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements int to = request.request.getInt(); request.setTxn(new CreateSessionTxn(to)); request.request.rewind(); - if (request.isLocalSession()) { - // This will add to local session tracker if it is enabled - zks.sessionTracker.addSession(request.sessionId, to); - } else { - // Explicitly add to global session if the flag is not set - zks.sessionTracker.addGlobalSession(request.sessionId, to); - } + // only add the global session tracker but not to ZKDb + zks.sessionTracker.trackSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner()); break; case OpCode.closeSession: @@ -759,7 +754,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); break; case OpCode.setData: - SetDataRequest setDataRequest = new SetDataRequest(); + SetDataRequest setDataRequest = new SetDataRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); break; case OpCode.reconfig: @@ -768,11 +763,11 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true); break; case OpCode.setACL: - SetACLRequest setAclRequest = new SetACLRequest(); + SetACLRequest setAclRequest = new SetACLRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true); break; case OpCode.check: - CheckVersionRequest checkRequest = new CheckVersionRequest(); + CheckVersionRequest checkRequest = new CheckVersionRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true); break; case OpCode.multi: 
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/ServerCnxn.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java index 35d6c55..cd43ee2 100644 --- a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java @@ -92,7 +92,7 @@ public abstract class ServerCnxn implements Stats, Watcher { } /* notify the client the session is closing and close/cleanup socket */ - abstract void sendCloseSession(); + public abstract void sendCloseSession(); public abstract void process(WatchedEvent event); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/SessionTracker.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/SessionTracker.java b/src/java/main/org/apache/zookeeper/server/SessionTracker.java index bbf7df3..9ff7a7f 100644 --- a/src/java/main/org/apache/zookeeper/server/SessionTracker.java +++ b/src/java/main/org/apache/zookeeper/server/SessionTracker.java @@ -47,21 +47,20 @@ public interface SessionTracker { long createSession(int sessionTimeout); /** - * Add a global session to those being tracked. + * Track the session expire, not add to ZkDb. * @param id sessionId * @param to sessionTimeout - * @return whether the session was newly added (if false, already existed) + * @return whether the session was newly tracked (if false, already tracked) */ - boolean addGlobalSession(long id, int to); + boolean trackSession(long id, int to); /** - * Add a session to those being tracked. The session is added as a local - * session if they are enabled, otherwise as global. + * Add the session to the local session map or global one in zkDB. 
* @param id sessionId * @param to sessionTimeout * @return whether the session was newly added (if false, already existed) */ - boolean addSession(long id, int to); + boolean commitSession(long id, int to); /** * @param sessionId http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java b/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java index 6fc6eb5..e040493 100644 --- a/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java +++ b/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java @@ -102,7 +102,7 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements this.sessionsWithTimeout = sessionsWithTimeout; this.nextSessionId.set(initializeNextSession(serverId)); for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) { - addSession(e.getKey(), e.getValue()); + trackSession(e.getKey(), e.getValue()); } EphemeralType.validateServerId(serverId); @@ -245,17 +245,12 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements public long createSession(int sessionTimeout) { long sessionId = nextSessionId.getAndIncrement(); - addSession(sessionId, sessionTimeout); + trackSession(sessionId, sessionTimeout); return sessionId; } - public boolean addGlobalSession(long id, int sessionTimeout) { - return addSession(id, sessionTimeout); - } - - public synchronized boolean addSession(long id, int sessionTimeout) { - sessionsWithTimeout.put(id, sessionTimeout); - + @Override + public synchronized boolean trackSession(long id, int sessionTimeout) { boolean added = false; SessionImpl session = sessionsById.get(id); @@ -285,6 +280,10 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements return added; } + public synchronized boolean commitSession(long id, int 
sessionTimeout) { + return sessionsWithTimeout.put(id, sessionTimeout) == null; + } + public boolean isTrackingSession(long sessionId) { return sessionsById.containsKey(sessionId); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 3d98f8e..64f242f 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1217,13 +1217,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; - sessionTracker.addGlobalSession(sessionId, cst.getTimeOut()); - } else if (request != null && request.isLocalSession()) { - request.request.rewind(); - int timeout = request.request.getInt(); - request.request.rewind(); - sessionTracker.addSession(request.sessionId, timeout); - } else { + sessionTracker.commitSession(sessionId, cst.getTimeOut()); + } else if (request == null || !request.isLocalSession()) { LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString()); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java b/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java index 38bbfe8..e79207b 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java @@ -85,31 +85,47 @@ 
public class LeaderSessionTracker extends UpgradeableSessionTracker { return globalSessionTracker.isTrackingSession(sessionId); } - public boolean addGlobalSession(long sessionId, int sessionTimeout) { - boolean added = - globalSessionTracker.addSession(sessionId, sessionTimeout); - if (localSessionsEnabled && added) { + public boolean trackSession(long sessionId, int sessionTimeout) { + boolean tracked = + globalSessionTracker.trackSession(sessionId, sessionTimeout); + if (localSessionsEnabled && tracked) { // Only do extra logging so we know what kind of session this is // if we're supporting both kinds of sessions - LOG.info("Adding global session 0x" + Long.toHexString(sessionId)); + LOG.info("Tracking global session 0x" + Long.toHexString(sessionId)); } - return added; + return tracked; } - public boolean addSession(long sessionId, int sessionTimeout) { - boolean added; - if (localSessionsEnabled && !isGlobalSession(sessionId)) { - added = localSessionTracker.addSession(sessionId, sessionTimeout); - // Check for race condition with session upgrading - if (isGlobalSession(sessionId)) { - added = false; - localSessionTracker.removeSession(sessionId); - } else if (added) { - LOG.info("Adding local session 0x" + Long.toHexString(sessionId)); - } - } else { - added = addGlobalSession(sessionId, sessionTimeout); + /** + * Synchronized on this to avoid race condition of adding a local session + * after committed global session, which may cause the same session being + * tracked on this server and leader. + */ + public synchronized boolean commitSession( + long sessionId, int sessionTimeout) { + boolean added = + globalSessionTracker.commitSession(sessionId, sessionTimeout); + + if (added) { + LOG.info("Committing global session 0x" + Long.toHexString(sessionId)); + } + + // If the session moved before the session upgrade finished, it's + // possible that the session will be added to the local session + // again. 
Need to double check and remove it from local session + // tracker when the global session is quorum committed, otherwise the + // local session might be tracked both locally and on leader. + // + // This cannot totally avoid the local session being upgraded again + // because there is still race condition between create another upgrade + // request and process the createSession commit, and there is no way + // to know there is a on flying createSession request because it might + // be upgraded by other server which owns the session before move. + if (localSessionsEnabled) { + removeLocalSession(sessionId); + finishedUpgrading(sessionId); } + return added; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java index 1cc2ab1..5420494 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java @@ -58,7 +58,6 @@ public class LearnerSessionTracker extends UpgradeableSessionTracker { private final long serverId; private final AtomicLong nextSessionId = new AtomicLong(); - private final boolean localSessionsEnabled; private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts; public LearnerSessionTracker(SessionExpirer expirer, @@ -101,33 +100,44 @@ public class LearnerSessionTracker extends UpgradeableSessionTracker { return globalSessionsWithTimeouts.containsKey(sessionId); } - public boolean addGlobalSession(long sessionId, int sessionTimeout) { + public boolean trackSession(long sessionId, int sessionTimeout) { + // Learner doesn't track global session, do nothing here + return false; + } + + /** + * Synchronized on this to avoid 
race condition of adding a local session + * after committed global session, which may cause the same session being + * tracked on this server and leader. + */ + public synchronized boolean commitSession( + long sessionId, int sessionTimeout) { boolean added = globalSessionsWithTimeouts.put(sessionId, sessionTimeout) == null; - if (localSessionsEnabled && added) { + + if (added) { // Only do extra logging so we know what kind of session this is // if we're supporting both kinds of sessions - LOG.info("Adding global session 0x" + Long.toHexString(sessionId)); + LOG.info("Committing global session 0x" + Long.toHexString(sessionId)); } - touchTable.get().put(sessionId, sessionTimeout); - return added; - } - public boolean addSession(long sessionId, int sessionTimeout) { - boolean added; - if (localSessionsEnabled && !isGlobalSession(sessionId)) { - added = localSessionTracker.addSession(sessionId, sessionTimeout); - // Check for race condition with session upgrading - if (isGlobalSession(sessionId)) { - added = false; - localSessionTracker.removeSession(sessionId); - } else if (added) { - LOG.info("Adding local session 0x" - + Long.toHexString(sessionId)); - } - } else { - added = addGlobalSession(sessionId, sessionTimeout); + // If the session moved before the session upgrade finished, it's + // possible that the session will be added to the local session + // again. Need to double check and remove it from local session + // tracker when the global session is quorum committed, otherwise the + // local session might be tracked both locally and on leader. + // + // This cannot totally avoid the local session being upgraded again + // because there is still race condition between create another upgrade + // request and process the createSession commit, and there is no way + // to know there is a on flying createSession request because it might + // be upgraded by other server which owns the session before move. 
+ if (localSessionsEnabled) { + removeLocalSession(sessionId); + finishedUpgrading(sessionId); } + + touchTable.get().put(sessionId, sessionTimeout); return added; } @@ -136,7 +146,7 @@ public class LearnerSessionTracker extends UpgradeableSessionTracker { if (localSessionTracker.touchSession(sessionId, sessionTimeout)) { return true; } - if (!isGlobalSession(sessionId)) { + if (!isGlobalSession(sessionId) && !isUpgradingSession(sessionId)) { return false; } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java b/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java index df6ccb2..523c440 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java @@ -40,7 +40,9 @@ public class LocalSessionTracker extends SessionTrackerImpl { return false; } - public boolean addGlobalSession(long sessionId, int sessionTimeout) { - throw new UnsupportedOperationException(); + public long createSession(int sessionTimeout) { + long sessionId = super.createSession(sessionTimeout); + commitSession(sessionId, sessionTimeout); + return sessionId; } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java b/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java index 2e58ff5..eb50a07 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java +++ 
b/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java @@ -33,7 +33,9 @@ public abstract class UpgradeableSessionTracker implements SessionTracker { private static final Logger LOG = LoggerFactory.getLogger(UpgradeableSessionTracker.class); private ConcurrentMap<Long, Integer> localSessionsWithTimeouts; + private ConcurrentMap<Long, Integer> upgradingSessions; protected LocalSessionTracker localSessionTracker; + protected boolean localSessionsEnabled; public void start() {} @@ -43,6 +45,7 @@ public abstract class UpgradeableSessionTracker implements SessionTracker { new ConcurrentHashMap<Long, Integer>(); this.localSessionTracker = new LocalSessionTracker( expirer, this.localSessionsWithTimeouts, tickTime, id, listener); + this.upgradingSessions = new ConcurrentHashMap<Long, Integer>(); } public boolean isTrackingSession(long sessionId) { @@ -54,6 +57,17 @@ public abstract class UpgradeableSessionTracker implements SessionTracker { localSessionTracker.isTrackingSession(sessionId); } + public boolean isUpgradingSession(long sessionId) { + return upgradingSessions != null && + upgradingSessions.containsKey(sessionId); + } + + public void finishedUpgrading(long sessionId) { + if (upgradingSessions != null) { + upgradingSessions.remove(sessionId); + } + } + abstract public boolean isGlobalSession(long sessionId); /** @@ -74,14 +88,27 @@ public abstract class UpgradeableSessionTracker implements SessionTracker { Integer timeout = localSessionsWithTimeouts.remove(sessionId); if (timeout != null) { LOG.info("Upgrading session 0x" + Long.toHexString(sessionId)); - // Add as global before removing as local - addGlobalSession(sessionId, timeout); + // Track global session, which will add to global session tracker + // on leader and do nothing on learner. Need to start track global + // session in leader now to update the session expire between + // LeaderRequestProcessor and PrepRequestProcessor. 
+ trackSession(sessionId, timeout); + // Track ongoing upgrading sessions, learner will use it to find + // other sessions it has which are not in local and global sessions + upgradingSessions.put(sessionId, timeout); localSessionTracker.removeSession(sessionId); return timeout; } return -1; } + protected void removeLocalSession(long sessionId) { + if (localSessionTracker == null) { + return; + } + localSessionTracker.removeSession(sessionId); + } + public void checkGlobalSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException { http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java b/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java index 2e37272..7ae0004 100644 --- a/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java +++ b/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java @@ -44,7 +44,7 @@ public class MockServerCnxn extends ServerCnxn { } @Override - void sendCloseSession() { + public void sendCloseSession() { } @Override @@ -110,4 +110,4 @@ public class MockServerCnxn extends ServerCnxn { public int getInterestOps() { return 0; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java index 8223583..606994c 100644 --- a/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java +++ b/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java @@ -208,18 +208,18 @@ public class 
PrepRequestProcessorTest extends ClientBase { @Override public void shutdown() { // TODO Auto-generated method stub - + } } - + private class MySessionTracker implements SessionTracker { @Override - public boolean addGlobalSession(long id, int to) { + public boolean trackSession(long id, int to) { // TODO Auto-generated method stub return false; } @Override - public boolean addSession(long id, int to) { + public boolean commitSession(long id, int to) { // TODO Auto-generated method stub return false; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java b/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java index 00e34fa..abe9aa0 100644 --- a/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java +++ b/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java @@ -51,7 +51,7 @@ public class SessionTrackerTest extends ZKTestCase { ZooKeeperServer zks = setupSessionTracker(); latch = new CountDownLatch(1); - zks.sessionTracker.addSession(sessionId, sessionTimeout); + zks.sessionTracker.trackSession(sessionId, sessionTimeout); SessionTrackerImpl sessionTrackerImpl = (SessionTrackerImpl) zks.sessionTracker; SessionImpl sessionImpl = sessionTrackerImpl.sessionsById .get(sessionId); @@ -68,7 +68,7 @@ public class SessionTrackerTest extends ZKTestCase { // Simulating FinalRequestProcessor logic: create session request has // delayed and now reaches FinalRequestProcessor. 
Here the leader zk // will do sessionTracker.addSession(id, timeout) - sessionTrackerImpl.addSession(sessionId, sessionTimeout); + sessionTrackerImpl.trackSession(sessionId, sessionTimeout); try { sessionTrackerImpl.checkSession(sessionId, sessionOwner); Assert.fail("Should throw session expiry exception " @@ -93,7 +93,7 @@ public class SessionTrackerTest extends ZKTestCase { ZooKeeperServer zks = setupSessionTracker(); latch = new CountDownLatch(1); - zks.sessionTracker.addSession(sessionId, sessionTimeout); + zks.sessionTracker.trackSession(sessionId, sessionTimeout); SessionTrackerImpl sessionTrackerImpl = (SessionTrackerImpl) zks.sessionTracker; SessionImpl sessionImpl = sessionTrackerImpl.sessionsById .get(sessionId); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java new file mode 100644 index 0000000..8e345fe --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import javax.security.sasl.SaslException; + +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.Request; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.zookeeper.test.QuorumBase; +import org.apache.zookeeper.test.DisconnectableZooKeeper; + +public class SessionUpgradeQuorumTest extends QuorumPeerTestBase { + protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeQuorumTest.class); + public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + public static final int SERVER_COUNT = 3; + private MainThread mt[]; + private int clientPorts[]; + private TestQPMainDropSessionUpgrading qpMain[]; + + @Before + public void setUp() throws Exception { + LOG.info("STARTING quorum " + getClass().getName()); + // setup the env with RetainDB and local session upgrading + ClientBase.setupTestEnv(); + + mt = new MainThread[SERVER_COUNT]; + 
clientPorts = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + sb.append("server.").append(i).append("=127.0.0.1:") + .append(PortAssignment.unique()).append(":") + .append(PortAssignment.unique()).append("\n"); + } + sb.append("localSessionsEnabled=true\n"); + sb.append("localSessionsUpgradingEnabled=true\n"); + String cfg = sb.toString(); + + // create a 3 server ensemble + qpMain = new TestQPMainDropSessionUpgrading[SERVER_COUNT]; + for (int i = 0; i < SERVER_COUNT; i++) { + final TestQPMainDropSessionUpgrading qp = new TestQPMainDropSessionUpgrading(); + qpMain[i] = qp; + mt[i] = new MainThread(i, clientPorts[i], cfg, false) { + @Override + public TestQPMain getTestQPMain() { + return qp; + } + }; + mt[i].start(); + } + + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], + CONNECTION_TIMEOUT)); + } + } + + @After + public void tearDown() throws Exception { + LOG.info("STOPPING quorum " + getClass().getName()); + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } + + @Test + public void testLocalSessionUpgradeSnapshot() throws IOException, InterruptedException { + // select the candidate of follower + int leader = -1; + int followerA = -1; + for (int i = SERVER_COUNT - 1; i >= 0; i--) { + if (mt[i].main.quorumPeer.leader != null) { + leader = i; + } else if (followerA == -1) { + followerA = i; + } + } + + LOG.info("follower A is {}", followerA); + qpMain[followerA].setDropCreateSession(true); + + // create a client, and create an ephemeral node to trigger the + // upgrading process + final String node = "/node-1"; + ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], + ClientBase.CONNECTION_TIMEOUT, this); + + waitForOne(zk, States.CONNECTED); + + // clone the session id and passwd for later usage + long sessionId = 
zk.getSessionId(); + + // should fail because of the injection + try { + zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + Assert.fail("expect to failed to upgrade session due to the " + + "TestQPMainDropSessionUpgrading is being used"); + } catch (KeeperException e) { + LOG.info("KeeperException when create ephemeral node, {}", e); + } + + // force to take snapshot + qpMain[followerA].quorumPeer.follower.zk.takeSnapshot(true); + + // wait snapshot finish + Thread.sleep(500); + + // shutdown all servers + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + + ArrayList<States> waitStates =new ArrayList<States>(); + waitStates.add(States.CONNECTING); + waitStates.add(States.CLOSED); + waitForOne(zk, waitStates); + + // start the servers again, start follower A last as we want to + // keep it running as follower + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].start(); + } + + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], + CONNECTION_TIMEOUT)); + } + + // check global session not exist on follower A + for (int i = 0; i < SERVER_COUNT; i++) { + ConcurrentHashMap<Long, Integer> sessions = + mt[i].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); + Assert.assertFalse("server " + i + " should not have global " + + "session " + sessionId, sessions.containsKey(sessionId)); + } + + zk.close(); + } + + @Test + public void testOnlyUpgradeSessionOnce() + throws IOException, InterruptedException, KeeperException { + // create a client, and create an ephemeral node to trigger the + // upgrading process + final String node = "/node-1"; + ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[0], + ClientBase.CONNECTION_TIMEOUT, this); + + waitForOne(zk, States.CONNECTED); + long sessionId = zk.getSessionId(); + + QuorumZooKeeperServer server = + (QuorumZooKeeperServer) mt[0].main.quorumPeer.getActiveServer(); + 
Request create1 = createEphemeralRequest("/data-1", sessionId); + Request create2 = createEphemeralRequest("/data-2", sessionId); + + Assert.assertNotNull("failed to upgrade on a ephemeral create", + server.checkUpgradeSession(create1)); + Assert.assertNull("tried to upgrade again", server.checkUpgradeSession(create2)); + + // clean all the setups and close the zk + zk.close(); + } + + private static class TestQPMainDropSessionUpgrading extends TestQPMain { + + private volatile boolean shouldDrop = false; + + public void setDropCreateSession(boolean dropCreateSession) { + shouldDrop = dropCreateSession; + } + + @Override + protected QuorumPeer getQuorumPeer() throws SaslException { + return new QuorumPeer() { + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) + throws IOException { + + return new Follower(this, new FollowerZooKeeperServer( + logFactory, this, this.getZkDb())) { + + @Override + protected void request(Request request) + throws IOException { + if (!shouldDrop) { + super.request(request); + return; + } + LOG.info("request is {}, cnxn {}", request.type, request.cnxn); + + if (request.type == ZooDefs.OpCode.createSession) { + LOG.info("drop createSession request {}", request); + return; + } + + if (request.type == ZooDefs.OpCode.create && + request.cnxn != null) { + CreateRequest createRequest = new CreateRequest(); + request.request.rewind(); + ByteBufferInputStream.byteBuffer2Record( + request.request, createRequest); + request.request.rewind(); + try { + CreateMode createMode = + CreateMode.fromFlag(createRequest.getFlags()); + if (createMode.isEphemeral()) { + request.cnxn.sendCloseSession(); + } + } catch (KeeperException e) {} + return; + } + + super.request(request); + } + }; + } + }; + } + } + + private void waitForOne(ZooKeeper zk, States state) throws InterruptedException { + ArrayList<States> states = new ArrayList<States>(); + states.add(state); + waitForOne(zk, states); + } + + private void waitForOne(ZooKeeper zk, 
ArrayList<States> states) throws InterruptedException { + int iterations = ClientBase.CONNECTION_TIMEOUT / 500; + while (!states.contains(zk.getState())) { + if (iterations-- == 0) { + LOG.info("state is {}", zk.getState()); + throw new RuntimeException("Waiting too long"); + } + Thread.sleep(500); + } + } + + private Request createEphemeralRequest(String path, long sessionId) throws IOException { + ByteArrayOutputStream boas = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); + CreateRequest createRequest = new CreateRequest(path, + "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL.toFlag()); + createRequest.serialize(boa, "request"); + ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, + new ArrayList<Id>()); + } + +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/test/ClientBase.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/test/ClientBase.java b/src/java/test/org/apache/zookeeper/test/ClientBase.java index 7d2ec56..a550098 100644 --- a/src/java/test/org/apache/zookeeper/test/ClientBase.java +++ b/src/java/test/org/apache/zookeeper/test/ClientBase.java @@ -432,9 +432,9 @@ public abstract class ClientBase extends ZKTestCase { * Because any exception on starting the server would leave the server * running and the caller would not be able to shutdown the instance. This * may affect other test cases. - * + * * @return newly created server instance - * + * * @see <a * href="https://issues.apache.org/jira/browse/ZOOKEEPER-1852">ZOOKEEPER-1852</a> * for more information. 
@@ -508,7 +508,7 @@ public abstract class ClientBase extends ZKTestCase { */ OSMXBean osMbean = new OSMXBean(); if (osMbean.getUnix() == true) { - initialFdCount = osMbean.getOpenFileDescriptorCount(); + initialFdCount = osMbean.getOpenFileDescriptorCount(); LOG.info("Initial fdcount is: " + initialFdCount); } @@ -568,7 +568,7 @@ public abstract class ClientBase extends ZKTestCase { /** * Returns a string representation of the given long value session id - * + * * @param sessionId * long value of session id * @return string representation of session id @@ -630,7 +630,7 @@ public abstract class ClientBase extends ZKTestCase { */ OSMXBean osMbean = new OSMXBean(); if (osMbean.getUnix() == true) { - long fdCount = osMbean.getOpenFileDescriptorCount(); + long fdCount = osMbean.getOpenFileDescriptorCount(); String message = "fdcount after test is: " + fdCount + " at start it was " + initialFdCount; LOG.info(message); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/test/QuorumBase.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/test/QuorumBase.java b/src/java/test/org/apache/zookeeper/test/QuorumBase.java index fb2bb87..fcaa9b6 100644 --- a/src/java/test/org/apache/zookeeper/test/QuorumBase.java +++ b/src/java/test/org/apache/zookeeper/test/QuorumBase.java @@ -53,13 +53,13 @@ public class QuorumBase extends ClientBase { protected int port3; protected int port4; protected int port5; - + protected int portLE1; protected int portLE2; protected int portLE3; protected int portLE4; protected int portLE5; - + protected int portClient1; protected int portClient2; protected int portClient3; @@ -73,12 +73,12 @@ public class QuorumBase extends ClientBase { // This just avoids complaints by junit public void testNull() { } - + @Override public void setUp() throws Exception { setUp(false); } - + protected void setUp(boolean withObservers) throws Exception { 
LOG.info("QuorumBase.setup " + getTestName()); setupTestEnv(); @@ -92,19 +92,19 @@ public class QuorumBase extends ClientBase { port3 = PortAssignment.unique(); port4 = PortAssignment.unique(); port5 = PortAssignment.unique(); - + portLE1 = PortAssignment.unique(); portLE2 = PortAssignment.unique(); portLE3 = PortAssignment.unique(); portLE4 = PortAssignment.unique(); portLE5 = PortAssignment.unique(); - + portClient1 = PortAssignment.unique(); portClient2 = PortAssignment.unique(); portClient3 = PortAssignment.unique(); portClient4 = PortAssignment.unique(); portClient5 = PortAssignment.unique(); - + hostPort = "127.0.0.1:" + portClient1 + ",127.0.0.1:" + portClient2 + ",127.0.0.1:" + portClient3 @@ -128,44 +128,44 @@ public class QuorumBase extends ClientBase { LOG.info("Setup finished"); } - + void startServers() throws Exception { - startServers(false); + startServers(false); } - + void startServers(boolean withObservers) throws Exception { int tickTime = 2000; int initLimit = 3; int syncLimit = 3; Map<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(); - peers.put(Long.valueOf(1), new QuorumServer(1, + peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress(LOCALADDR, port1), new InetSocketAddress(LOCALADDR, portLE1), new InetSocketAddress(LOCALADDR, portClient1), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(2), new QuorumServer(2, + peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress(LOCALADDR, port2), new InetSocketAddress(LOCALADDR, portLE2), new InetSocketAddress(LOCALADDR, portClient2), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(3), new QuorumServer(3, + peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress(LOCALADDR, port3), new InetSocketAddress(LOCALADDR, portLE3), new InetSocketAddress(LOCALADDR, portClient3), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(4), new QuorumServer(4, + peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress(LOCALADDR, port4), new 
InetSocketAddress(LOCALADDR, portLE4), new InetSocketAddress(LOCALADDR, portClient4), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(5), new QuorumServer(5, + peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress(LOCALADDR, port5), new InetSocketAddress(LOCALADDR, portLE5), new InetSocketAddress(LOCALADDR, portClient5), LearnerType.PARTICIPANT)); - + if (withObservers) { - peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER; + peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER; peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER; } @@ -184,18 +184,18 @@ public class QuorumBase extends ClientBase { LOG.info("creating QuorumPeer 5 port " + portClient5); s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit); Assert.assertEquals(portClient5, s5.getClientPort()); - + if (withObservers) { s4.setLearnerType(LearnerType.OBSERVER); s5.setLearnerType(LearnerType.OBSERVER); } - + LOG.info("QuorumPeer 1 voting view: " + s1.getVotingView()); LOG.info("QuorumPeer 2 voting view: " + s2.getVotingView()); LOG.info("QuorumPeer 3 voting view: " + s3.getVotingView()); LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView()); - LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView()); - + LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView()); + s1.enableLocalSessions(localSessionsEnabled); s2.enableLocalSessions(localSessionsEnabled); s3.enableLocalSessions(localSessionsEnabled); @@ -299,37 +299,37 @@ public class QuorumBase extends ClientBase { int tickTime = 2000; int initLimit = 3; int syncLimit = 3; - + if(peers == null){ peers = new HashMap<Long,QuorumServer>(); - peers.put(Long.valueOf(1), new QuorumServer(1, + peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress(LOCALADDR, port1), new InetSocketAddress(LOCALADDR, portLE1), new InetSocketAddress(LOCALADDR, portClient1), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(2), new QuorumServer(2, + peers.put(Long.valueOf(2), new 
QuorumServer(2, new InetSocketAddress(LOCALADDR, port2), new InetSocketAddress(LOCALADDR, portLE2), new InetSocketAddress(LOCALADDR, portClient2), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(3), new QuorumServer(3, + peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress(LOCALADDR, port3), new InetSocketAddress(LOCALADDR, portLE3), new InetSocketAddress(LOCALADDR, portClient3), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(4), new QuorumServer(4, + peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress(LOCALADDR, port4), new InetSocketAddress(LOCALADDR, portLE4), new InetSocketAddress(LOCALADDR, portClient4), LearnerType.PARTICIPANT)); - peers.put(Long.valueOf(5), new QuorumServer(5, + peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress(LOCALADDR, port5), new InetSocketAddress(LOCALADDR, portLE5), new InetSocketAddress(LOCALADDR, portClient5), LearnerType.PARTICIPANT)); } - + switch(i){ case 1: LOG.info("creating QuorumPeer 1 port " + portClient1); @@ -341,7 +341,7 @@ public class QuorumBase extends ClientBase { s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit); Assert.assertEquals(portClient2, s2.getClientPort()); break; - case 3: + case 3: LOG.info("creating QuorumPeer 3 port " + portClient3); s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit); Assert.assertEquals(portClient3, s3.getClientPort()); @@ -361,7 +361,7 @@ public class QuorumBase extends ClientBase { @Override public void tearDown() throws Exception { LOG.info("TearDown started"); - + OSMXBean osMbean = new OSMXBean(); if (osMbean.getUnix() == true) { LOG.info("fdcount after test is: " http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java ---------------------------------------------------------------------- diff --git 
a/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java b/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java index b484452..d8c9bb9 100644 --- a/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java +++ b/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java @@ -98,8 +98,7 @@ public class SessionTrackerCheckTest extends ZKTestCase { } // Local session - sessionId = 0xf005ba11; - tracker.addSession(sessionId, CONNECTION_TIMEOUT); + sessionId = tracker.createSession(CONNECTION_TIMEOUT); try { tracker.checkSession(sessionId, null); } catch (Exception e) { @@ -144,8 +143,8 @@ public class SessionTrackerCheckTest extends ZKTestCase { Assert.fail("local session from other server should not fail"); } - // Global session - tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT); + // Track global session + tracker.trackSession(sessionId, CONNECTION_TIMEOUT); try { tracker.checkSession(sessionId, null); } catch (Exception e) { @@ -158,9 +157,7 @@ public class SessionTrackerCheckTest extends ZKTestCase { } // Local session from the leader - sessionId = (expirer.sid << 56) + 1; - ; - tracker.addSession(sessionId, CONNECTION_TIMEOUT); + sessionId = tracker.createSession(CONNECTION_TIMEOUT); try { tracker.checkSession(sessionId, null); } catch (Exception e) { @@ -168,7 +165,7 @@ public class SessionTrackerCheckTest extends ZKTestCase { } // During session upgrade - tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT); + tracker.trackSession(sessionId, CONNECTION_TIMEOUT); try { tracker.checkSession(sessionId, null); } catch (Exception e) { @@ -186,7 +183,7 @@ public class SessionTrackerCheckTest extends ZKTestCase { // Global session sessionId = 0xdeadbeef; - tracker.addSession(sessionId, CONNECTION_TIMEOUT); + tracker.trackSession(sessionId, CONNECTION_TIMEOUT); try { tracker.checkSession(sessionId, null); } catch (Exception e) {