Merge branch 'CURATOR-3.0' into CURATOR-248

Conflicts:
        
curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
        
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c117b085
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c117b085
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c117b085

Branch: refs/heads/CURATOR-248
Commit: c117b0853b5779829ff732514aedd7d60b696ccc
Parents: d412f23 d57aaeb
Author: randgalt <randg...@apache.org>
Authored: Tue Sep 1 06:27:31 2015 -0700
Committer: randgalt <randg...@apache.org>
Committed: Tue Sep 1 06:27:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 108 ++++---
 .../apache/curator/CuratorZookeeperClient.java  | 112 +++++--
 .../java/org/apache/curator/HandleHolder.java   |  19 ++
 .../main/java/org/apache/curator/RetryLoop.java |  18 +-
 .../ClassicConnectionHandlingPolicy.java        |  68 +++++
 .../connection/ConnectionHandlingPolicy.java    |  72 +++++
 .../StandardConnectionHandlingPolicy.java       |  56 ++++
 .../org/apache/curator/retry/RetryForever.java  |  60 ++++
 .../org/apache/curator/utils/DebugUtils.java    |  11 +-
 .../java/org/apache/curator/TestEnsurePath.java |   3 +
 .../java/org/apache/curator/TestRetryLoop.java  |  20 ++
 .../framework/CuratorFrameworkFactory.java      |  58 ++++
 ...reateModeStatBackgroundPathAndBytesable.java |  25 ++
 .../api/BackgroundPathableQuietlyable.java      |  18 ++
 .../api/CreateBackgroundModeStatACLable.java    |  70 +++++
 .../curator/framework/api/CreateBuilder.java    |   9 +-
 ...ateProtectACLCreateModePathAndBytesable.java |  72 +++++
 ...rotectACLCreateModeStatPathAndBytesable.java |  25 ++
 .../framework/api/UnhandledErrorListener.java   |   4 +-
 .../imps/ClassicInternalConnectionHandler.java  |  58 ++++
 .../framework/imps/CreateBuilderImpl.java       | 299 ++++++++++++++++++-
 .../framework/imps/CuratorFrameworkImpl.java    |  86 +++---
 .../imps/InternalConnectionHandler.java         |  10 +
 .../imps/StandardInternalConnectionHandler.java |  22 ++
 .../framework/state/ConnectionState.java        |  27 +-
 .../framework/state/ConnectionStateManager.java |  91 ++++--
 .../framework/imps/TestBlockUntilConnected.java |   1 +
 .../framework/imps/TestCreateReturningStat.java | 199 ++++++++++++
 .../imps/TestEnabledSessionExpiredState.java    | 179 +++++++++++
 .../framework/imps/TestFrameworkEdges.java      |   6 +-
 .../framework/recipes/cache/NodeCache.java      |  41 ++-
 ...estResetConnectionWithBackgroundFailure.java |  19 +-
 .../recipes/leader/TestLeaderLatch.java         |  15 +-
 .../recipes/leader/TestLeaderSelector.java      |   5 +-
 .../recipes/leader/TestLeaderSelectorEdges.java |   6 +-
 .../locks/TestInterProcessMutexBase.java        |  19 +-
 .../apache/curator/test/BaseClassForTests.java  |  37 ++-
 .../java/org/apache/curator/test/Timing.java    |  35 ++-
 curator-x-discovery-server/pom.xml              |   6 +
 curator-x-discovery/pom.xml                     |   6 +
 .../discovery/details/TestServiceDiscovery.java |   2 +
 curator-x-rpc/pom.xml                           |   6 +
 src/site/confluence/errors.confluence           |   6 +-
 src/site/confluence/index.confluence            |   7 +
 44 files changed, 1798 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --cc 
curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 9a67684,daffa13..2b01b30
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@@ -31,8 -34,7 +34,9 @@@ import org.apache.curator.framework.imp
  import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
  import org.apache.curator.framework.imps.DefaultACLProvider;
  import org.apache.curator.framework.imps.GzipCompressionProvider;
 +import org.apache.curator.framework.state.ErrorPolicy;
 +import org.apache.curator.framework.state.StandardErrorPolicy;
+ import org.apache.curator.framework.state.ConnectionState;
  import org.apache.curator.utils.DefaultZookeeperFactory;
  import org.apache.curator.utils.ZookeeperFactory;
  import org.apache.zookeeper.CreateMode;
@@@ -118,7 -121,7 +123,8 @@@ public class CuratorFrameworkFactor
          private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
          private boolean canBeReadOnly = false;
          private boolean useContainerParentsIfAvailable = true;
 +        private ErrorPolicy errorPolicy = new StandardErrorPolicy();
+         private ConnectionHandlingPolicy connectionHandlingPolicy = 
Boolean.getBoolean("curator-use-classic-connection-handling") ? new 
ClassicConnectionHandlingPolicy() : new StandardConnectionHandlingPolicy();
  
          /**
           * Apply the current values and build a new CuratorFramework
@@@ -347,18 -350,52 +353,65 @@@
          }
  
          /**
 +         * Set the error policy to use. The default is {@link 
StandardErrorPolicy}
 +         *
 +         * @since 3.0.0
 +         * @param errorPolicy new error policy
 +         * @return this
 +         */
 +        public Builder errorPolicy(ErrorPolicy errorPolicy)
 +        {
 +            this.errorPolicy = errorPolicy;
 +            return this;
 +        }
 +
++        /**
+          * <p>
+          *     Change the connection handling policy. The default policy is 
{@link StandardConnectionHandlingPolicy}.
+          * </p>
+          * <p>
+          *     <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy 
has different behavior than the connection
+          *     policy handling prior to version 3.0.0. You can specify that 
the connection handling be the method
+          *     prior to 3.0.0 by passing in an instance of {@link 
ClassicConnectionHandlingPolicy} here or by
+          *     setting the command line value 
"curator-use-classic-connection-handling" to true (e.g. 
<tt>-Dcurator-use-classic-connection-handling=true</tt>).
+          * </p>
+          * <p>
+          *     Major differences from the older behavior are:
+          * </p>
+          * <ul>
+          *     <li>
+          *         Session/connection timeouts are no longer managed by the 
low-level client. They are managed
+          *         by the CuratorFramework instance. There should be no 
noticeable differences.
+          *     </li>
+          *     <li>
+          *         Prior to 3.0.0, each iteration of the retry policy would 
allow the connection timeout to elapse
+          *         if the connection hadn't yet succeeded. This meant that 
the true connection timeout was the configured
+          *         value times the maximum retries in the retry policy. This 
longstanding issue has been address.
+          *         Now, the connection timeout can elapse only once for a 
single API call.
+          *     </li>
+          *     <li>
+          *         <strong>MOST IMPORTANTLY!</strong> Prior to 3.0.0, {@link 
ConnectionState#LOST} did not imply
+          *         a lost session (much to the confusion of users). Now,
+          *         Curator will set the LOST state only when it believes that 
the ZooKeeper session
+          *         has expired. ZooKeeper connections have a session. When 
the session expires, clients must take appropriate
+          *         action. In Curator, this is complicated by the fact that 
Curator internally manages the ZooKeeper
+          *         connection. Now, Curator will set the LOST state when any 
of the following occurs:
+          *         a) ZooKeeper returns a {@link 
Watcher.Event.KeeperState#Expired} or {@link 
KeeperException.Code#SESSIONEXPIRED};
+          *         b) Curator closes the internally managed ZooKeeper 
instance; c) The session timeout
+          *         elapses during a network partition.
+          *     </li>
+          * </ul>
+          *
+          * @param connectionHandlingPolicy the policy
+          * @return this
+          * @since 3.0.0
+          */
+         public Builder connectionHandlingPolicy(ConnectionHandlingPolicy 
connectionHandlingPolicy)
+         {
+             this.connectionHandlingPolicy = connectionHandlingPolicy;
+             return this;
+         }
+ 
          public ACLProvider getAclProvider()
          {
              return aclProvider;
@@@ -414,11 -451,11 +467,16 @@@
              return useContainerParentsIfAvailable;
          }
  
 +        public ErrorPolicy getErrorPolicy()
 +        {
 +            return errorPolicy;
 +        }
 +
+         public ConnectionHandlingPolicy getConnectionHandlingPolicy()
+         {
+             return connectionHandlingPolicy;
+         }
+ 
          @Deprecated
          public String getAuthScheme()
          {

http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --cc 
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 3310daf,b04987d..7651731
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@@ -84,7 -84,8 +85,9 @@@ public class CuratorFrameworkImpl imple
      private final NamespaceFacadeCache namespaceFacadeCache;
      private final NamespaceWatcherMap namespaceWatcherMap = new 
NamespaceWatcherMap(this);
      private final boolean useContainerParentsIfAvailable;
 +    private final ErrorPolicy errorPolicy;
+     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
+     private final InternalConnectionHandler internalConnectionHandler;
  
      private volatile ExecutorService executorService;
      private final AtomicBoolean logAsErrorConnectionErrors = new 
AtomicBoolean(false);
@@@ -200,7 -211,7 +214,8 @@@
          state = parent.state;
          authInfos = parent.authInfos;
          useContainerParentsIfAvailable = 
parent.useContainerParentsIfAvailable;
 +        errorPolicy = parent.errorPolicy;
+         internalConnectionHandler = parent.internalConnectionHandler;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --cc 
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 41b53fd,3742fb7..fcebae4
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@@ -38,9 -35,8 +38,10 @@@ import org.apache.curator.test.Timing
  import org.apache.curator.utils.CloseableUtils;
  import org.testng.Assert;
  import org.testng.annotations.Test;
++import java.util.Arrays;
  import java.util.Collection;
  import java.util.List;
 +import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.Callable;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorCompletionService;
@@@ -57,163 -53,6 +58,167 @@@ public class TestLeaderLatch extends Ba
      private static final int MAX_LOOPS = 5;
  
      @Test
 +    public void testSessionErrorPolicy() throws Exception
 +    {
 +        Timing timing = new Timing();
 +        LeaderLatch latch = null;
 +        CuratorFramework client = null;
 +        for ( int i = 0; i < 2; ++i )
 +        {
 +            boolean isSessionIteration = (i == 0);
 +            try
 +            {
 +                client = CuratorFrameworkFactory.builder()
 +                    .connectString(server.getConnectString())
 +                    .connectionTimeoutMs(10000)
 +                    .sessionTimeoutMs(60000)
 +                    .retryPolicy(new RetryOneTime(1))
 +                    .errorPolicy(isSessionIteration ? new 
SessionErrorPolicy() : new StandardErrorPolicy())
 +                    .build();
 +                final BlockingQueue<String> states = 
Queues.newLinkedBlockingQueue();
 +                ConnectionStateListener stateListener = new 
ConnectionStateListener()
 +                {
 +                    @Override
 +                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
 +                    {
 +                        states.add(newState.name());
 +                    }
 +                };
 +                
client.getConnectionStateListenable().addListener(stateListener);
 +                client.start();
 +
 +                latch = new LeaderLatch(client, "/test");
 +                LeaderLatchListener listener = new LeaderLatchListener()
 +                {
 +                    @Override
 +                    public void isLeader()
 +                    {
 +                        states.add("true");
 +                    }
 +
 +                    @Override
 +                    public void notLeader()
 +                    {
 +                        states.add("false");
 +                    }
 +                };
 +                latch.addListener(listener);
 +                latch.start();
 +                
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
 +                
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
 +                server.stop();
 +                if ( isSessionIteration )
 +                {
 +                    
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
 +                    server.restart();
 +                    
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
 +                    Assert.assertNull(states.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS));
 +                }
 +                else
 +                {
 +                    String s = 
states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
 +                    Assert.assertTrue("false".equals(s) || 
ConnectionState.SUSPENDED.name().equals(s));
 +                    s = states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS);
 +                    Assert.assertTrue("false".equals(s) || 
ConnectionState.SUSPENDED.name().equals(s));
 +                    server.restart();
 +                    
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
 +                    
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
 +                }
 +            }
 +            finally
 +            {
 +                CloseableUtils.closeQuietly(latch);
 +                CloseableUtils.closeQuietly(client);
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void testErrorPolicies() throws Exception
 +    {
 +        Timing timing = new Timing();
 +        LeaderLatch latch = null;
 +        CuratorFramework client = CuratorFrameworkFactory.builder()
 +            .connectString(server.getConnectString())
 +            .connectionTimeoutMs(1000)
 +            .sessionTimeoutMs(timing.session())
 +            .retryPolicy(new RetryOneTime(1))
 +            .errorPolicy(new StandardErrorPolicy())
 +            .build();
 +        try
 +        {
 +            final BlockingQueue<String> states = 
Queues.newLinkedBlockingQueue();
 +            ConnectionStateListener stateListener = new 
ConnectionStateListener()
 +            {
 +                @Override
 +                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
 +                {
 +                    states.add(newState.name());
 +                }
 +            };
 +            client.getConnectionStateListenable().addListener(stateListener);
 +            client.start();
 +            latch = new LeaderLatch(client, "/test");
 +            LeaderLatchListener listener = new LeaderLatchListener()
 +            {
 +                @Override
 +                public void isLeader()
 +                {
 +                    states.add("true");
 +                }
 +
 +                @Override
 +                public void notLeader()
 +                {
 +                    states.add("false");
 +                }
 +            };
 +            latch.addListener(listener);
 +            latch.start();
 +            
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
 +            
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
 +            server.close();
-             
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
-             
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "false");
-             
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
++            List<String> next = Lists.newArrayList();
++            next.add(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
++            next.add(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
++            
Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.SUSPENDED.name(), 
"false")) || next.equals(Arrays.asList("false", 
ConnectionState.SUSPENDED.name())), next.toString());
++            
Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
 +            latch.close();
 +            client.close();
 +
 +            timing.sleepABit();
 +            states.clear();
 +
 +            server = new TestingServer();
 +            client = CuratorFrameworkFactory.builder()
 +                .connectString(server.getConnectString())
 +                .connectionTimeoutMs(1000)
 +                .sessionTimeoutMs(timing.session())
 +                .retryPolicy(new RetryOneTime(1))
 +                .errorPolicy(new SessionErrorPolicy())
 +                .build();
 +            client.getConnectionStateListenable().addListener(stateListener);
 +            client.start();
 +            latch = new LeaderLatch(client, "/test");
 +            latch.addListener(listener);
 +            latch.start();
 +            
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
 +            
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
 +            server.close();
 +            
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
-             
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
-             
Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "false");
++            next = Lists.newArrayList();
++            next.add(states.poll(timing.forSessionSleep().milliseconds(), 
TimeUnit.MILLISECONDS));
++            next.add(states.poll(timing.forSessionSleep().milliseconds(), 
TimeUnit.MILLISECONDS));
++            
Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), 
"false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), 
next.toString());
 +        }
 +        finally
 +        {
 +            CloseableUtils.closeQuietly(latch);
 +            CloseableUtils.closeQuietly(client);
 +        }
 +    }
 +
 +    @Test
      public void testProperCloseWithoutConnectionEstablished() throws Exception
      {
          server.stop();

http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --cc 
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index ae19b3c,c7f415c..e5b9717
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@@ -20,7 -20,6 +20,8 @@@
  package org.apache.curator.framework.recipes.leader;
  
  import com.google.common.collect.Lists;
 +import com.google.common.collect.Queues;
++import org.apache.curator.connection.StandardConnectionHandlingPolicy;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
  import org.apache.curator.framework.state.ConnectionState;
@@@ -53,93 -50,6 +54,93 @@@ public class TestLeaderSelector extend
      private static final String PATH_NAME = "/one/two/me";
  
      @Test
 +    public void testErrorPolicies() throws Exception
 +    {
 +        Timing timing = new Timing();
 +        LeaderSelector selector = null;
 +        CuratorFramework client = CuratorFrameworkFactory
 +            .builder()
 +            .connectString(server.getConnectString())
 +            .connectionTimeoutMs(timing.connection())
 +            .sessionTimeoutMs(timing.session())
 +            .retryPolicy(new RetryOneTime(1))
 +            .errorPolicy(new StandardErrorPolicy())
 +            .build();
 +        try
 +        {
 +            final BlockingQueue<String> changes = 
Queues.newLinkedBlockingQueue();
 +
 +            ConnectionStateListener stateListener = new 
ConnectionStateListener()
 +            {
 +                @Override
 +                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
 +                {
 +                    changes.add(newState.name());
 +                }
 +            };
 +            client.getConnectionStateListenable().addListener(stateListener);
 +            client.start();
 +            LeaderSelectorListener listener = new 
LeaderSelectorListenerAdapter()
 +            {
 +                @Override
 +                public void takeLeadership(CuratorFramework client) throws 
Exception
 +                {
 +                    changes.add("leader");
 +                    try
 +                    {
 +                        Thread.currentThread().join();
 +                    }
 +                    catch ( InterruptedException e )
 +                    {
 +                        changes.add("release");
 +                        Thread.currentThread().interrupt();
 +                    }
 +                }
 +            };
 +            selector = new LeaderSelector(client, "/test", listener);
 +            selector.start();
 +
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "leader");
 +            server.close();
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "release");
-             
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
++            
Assert.assertEquals(changes.poll(timing.forSessionSleep().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
 +
 +            selector.close();
 +            client.close();
 +            timing.sleepABit();
 +            changes.clear();
 +
 +            server = new TestingServer();
 +            client = CuratorFrameworkFactory
 +                .builder()
 +                .connectString(server.getConnectString())
 +                .connectionTimeoutMs(timing.connection())
 +                .sessionTimeoutMs(timing.session())
 +                .retryPolicy(new RetryOneTime(1))
 +                .errorPolicy(new SessionErrorPolicy())
 +                .build();
 +            client.getConnectionStateListenable().addListener(stateListener);
 +            client.start();
 +            selector = new LeaderSelector(client, "/test", listener);
 +            selector.start();
 +
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "leader");
 +            server.stop();
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
-             
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
++            
Assert.assertEquals(changes.poll(timing.forSessionSleep().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
 +            
Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "release");
 +        }
 +        finally
 +        {
 +            CloseableUtils.closeQuietly(selector);
 +            CloseableUtils.closeQuietly(client);
 +        }
 +    }
 +
 +    @Test
      public void testLeaderNodeDeleteOnInterrupt() throws Exception
      {
          Timing timing = new Timing();

http://git-wip-us.apache.org/repos/asf/curator/blob/c117b085/src/site/confluence/errors.confluence
----------------------------------------------------------------------

Reply via email to