Repository: curator
Updated Branches:
  refs/heads/CURATOR-460 [created] 5920c7445
CURATOR-460 Adjust to obey sessionExpirationPercent value Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72b7b870 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72b7b870 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72b7b870 Branch: refs/heads/CURATOR-460 Commit: 72b7b8709c822b9170d8d7beb2814c4c36b89e58 Parents: eb6ad40 Author: javando <antonio.rafael...@gmail.com> Authored: Mon Apr 2 23:27:13 2018 -0300 Committer: javando <antonio.rafael...@gmail.com> Committed: Mon Apr 2 23:27:13 2018 -0300 ---------------------------------------------------------------------- .../framework/state/ConnectionStateManager.java | 1 + .../recipes/leader/TestLeaderLatch.java | 30 ++++++++++++++++++++ 2 files changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/72b7b870/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 0c8ddf8..fedcedf 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -253,6 +253,7 @@ public class ConnectionStateManager implements Closeable { int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs(); int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs; + useSessionTimeoutMs = sessionExpirationPercent > 0 && startOfSuspendedEpoch != 0 ? 
(useSessionTimeoutMs * sessionExpirationPercent) / 100 : useSessionTimeoutMs; long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch; long pollMaxMs = useSessionTimeoutMs - elapsedMs; http://git-wip-us.apache.org/repos/asf/curator/blob/72b7b870/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 011e4a0..af1475d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -23,6 +23,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.curator.connection.StandardConnectionHandlingPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -222,6 +223,35 @@ public class TestLeaderLatch extends BaseClassForTests next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS)); next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS)); Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), next.toString()); + + latch.close(); + client.close(); + + timing.sleepABit(); + states.clear(); + server = new TestingServer(); + client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + 
.connectionTimeoutMs(1000) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy()) + .connectionHandlingPolicy(new StandardConnectionHandlingPolicy(30)) + .build(); + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + latch = new LeaderLatch(client, "/test"); + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.close(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + next = Lists.newArrayList(); + + next.add(states.poll(timing.session() / 3, TimeUnit.MILLISECONDS)); + next.add(states.poll(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS)); + Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), next.toString()); } finally {