Repository: curator
Updated Branches:
  refs/heads/CURATOR-468 [created] ddfcbc1e3


CURATOR-468

Adds a test to what #279 had along with a slightly more complete fix. The 
finally clause of doWork() releases the mutex. In an interrupted situation this 
would always fail causing the failed delete manager to handle the work 
resulting in slightly slower deletions and spurious log messaged. This fixes 
this by clearing the thread interrupted state prior to releasing the mutex and 
then resetting afterwards if needed.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/97e76919
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/97e76919
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/97e76919

Branch: refs/heads/CURATOR-468
Commit: 97e76919ae0056985aacc786d6c440fb779a6877
Parents: d13a12f
Author: randgalt <randg...@apache.org>
Authored: Thu Dec 6 12:40:43 2018 -0500
Committer: randgalt <randg...@apache.org>
Committed: Thu Dec 6 12:40:43 2018 -0500

----------------------------------------------------------------------
 .../recipes/leader/LeaderSelector.java          | 27 +++++++++--
 .../recipes/leader/TestLeaderSelector.java      | 50 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/97e76919/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index 6ad1053..e505c91 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.curator.utils.PathUtils;
 
@@ -304,7 +305,7 @@ public class LeaderSelector implements Closeable
         for ( String path : participantNodes )
         {
             Participant participant = participantForPath(client, path, 
isLeader);
-            
+
             if( participant != null )
             {
                 builder.add(participant);
@@ -340,26 +341,26 @@ public class LeaderSelector implements Closeable
     static Participant getLeader(CuratorFramework client, Collection<String> 
participantNodes) throws Exception
     {
         Participant result = null;
-        
+
         if ( participantNodes.size() > 0 )
         {
             Iterator<String> iter = participantNodes.iterator();
             while ( iter.hasNext() )
             {
                 result = participantForPath(client, iter.next(), true);
-                
+
                 if ( result != null )
                 {
                     break;
                 }
             }
         }
-        
+
         if( result == null )
         {
             result = new Participant();
         }
-        
+
         return result;
     }
 
@@ -400,6 +401,9 @@ public class LeaderSelector implements Closeable
     }
 
     @VisibleForTesting
+    volatile AtomicInteger failedMutexReleaseCount = null;
+
+    @VisibleForTesting
     void doWork() throws Exception
     {
         hasLeadership = false;
@@ -444,16 +448,29 @@ public class LeaderSelector implements Closeable
             if ( hasLeadership )
             {
                 hasLeadership = false;
+                boolean wasInterrupted = Thread.interrupted();
                 try
                 {
                     mutex.release();
                 }
                 catch ( Exception e )
                 {
+                    if ( failedMutexReleaseCount != null )
+                    {
+                        failedMutexReleaseCount.incrementAndGet();
+                    }
+
                     ThreadUtils.checkInterrupted(e);
                     log.error("The leader threw an exception", e);
                     // ignore errors - this is just a safety
                 }
+                finally
+                {
+                    if ( wasInterrupted )
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/97e76919/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index 808ff8f..9a5e42e 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -24,19 +24,23 @@ import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import java.lang.reflect.Array;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -55,6 +59,50 @@ public class TestLeaderSelector extends BaseClassForTests
     private static final String PATH_NAME = "/one/two/me";
 
     @Test
+    public void testInterruption() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        LeaderSelector selector = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            CountDownLatch exitLatch = new CountDownLatch(1);
+            BlockingQueue<Thread> threadExchange = new ArrayBlockingQueue<>(1);
+            LeaderSelectorListener listener = new 
LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws 
Exception
+                {
+                    threadExchange.put(Thread.currentThread());
+                    try
+                    {
+                        Thread.currentThread().join();
+                    }
+                    finally
+                    {
+                        exitLatch.countDown();
+                    }
+                }
+            };
+            selector = new LeaderSelector(client, PATH_NAME, listener);
+            selector.failedMutexReleaseCount = new AtomicInteger();
+            selector.start();
+            Thread leaderThread = timing.takeFromQueue(threadExchange);
+            leaderThread.interrupt();
+            Assert.assertTrue(timing.awaitLatch(exitLatch));
+            timing.sleepABit(); // wait for leader selector to clear nodes
+            Assert.assertEquals(0, selector.failedMutexReleaseCount.get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(selector);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testErrorPolicies() throws Exception
     {
         Timing2 timing = new Timing2();

Reply via email to