Merge branch 'CURATOR-3.0' into CURATOR-248 Conflicts: curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c117b085 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c117b085 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c117b085 Branch: refs/heads/CURATOR-248 Commit: c117b0853b5779829ff732514aedd7d60b696ccc Parents: d412f23 d57aaeb Author: randgalt <randg...@apache.org> Authored: Tue Sep 1 06:27:31 2015 -0700 Committer: randgalt <randg...@apache.org> Committed: Tue Sep 1 06:27:31 2015 -0700 ---------------------------------------------------------------------- .../org/apache/curator/ConnectionState.java | 108 ++++--- .../apache/curator/CuratorZookeeperClient.java | 112 +++++-- .../java/org/apache/curator/HandleHolder.java | 19 ++ .../main/java/org/apache/curator/RetryLoop.java | 18 +- .../ClassicConnectionHandlingPolicy.java | 68 +++++ .../connection/ConnectionHandlingPolicy.java | 72 +++++ .../StandardConnectionHandlingPolicy.java | 56 ++++ .../org/apache/curator/retry/RetryForever.java | 60 ++++ .../org/apache/curator/utils/DebugUtils.java | 11 +- .../java/org/apache/curator/TestEnsurePath.java | 3 + .../java/org/apache/curator/TestRetryLoop.java | 20 ++ .../framework/CuratorFrameworkFactory.java | 58 ++++ ...reateModeStatBackgroundPathAndBytesable.java | 25 ++ .../api/BackgroundPathableQuietlyable.java | 18 ++ .../api/CreateBackgroundModeStatACLable.java | 70 +++++ .../curator/framework/api/CreateBuilder.java | 9 +- ...ateProtectACLCreateModePathAndBytesable.java | 72 +++++ ...rotectACLCreateModeStatPathAndBytesable.java | 25 ++ .../framework/api/UnhandledErrorListener.java | 4 +- .../imps/ClassicInternalConnectionHandler.java | 58 ++++ .../framework/imps/CreateBuilderImpl.java | 299 ++++++++++++++++++- .../framework/imps/CuratorFrameworkImpl.java | 86 +++--- .../imps/InternalConnectionHandler.java | 10 + .../imps/StandardInternalConnectionHandler.java | 22 ++ .../framework/state/ConnectionState.java | 27 +- .../framework/state/ConnectionStateManager.java | 91 ++++-- .../framework/imps/TestBlockUntilConnected.java | 1 + .../framework/imps/TestCreateReturningStat.java | 199 ++++++++++++ .../imps/TestEnabledSessionExpiredState.java | 179 +++++++++++ .../framework/imps/TestFrameworkEdges.java | 6 +- .../framework/recipes/cache/NodeCache.java | 41 ++- ...estResetConnectionWithBackgroundFailure.java | 19 +- .../recipes/leader/TestLeaderLatch.java | 15 +- .../recipes/leader/TestLeaderSelector.java | 5 +- .../recipes/leader/TestLeaderSelectorEdges.java | 6 +- .../locks/TestInterProcessMutexBase.java | 19 +- .../apache/curator/test/BaseClassForTests.java | 37 ++- .../java/org/apache/curator/test/Timing.java | 35 ++- curator-x-discovery-server/pom.xml | 6 + curator-x-discovery/pom.xml | 6 + .../discovery/details/TestServiceDiscovery.java | 2 + curator-x-rpc/pom.xml | 6 + src/site/confluence/errors.confluence | 6 +- src/site/confluence/index.confluence | 7 + 44 files changed, 1798 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --cc curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 9a67684,daffa13..2b01b30 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@@ -31,8 -34,7 +34,9 @@@ import org.apache.curator.framework.imp import org.apache.curator.framework.imps.CuratorTempFrameworkImpl; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.imps.GzipCompressionProvider; +import org.apache.curator.framework.state.ErrorPolicy; +import org.apache.curator.framework.state.StandardErrorPolicy; + import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.DefaultZookeeperFactory; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.CreateMode; @@@ -118,7 -121,7 +123,8 @@@ public class CuratorFrameworkFactor private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; private boolean useContainerParentsIfAvailable = true; + private ErrorPolicy errorPolicy = new StandardErrorPolicy(); + private ConnectionHandlingPolicy connectionHandlingPolicy = Boolean.getBoolean("curator-use-classic-connection-handling") ? new ClassicConnectionHandlingPolicy() : new StandardConnectionHandlingPolicy(); /** * Apply the current values and build a new CuratorFramework @@@ -347,18 -350,52 +353,65 @@@ } /** + * Set the error policy to use. The default is {@link StandardErrorPolicy} + * + * @since 3.0.0 + * @param errorPolicy new error policy + * @return this + */ + public Builder errorPolicy(ErrorPolicy errorPolicy) + { + this.errorPolicy = errorPolicy; + return this; + } + ++ /** + * <p> + * Change the connection handling policy. The default policy is {@link StandardConnectionHandlingPolicy}. + * </p> + * <p> + * <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy has different behavior than the connection + * policy handling prior to version 3.0.0. You can specify that the connection handling be the method + * prior to 3.0.0 by passing in an instance of {@link ClassicConnectionHandlingPolicy} here or by + * setting the command line value "curator-use-classic-connection-handling" to true (e.g. <tt>-Dcurator-use-classic-connection-handling=true</tt>). + * </p> + * <p> + * Major differences from the older behavior are: + * </p> + * <ul> + * <li> + * Session/connection timeouts are no longer managed by the low-level client. They are managed + * by the CuratorFramework instance. There should be no noticeable differences. + * </li> + * <li> + * Prior to 3.0.0, each iteration of the retry policy would allow the connection timeout to elapse + * if the connection hadn't yet succeeded. This meant that the true connection timeout was the configured + * value times the maximum retries in the retry policy. This longstanding issue has been address. + * Now, the connection timeout can elapse only once for a single API call. + * </li> + * <li> + * <strong>MOST IMPORTANTLY!</strong> Prior to 3.0.0, {@link ConnectionState#LOST} did not imply + * a lost session (much to the confusion of users). Now, + * Curator will set the LOST state only when it believes that the ZooKeeper session + * has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate + * action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper + * connection. Now, Curator will set the LOST state when any of the following occurs: + * a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED}; + * b) Curator closes the internally managed ZooKeeper instance; c) The session timeout + * elapses during a network partition. + * </li> + * </ul> + * + * @param connectionHandlingPolicy the policy + * @return this + * @since 3.0.0 + */ + public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy) + { + this.connectionHandlingPolicy = connectionHandlingPolicy; + return this; + } + public ACLProvider getAclProvider() { return aclProvider; @@@ -414,11 -451,11 +467,16 @@@ return useContainerParentsIfAvailable; } + public ErrorPolicy getErrorPolicy() + { + return errorPolicy; + } + + public ConnectionHandlingPolicy getConnectionHandlingPolicy() + { + return connectionHandlingPolicy; + } + @Deprecated public String getAuthScheme() { http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 3310daf,b04987d..7651731 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@@ -84,7 -84,8 +85,9 @@@ public class CuratorFrameworkImpl imple private final NamespaceFacadeCache namespaceFacadeCache; private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; + private final ErrorPolicy errorPolicy; + private final AtomicLong currentInstanceIndex = new AtomicLong(-1); + private final InternalConnectionHandler internalConnectionHandler; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@@ -200,7 -211,7 +214,8 @@@ state = parent.state; authInfos = parent.authInfos; useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable; + errorPolicy = parent.errorPolicy; + internalConnectionHandler = parent.internalConnectionHandler; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 41b53fd,3742fb7..fcebae4 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@@ -38,9 -35,8 +38,10 @@@ import org.apache.curator.test.Timing import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; ++import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorCompletionService; @@@ -57,163 -53,6 +58,167 @@@ public class TestLeaderLatch extends Ba private static final int MAX_LOOPS = 5; @Test + public void testSessionErrorPolicy() throws Exception + { + Timing timing = new Timing(); + LeaderLatch latch = null; + CuratorFramework client = null; + for ( int i = 0; i < 2; ++i ) + { + boolean isSessionIteration = (i == 0); + try + { + client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(isSessionIteration ? new SessionErrorPolicy() : new StandardErrorPolicy()) + .build(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + states.add(newState.name()); + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + + latch = new LeaderLatch(client, "/test"); + LeaderLatchListener listener = new LeaderLatchListener() + { + @Override + public void isLeader() + { + states.add("true"); + } + + @Override + public void notLeader() + { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.stop(); + if ( isSessionIteration ) + { + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + server.restart(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name()); + Assert.assertNull(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS)); + } + else + { + String s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s)); + s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s)); + server.restart(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + finally + { + CloseableUtils.closeQuietly(latch); + CloseableUtils.closeQuietly(client); + } + } + } + + @Test + public void testErrorPolicies() throws Exception + { + Timing timing = new Timing(); + LeaderLatch latch = null; + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(1000) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new StandardErrorPolicy()) + .build(); + try + { + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + states.add(newState.name()); + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + latch = new LeaderLatch(client, "/test"); + LeaderLatchListener listener = new LeaderLatchListener() + { + @Override + public void isLeader() + { + states.add("true"); + } + + @Override + public void notLeader() + { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.close(); - Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); - Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); - Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); ++ List<String> next = Lists.newArrayList(); ++ next.add(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); ++ next.add(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); ++ Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.SUSPENDED.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.SUSPENDED.name())), next.toString()); ++ Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + latch.close(); + client.close(); + + timing.sleepABit(); + states.clear(); + + server = new TestingServer(); + client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(1000) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new SessionErrorPolicy()) + .build(); + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + latch = new LeaderLatch(client, "/test"); + latch.addListener(listener); + latch.start(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + server.close(); + Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); - Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); - Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); ++ next = Lists.newArrayList(); ++ next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS)); ++ next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS)); ++ Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), next.toString()); + } + finally + { + CloseableUtils.closeQuietly(latch); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testProperCloseWithoutConnectionEstablished() throws Exception { server.stop(); http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java ---------------------------------------------------------------------- diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index ae19b3c,c7f415c..e5b9717 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@@ -20,7 -20,6 +20,8 @@@ package org.apache.curator.framework.recipes.leader; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; ++import org.apache.curator.connection.StandardConnectionHandlingPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; @@@ -53,93 -50,6 +54,93 @@@ public class TestLeaderSelector extend private static final String PATH_NAME = "/one/two/me"; @Test + public void testErrorPolicies() throws Exception + { + Timing timing = new Timing(); + LeaderSelector selector = null; + CuratorFramework client = CuratorFrameworkFactory + .builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new StandardErrorPolicy()) + .build(); + try + { + final BlockingQueue<String> changes = Queues.newLinkedBlockingQueue(); + + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + changes.add(newState.name()); + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() + { + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + changes.add("leader"); + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + changes.add("release"); + Thread.currentThread().interrupt(); + } + } + }; + selector = new LeaderSelector(client, "/test", listener); + selector.start(); + + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader"); + server.close(); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release"); - Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); ++ Assert.assertEquals(changes.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + + selector.close(); + client.close(); + timing.sleepABit(); + changes.clear(); + + server = new TestingServer(); + client = CuratorFrameworkFactory + .builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .errorPolicy(new SessionErrorPolicy()) + .build(); + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + selector = new LeaderSelector(client, "/test", listener); + selector.start(); + + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader"); + server.stop(); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name()); - Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); ++ Assert.assertEquals(changes.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name()); + Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release"); + } + finally + { + CloseableUtils.closeQuietly(selector); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testLeaderNodeDeleteOnInterrupt() throws Exception { Timing timing = new Timing(); http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/src/site/confluence/errors.confluence ----------------------------------------------------------------------