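If a network event (such as a connection loss) occurs after the semaphore's lease node is created but before getChildren() is called, the previous implementation would orphan the newly created node, causing a deadlock later on. The lease is now returned (deleting its node) if getChildren() fails, and the exception is then rethrown.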
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8dc0283e Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8dc0283e Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8dc0283e Branch: refs/heads/CURATOR-3.0 Commit: 8dc0283e02ed799d5d76303f370ccc6325662b83 Parents: 5584a61 Author: randgalt <randg...@apache.org> Authored: Thu Jun 2 17:27:18 2016 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu Jun 2 17:27:18 2016 -0500 ---------------------------------------------------------------------- .../recipes/locks/InterProcessSemaphoreV2.java | 24 ++- .../locks/TestInterProcessSemaphore.java | 167 +++++++++++++++++-- 2 files changed, 173 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/8dc0283e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java index 3d96be2..2b9d48d 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java @@ -44,6 +44,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.curator.utils.PathUtils; @@ -327,6 +328,9 @@ public class InterProcessSemaphoreV2 RETRY_DUE_TO_MISSING_NODE } + static volatile CountDownLatch debugAcquireLatch = null; + static volatile CountDownLatch debugFailedGetChildrenLatch = null; + 
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception { if ( client.getState() != CuratorFrameworkState.STARTED ) @@ -356,11 +360,29 @@ public class InterProcessSemaphoreV2 String nodeName = ZKPaths.getNodeFromPath(path); lease = makeLease(path); + if ( debugAcquireLatch != null ) + { + debugAcquireLatch.await(); + } + synchronized(this) { for(;;) { - List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); + List<String> children; + try + { + children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); + } + catch ( Exception e ) + { + if ( debugFailedGetChildrenLatch != null ) + { + debugFailedGetChildrenLatch.countDown(); + } + returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock + throw e; + } if ( !children.contains(nodeName) ) { log.error("Sequential path not found: " + path); http://git-wip-us.apache.org/repos/asf/curator/blob/8dc0283e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java index ad45d90..216c2a2 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java @@ -20,14 +20,19 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.collect.Lists; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; +import com.google.common.collect.Queues; 
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.testng.Assert; @@ -35,6 +40,7 @@ import org.testng.annotations.Test; import java.util.Collection; import java.util.List; import java.util.Random; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorCompletionService; @@ -49,6 +55,126 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestInterProcessSemaphore extends BaseClassForTests { @Test + public void testAcquireAfterLostServer() throws Exception + { + // CURATOR-335 + + final String SEMAPHORE_PATH = "/test"; + final int MAX_SEMAPHORES = 1; + final int NUM_CLIENTS = 10; + + ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS); + + final Timing timing = new Timing(); + + final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.forWaiting().milliseconds(), timing.connection(), new RetryOneTime(1)); // long session time on purpose + try + { + client.start(); + + InterProcessSemaphoreV2.debugAcquireLatch = new CountDownLatch(1); // cause one of the semaphores to create its node and then wait + InterProcessSemaphoreV2.debugFailedGetChildrenLatch = new CountDownLatch(1); // semaphore will notify when getChildren() 
fails + final CountDownLatch isReadyLatch = new CountDownLatch(NUM_CLIENTS); + final BlockingQueue<Boolean> acquiredQueue = Queues.newLinkedBlockingQueue(); + Runnable runner = new Runnable() + { + @Override + public void run() + { + while ( !Thread.currentThread().isInterrupted() ) + { + InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, SEMAPHORE_PATH, MAX_SEMAPHORES); + Lease lease = null; + try + { + isReadyLatch.countDown(); + lease = semaphore.acquire(); + acquiredQueue.add(true); + timing.sleepABit(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + break; + } + catch ( KeeperException e ) + { + try + { + timing.sleepABit(); + } + catch ( InterruptedException e2 ) + { + Thread.currentThread().interrupt(); + break; + } + } + catch ( Exception ignore ) + { + // ignore + } + finally + { + if ( lease != null ) + { + semaphore.returnLease(lease); + } + } + } + } + }; + for ( int i = 0; i < NUM_CLIENTS; ++i ) + { + executor.execute(runner); + } + Assert.assertTrue(timing.awaitLatch(isReadyLatch)); + timing.sleepABit(); + + final CountDownLatch lostLatch = new CountDownLatch(1); + final CountDownLatch restartedLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + else if ( newState == ConnectionState.RECONNECTED ) + { + restartedLatch.countDown(); + } + } + }); + + timing.sleepABit(); + server.stop(); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + InterProcessSemaphoreV2.debugAcquireLatch.countDown(); // the waiting semaphore proceeds to getChildren - which should fail + Assert.assertTrue(timing.awaitLatch(InterProcessSemaphoreV2.debugFailedGetChildrenLatch)); // wait until getChildren fails + + server.restart(); + + Assert.assertTrue(timing.awaitLatch(restartedLatch)); + for 
( int i = 0; i < NUM_CLIENTS; ++i ) + { + // acquires should continue as normal after server restart + Boolean polled = acquiredQueue.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + if ( (polled == null) || !polled ) + { + Assert.fail("Semaphores not reacquired after restart"); + } + } + } + finally + { + executor.shutdownNow(); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testThreadedLeaseIncrease() throws Exception { final Timing timing = new Timing(); @@ -551,13 +677,13 @@ public class TestInterProcessSemaphore extends BaseClassForTests Assert.assertTrue(client.getChildren().forPath("/test").size() > 0); childReaper = new ChildReaper( - client, - "/test", - Reaper.Mode.REAP_UNTIL_GONE, - ChildReaper.newExecutorService(), - 1, - "/test-leader", - InterProcessSemaphoreV2.LOCK_SCHEMA + client, + "/test", + Reaper.Mode.REAP_UNTIL_GONE, + ChildReaper.newExecutorService(), + 1, + "/test-leader", + InterProcessSemaphoreV2.LOCK_SCHEMA ); childReaper.start(); @@ -591,18 +717,23 @@ public class TestInterProcessSemaphore extends BaseClassForTests Assert.assertEquals(childNodes.size(), 1); final CountDownLatch nodeCreatedLatch = new CountDownLatch(1); - client.getChildren().usingWatcher(new CuratorWatcher() { + client.getChildren().usingWatcher(new CuratorWatcher() + { @Override - public void process(WatchedEvent event) throws Exception { - if (event.getType() == Watcher.Event.EventType.NodeCreated) { + public void process(WatchedEvent event) throws Exception + { + if ( event.getType() == Watcher.Event.EventType.NodeCreated ) + { nodeCreatedLatch.countDown(); } } }).forPath("/test/leases"); - final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() { + final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() + { @Override - public Lease call() throws Exception { + public Lease call() throws Exception + { return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS); } }); @@ 
-610,8 +741,10 @@ public class TestInterProcessSemaphore extends BaseClassForTests // wait for second lease to create its node timing.awaitLatch(nodeCreatedLatch); String newNode = null; - for (String c : client.getChildren().forPath("/test/leases")) { - if (!childNodes.contains(c)) { + for ( String c : client.getChildren().forPath("/test/leases") ) + { + if ( !childNodes.contains(c) ) + { newNode = c; } }