Guaranteed deletes must accept InterruptedException as well as retryable exceptions
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/989f9414 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/989f9414 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/989f9414 Branch: refs/heads/CURATOR-160 Commit: 989f94148faae97f23368e4b5bae2f1f03eaa62c Parents: 6a56c51 Author: randgalt <randg...@apache.org> Authored: Sun Apr 19 13:13:45 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Apr 19 13:13:45 2015 -0500 ---------------------------------------------------------------------- .../framework/imps/DeleteBuilderImpl.java | 110 ++++++++++--------- .../recipes/leader/TestLeaderSelector.java | 67 +++++++++++ 2 files changed, 127 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/989f9414/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java index 5d8b846..c067357 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; @@ -40,11 +41,11 @@ import java.util.concurrent.Executor; class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> { - private final CuratorFrameworkImpl client; - private int version; - private Backgrounding backgrounding; - private boolean deletingChildrenIfNeeded; - private boolean guaranteed; + private final CuratorFrameworkImpl client; + private int version; + private Backgrounding backgrounding; + private boolean deletingChildrenIfNeeded; + private boolean guaranteed; DeleteBuilderImpl(CuratorFrameworkImpl client) { @@ -55,14 +56,14 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> guaranteed = false; } - TransactionDeleteBuilder asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction) + TransactionDeleteBuilder asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction) { return new TransactionDeleteBuilder() { @Override public CuratorTransactionBridge forPath(String path) throws Exception { - String fixedPath = client.fixForNamespace(path); + String fixedPath = client.fixForNamespace(path); transaction.add(Op.delete(fixedPath, version), OperationType.DELETE, path); return curatorTransaction; } @@ -142,32 +143,35 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { - final TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background"); + final TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background"); client.getZooKeeper().delete - ( - operationAndData.getData(), - version, - new AsyncCallback.VoidCallback() - { - @Override - public void processResult(int rc, String path, Object ctx) + ( + operationAndData.getData(), + version, + new AsyncCallback.VoidCallback() { - trace.commit(); - if ((rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded) { - backgroundDeleteChildrenThenNode(operationAndData); - } else { - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null); - client.processBackgroundOperation(operationAndData, event); + @Override + public void processResult(int rc, String path, Object ctx) + { + trace.commit(); + if ( (rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded ) + { + backgroundDeleteChildrenThenNode(operationAndData); + } + else + { + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } } - } - }, - backgrounding.getContext() - ); + }, + backgrounding.getContext() + ); } private void backgroundDeleteChildrenThenNode(final OperationAndData<String> mainOperationAndData) { - BackgroundOperation<String> operation = new BackgroundOperation<String>() + BackgroundOperation<String> operation = new BackgroundOperation<String>() { @Override public void performBackgroundOperation(OperationAndData<String> dummy) throws Exception @@ -190,12 +194,12 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> @Override public Void forPath(String path) throws Exception { - final String unfixedPath = path; + final String unfixedPath = path; path = client.fixForNamespace(path); if ( backgrounding.inBackground() ) { - OperationAndData.ErrorCallback<String> errorCallback = null; + OperationAndData.ErrorCallback<String> errorCallback = null; if ( guaranteed ) { errorCallback = new OperationAndData.ErrorCallback<String>() @@ -223,35 +227,41 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> private void pathInForeground(final String path, String unfixedPath) throws Exception { - TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground"); + TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground"); try { RetryLoop.callWithRetry - ( - client.getZookeeperClient(), - new Callable<Void>() - { - @Override - public Void call() throws Exception + ( + client.getZookeeperClient(), + new Callable<Void>() { - try { - client.getZooKeeper().delete(path, version); - } catch (KeeperException.NotEmptyException e) { - if (deletingChildrenIfNeeded) { - ZKPaths.deleteChildren(client.getZooKeeper(), path, true); - } else { - throw e; + @Override + public Void call() throws Exception + { + try + { + client.getZooKeeper().delete(path, version); } + catch ( KeeperException.NotEmptyException e ) + { + if ( deletingChildrenIfNeeded ) + { + ZKPaths.deleteChildren(client.getZooKeeper(), path, true); + } + else + { + throw e; + } + } + return null; } - return null; } - } - ); - } + ); + } catch ( Exception e ) { //Only retry a guaranteed delete if it's a retryable error - if( RetryLoop.isRetryException(e) && guaranteed ) + if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed ) { client.getFailedDeleteManager().addFailedDelete(unfixedPath); } http://git-wip-us.apache.org/repos/asf/curator/blob/989f9414/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index ec909f7..c7f415c 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.KillSession; @@ -34,6 +35,7 @@ import org.testng.annotations.Test; import org.testng.internal.annotations.Sets; import java.util.List; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -48,6 +50,71 @@ public class TestLeaderSelector extends BaseClassForTests private static final String PATH_NAME = "/one/two/me"; @Test + public void testLeaderNodeDeleteOnInterrupt() throws Exception + { + Timing timing = new Timing(); + LeaderSelector selector = null; + CuratorFramework client = null; + try + { + client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + final CountDownLatch reconnectedLatch = new CountDownLatch(1); + ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectedLatch.countDown(); + } + } + }; + client.getConnectionStateListenable().addListener(connectionStateListener); + client.start(); + + final BlockingQueue<Thread> queue = new ArrayBlockingQueue<Thread>(1); + LeaderSelectorListener listener = new LeaderSelectorListener() + { + @Override + public void takeLeadership(CuratorFramework client) throws Exception + { + queue.add(Thread.currentThread()); + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + }; + selector = new LeaderSelector(client, "/leader", listener); + selector.start(); + + Thread leaderThread = queue.take(); + server.stop(); + leaderThread.interrupt(); + server.restart(); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + timing.sleepABit(); + + Assert.assertEquals(client.getChildren().forPath("/leader").size(), 0); + } + finally + { + CloseableUtils.closeQuietly(selector); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testInterruptLeadershipWithRequeue() throws Exception { Timing timing = new Timing();