Another edge case found by Evaristo. When the SUSPENDED state is set, a
background sync is executed to detect LOST. If the ZK connection is reset while
this sync is still processing, an incorrect LOST might get set. Instead, keep
track of ZooKeeper instance resets. If there has been a reset, ignore the
background sync failure and re-submit the sync to test again.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0359bc5a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0359bc5a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0359bc5a

Branch: refs/heads/master
Commit: 0359bc5ab683285f44523d1445ef2eb8116380c4
Parents: 75acb0d
Author: randgalt <randg...@apache.org>
Authored: Sun Jan 12 16:53:03 2014 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Jan 12 16:54:17 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |   9 ++
 .../apache/curator/CuratorZookeeperClient.java  |  11 ++
 .../framework/imps/CuratorFrameworkImpl.java    |  17 ++-
 ...estResetConnectionWithBackgroundFailure.java | 104 +++++++++++++++++++
 4 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ConnectionState.java 
b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index e02ee88..4978c3f 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
@@ -49,6 +50,7 @@ class ConnectionState implements Watcher, Closeable
     private final AtomicReference<TracerDriver> tracer;
     private final Queue<Exception> backgroundExceptions = new 
ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new 
ConcurrentLinkedQueue<Watcher>();
+    private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider 
ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher 
parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
@@ -131,6 +133,11 @@ class ConnectionState implements Watcher, Closeable
         parentWatchers.remove(watcher);
     }
 
+    long getInstanceIndex()
+    {
+        return instanceIndex.get();
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
@@ -204,6 +211,8 @@ class ConnectionState implements Watcher, Closeable
     {
         log.debug("reset");
 
+        instanceIndex.incrementAndGet();
+
         isConnected.set(false);
         connectionStartMs = System.currentTimeMillis();
         zooKeeper.closeAndReset();

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java 
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index f4e56f9..f0a4ab3 100644
--- 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ 
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -279,6 +279,17 @@ public class CuratorZookeeperClient implements Closeable
         return connectionTimeoutMs;
     }
 
+    /**
+     * Every time a new {@link ZooKeeper} instance is allocated, the "instance 
index"
+     * is incremented.
+     *
+     * @return the current instance index
+     */
+    public long getInstanceIndex()
+    {
+        return state.getInstanceIndex();
+    }
+
     void        addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1b0ef3f..f1258ea 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -606,14 +606,27 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
     {
         connectionStateManager.setToSuspended();
 
+        final long instanceIndex = client.getInstanceIndex();
+
         // we appear to have disconnected, force a new ZK event and see if we 
can connect to another server
-        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, 
null);
+        final BackgroundOperation<String> operation = new 
BackgroundSyncImpl(this, null);
         OperationAndData.ErrorCallback<String> errorCallback = new 
OperationAndData.ErrorCallback<String>()
         {
             @Override
             public void retriesExhausted(OperationAndData<String> 
operationAndData)
             {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
+                // if instanceIndex != newInstanceIndex, the ZooKeeper 
instance was reset/reallocated
+                // so the pending background sync is no longer valid
+                long newInstanceIndex = client.getInstanceIndex();
+                if ( instanceIndex == newInstanceIndex )
+                {
+                    
connectionStateManager.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the 
ZooKeeper instance was reset. Retrying.");
+                    performBackgroundOperation(new 
OperationAndData<String>(operation, "/", null, this, null));
+                }
             }
         };
         performBackgroundOperation(new OperationAndData<String>(operation, 
"/", null, errorCallback, null));

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
new file mode 100644
index 0000000..e634a6d
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import 
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        server.close();
+
+        final StringBuilder listenerSequence = new StringBuilder();
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(100));
+        try
+        {
+            client.start();
+            timing.sleepABit();
+
+            LeaderSelectorListener listenerLeader = new 
LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws 
Exception
+                {
+                    Thread.currentThread().join();
+                }
+            };
+            selector = new LeaderSelector(client, "/leader", listenerLeader);
+            selector.autoRequeue();
+            selector.start();
+
+            ConnectionStateListener listener1 = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    listenerSequence.append("-").append(newState);
+                }
+            };
+
+            client.getConnectionStateListenable().addListener(listener1);
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(listenerSequence.toString(), 
"-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file

Reply via email to