Delay reconnect on session expired
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/651ac591 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/651ac591 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/651ac591 Branch: refs/heads/CURATOR-3.0 Commit: 651ac591725567bf85ec830a45b9b062d5dd7474 Parents: 6d8c0be Author: Zoltan Szekeres <zoltan.szeke...@morganstanley.com> Authored: Thu Jan 26 09:03:20 2017 -0500 Committer: Zoltan Szekeres <zoltan.szeke...@morganstanley.com> Committed: Thu Jan 26 09:03:20 2017 -0500 ---------------------------------------------------------------------- .../org/apache/curator/ConnectionState.java | 49 +++++++++---- .../framework/imps/TestBlockUntilConnected.java | 75 ++++++++++++++++++++ 2 files changed, 110 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/651ac591/curator-client/src/main/java/org/apache/curator/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java index 7044ddf..bb4f08e 100644 --- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java +++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java @@ -149,7 +149,9 @@ class ConnectionState implements Watcher, Closeable log.debug("ConnectState watcher: " + event); } - if ( event.getType() == Watcher.Event.EventType.None ) + final boolean eventTypeNone = event.getType() == Watcher.Event.EventType.None; + + if ( eventTypeNone ) { boolean wasConnected = isConnected.get(); boolean newIsConnected = checkState(event.getState(), wasConnected); @@ -160,13 +162,33 @@ class ConnectionState implements Watcher, Closeable } } + // only wait during tests + assert waitOnExpiredEvent(event.getState()); + for ( Watcher parentWatcher : parentWatchers ) { - OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId()); parentWatcher.process(event); trace.commit(); } + + if (eventTypeNone) handleState(event.getState()); + } + + // only for testing + private boolean waitOnExpiredEvent(Event.KeeperState currentState) + { + if (currentState == Event.KeeperState.Expired) + { + log.debug("Waiting on Expired event for testing"); + try + { + Thread.sleep(1000); + } + catch(InterruptedException e) {} + log.debug("Continue processing"); + } + return true; } EnsembleProvider getEnsembleProvider() @@ -240,11 +262,11 @@ class ConnectionState implements Watcher, Closeable private boolean checkState(Event.KeeperState state, boolean wasConnected) { boolean isConnected = wasConnected; - boolean checkNewConnectionString = true; switch ( state ) { default: case Disconnected: + case Expired: { isConnected = false; break; @@ -264,14 +286,6 @@ class ConnectionState implements Watcher, Closeable break; } - case Expired: - { - isConnected = false; - checkNewConnectionString = false; - handleExpiredSession(); - break; - } - case SaslAuthenticated: { // NOP @@ -283,12 +297,19 @@ class ConnectionState implements Watcher, Closeable new EventTrace(state.toString(), tracer.get(), getSessionId()).commit(); } - if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() ) + return isConnected; + } + + private void handleState(Event.KeeperState state) + { + if (state == Event.KeeperState.Expired) + { + handleExpiredSession(); + } + else if (zooKeeper.hasNewConnectionString()) { handleNewConnectionString(); } - - return isConnected; } private void handleNewConnectionString() http://git-wip-us.apache.org/repos/asf/curator/blob/651ac591/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java index a6dc7ab..7ea0849 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java @@ -21,6 +21,9 @@ package org.apache.curator.framework.imps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; @@ -28,6 +31,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.Watcher; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Timer; @@ -256,4 +260,75 @@ public class TestBlockUntilConnected extends BaseClassForTests } } } + + /** + * Test that we got disconnected before calling blockUntilConnected and we reconnect we receive a session expired event. + */ + @Test + public void testBlockUntilConnectedSessionExpired() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + + final CountDownLatch lostLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + } + }); + + final CountDownLatch expiredLatch = new CountDownLatch(1); + client.getCuratorListenable().addListener(new CuratorListener() { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { + if (event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired) + { + expiredLatch.countDown(); + } + } + }); + + try + { + client.start(); + + //Block until we're connected + Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect"); + + final long sessionTimeoutMs = client.getZookeeperClient().getConnectionTimeoutMs(); + + //Kill the server + CloseableUtils.closeQuietly(server); + + //Wait until we hit the lost state + Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state"); + + Thread.sleep(sessionTimeoutMs); + + server = new TestingServer(server.getPort(), server.getTempDirectory()); + + //Wait until we get expired event + Assert.assertTrue(timing.awaitLatch(expiredLatch), "Failed to get Expired event"); + + Assert.assertTrue(client.blockUntilConnected(50, TimeUnit.SECONDS), "Not connected"); + } + catch ( Exception e ) + { + Assert.fail("Unexpected exception " + e); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } }