Updated LeaderLatch for error policy
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5429a217 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5429a217 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5429a217 Branch: refs/heads/CURATOR-3.0 Commit: 5429a217bb23901aaf2b187bb8c1d760d0a76bcc Parents: 94dff8a Author: randgalt <randg...@apache.org> Authored: Mon Aug 24 17:39:41 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon Aug 24 17:39:41 2015 -0500 ---------------------------------------------------------------------- .../framework/recipes/leader/LeaderLatch.java | 39 +++-- .../recipes/leader/TestLeaderLatch.java | 162 +++++++++++++++++++ 2 files changed, 187 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/5429a217/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index da9b8b2..aa4dd9f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -160,20 +160,20 @@ public class LeaderLatch implements Closeable Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); startTask.set(AfterConnectionEstablished.execute(client, new Runnable() + { + @Override + public void run() + { + try { - @Override - public void run() - { - try - { - internalStart(); - } - finally - { - startTask.set(null); - } - } - })); + internalStart(); + } + finally + { + startTask.set(null); + } + } + })); } /** @@ -604,7 +604,10 @@ public class LeaderLatch implements Closeable { try { - reset(); + if ( client.getErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() ) + { + reset(); + } } catch ( Exception e ) { @@ -615,6 +618,14 @@ public class LeaderLatch implements Closeable } case SUSPENDED: + { + if ( client.getErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) + { + setLeadership(false); + } + break; + } + case LOST: { setLeadership(false); http://git-wip-us.apache.org/repos/asf/curator/blob/5429a217/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 3742fb7..bd73e9d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -21,12 +21,15 @@ package org.apache.curator.framework.recipes.leader; import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.framework.state.SessionErrorPolicy; +import org.apache.curator.framework.state.StandardErrorPolicy; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; @@ -37,11 +40,13 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.util.Collection; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +58,163 @@ public class TestLeaderLatch extends BaseClassForTests private static final int MAX_LOOPS = 5; @Test + public void testSessionErrorPolicy() throws Exception + { + Timing timing = new Timing(); + LeaderLatch latch = null; + CuratorFramework client = null; + for ( int i = 0; i < 2; ++i ) + { + boolean isSessionIteration = (i == 0); + try + { + client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(isSessionIteration ? new SessionErrorPolicy() : new StandardErrorPolicy()) + .build(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + states.add(newState.name()); + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + + latch = new LeaderLatch(client, "/test"); + LeaderLatchListener listener = new LeaderLatchListener() + { + @Override + public void isLeader() + { + states.add("true"); + } + + @Override + public void notLeader() + { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.stop(); + if ( isSessionIteration ) + { + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + server.restart(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name()); + Assert.assertNull(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS)); + } + else + { + String s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s)); + s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s)); + server.restart(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + finally + { + CloseableUtils.closeQuietly(latch); + CloseableUtils.closeQuietly(client); + } + } + } + + @Test + public void testErrorPolicies() throws Exception + { + Timing timing = new Timing(); + LeaderLatch latch = null; + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(1000) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new StandardErrorPolicy()) + .build(); + try + { + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + states.add(newState.name()); + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + latch = new LeaderLatch(client, "/test"); + LeaderLatchListener listener = new LeaderLatchListener() + { + @Override + public void isLeader() + { + states.add("true"); + } + + @Override + public void notLeader() + { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.close(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + latch.close(); + client.close(); + + timing.sleepABit(); + states.clear(); + + server = new TestingServer(); + client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(1000) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new SessionErrorPolicy()) + .build(); + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + latch = new LeaderLatch(client, "/test"); + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.close(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); + } + finally + { + CloseableUtils.closeQuietly(latch); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testProperCloseWithoutConnectionEstablished() throws Exception { server.stop();