This is an automated email from the ASF dual-hosted git repository. kezhuw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push: new bf587436 CURATOR-677: Complete BackgroundCallback if sub operation failed or cancelled (#467) bf587436 is described below commit bf587436c085d0361e881f919de64acb8424b1e5 Author: Kezhu Wang <kez...@apache.org> AuthorDate: Tue Aug 15 23:23:53 2023 +0800 CURATOR-677: Complete BackgroundCallback if sub operation failed or cancelled (#467) Currently, some background operations use auxiliary sub-operations to complete their task when the primary conditions are not satisfied. But most of these sub-operations handle only the success path, so they will leave `BackgroundCallback` hanging if they fail or are cancelled because the framework has been closed. This is a leftover of [CURATOR-673][](#464). [CURATOR-673]: https://issues.apache.org/jira/browse/CURATOR-673 --- .../curator/framework/imps/CreateBuilderImpl.java | 36 ++----- .../framework/imps/CuratorFrameworkImpl.java | 15 ++- .../curator/framework/imps/DeleteBuilderImpl.java | 3 +- .../curator/framework/imps/ExistsBuilderImpl.java | 1 - .../curator/framework/imps/OperationAndData.java | 13 ++- .../curator/framework/imps/SetDataBuilderImpl.java | 9 +- .../curator/framework/imps/TestFramework.java | 118 ++++++++++++++++++++- 7 files changed, 154 insertions(+), 41 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ef765769..fa208141 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -608,7 +608,6 @@ public class CreateBuilderImpl client, operationAndData, operationAndData.getData().getPath(), - backgrounding, acling.getACLProviderForParents(), createParentsAsContainers); } else if ((rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists) { @@ -726,7 +725,6 @@ 
public class CreateBuilderImpl final CuratorFrameworkImpl client, final OperationAndData<T> mainOperationAndData, final String path, - Backgrounding backgrounding, final InternalACLProvider aclProvider, final boolean createParentsAsContainers) { BackgroundOperation<T> operation = new BackgroundOperation<T>() { @@ -736,8 +734,6 @@ public class CreateBuilderImpl ZKPaths.mkdirs(client.getZooKeeper(), path, false, aclProvider, createParentsAsContainers); } catch (KeeperException e) { if (!client.getZookeeperClient().getRetryPolicy().allowRetry(e)) { - sendBackgroundResponse( - client, e.code().intValue(), e.getPath(), null, null, null, mainOperationAndData); throw e; } // otherwise safe to ignore as it will get retried @@ -750,8 +746,7 @@ public class CreateBuilderImpl return CuratorEventType.CREATE; } }; - OperationAndData<T> parentOperation = new OperationAndData<>( - operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null); + OperationAndData<T> parentOperation = new OperationAndData<>(operation, mainOperationAndData); client.queueOperation(parentOperation); } @@ -773,17 +768,13 @@ public class CreateBuilderImpl BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>() { @Override public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception { - try { - client.getZooKeeper() - .setData( - path, - mainOperationAndData.getData().getData(), - setDataIfExistsVersion, - statCallback, - backgrounding.getContext()); - } catch (KeeperException e) { - // ignore - } + client.getZooKeeper() + .setData( + path, + mainOperationAndData.getData().getData(), + setDataIfExistsVersion, + statCallback, + backgrounding.getContext()); } @Override @@ -791,7 +782,7 @@ public class CreateBuilderImpl return CuratorEventType.CREATE; } }; - client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null)); + client.queueOperation(new OperationAndData<>(operation, 
mainOperationAndData)); } private void backgroundCheckIdempotent( @@ -821,12 +812,7 @@ public class CreateBuilderImpl BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>() { @Override public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception { - try { - client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext()); - } catch (KeeperException e) { - // ignore - client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e); - } + client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext()); } @Override @@ -834,7 +820,7 @@ public class CreateBuilderImpl return CuratorEventType.CREATE; } }; - client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null)); + client.queueOperation(new OperationAndData<>(operation, mainOperationAndData)); } private void sendBackgroundResponse( diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 019897b3..dd62006d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -431,9 +431,13 @@ public class CuratorFrameworkImpl implements CuratorFramework { if (ensembleTracker != null) { ensembleTracker.close(); } - OperationAndData<?>[] droppedOperations = backgroundOperations.toArray(new OperationAndData<?>[0]); - backgroundOperations.clear(); - Arrays.stream(droppedOperations).forEach(this::closeOperation); + // Operations are forbidden to queue after closing, but there are still other concurrent mutations, + // say, un-sleeping and not fully terminated background thread. So we have to drain the queue atomically + // to avoid duplicated close. 
But DelayQueue counts Delayed::getDelay, so we have to clear it up front. + backgroundOperations.forEach(OperationAndData::clearSleep); + Collection<OperationAndData<?>> droppedOperations = new ArrayList<>(backgroundOperations.size()); + backgroundOperations.drainTo(droppedOperations); + droppedOperations.forEach(this::closeOperation); listeners.clear(); unhandledErrorListeners.clear(); connectionStateManager.close(); @@ -745,7 +749,10 @@ public class CuratorFrameworkImpl implements CuratorFramework { return; } } - closeOperation(operationAndData); + // Sleeping operations are queued with delay, it could have been pulled out for execution or cancellation. + if (backgroundOperations.remove(operationAndData)) { + closeOperation(operationAndData); + } } /** diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java index 31dbf911..958879f5 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java @@ -230,8 +230,7 @@ public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<Str return CuratorEventType.DELETE; } }; - OperationAndData<String> parentOperation = new OperationAndData<String>( - operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null); + OperationAndData<String> parentOperation = new OperationAndData<>(operation, mainOperationAndData); client.queueOperation(parentOperation); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java index 102ab823..68494c48 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java +++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java @@ -193,7 +193,6 @@ public class ExistsBuilderImpl client, operationAndData, operationAndData.getData(), - backgrounding, acling.getACLProviderForParents(), createParentContainersIfNeeded); } else { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java index be15d882..19e89c8e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java @@ -36,7 +36,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper { private final BackgroundCallback callback; private final long startTimeMs = System.currentTimeMillis(); private final ErrorCallback<T> errorCallback; - private final AtomicInteger retryCount = new AtomicInteger(0); + private final AtomicInteger retryCount; private final AtomicLong sleepUntilTimeMs = new AtomicLong(0); private final AtomicLong ordinal = new AtomicLong(); private final Object context; @@ -46,6 +46,16 @@ class OperationAndData<T> implements Delayed, RetrySleeper { void retriesExhausted(OperationAndData<T> operationAndData); } + OperationAndData(BackgroundOperation<T> operation, OperationAndData<T> main) { + this.operation = operation; + this.data = main.data; + this.callback = main.callback; + this.errorCallback = main.errorCallback; + this.context = main.context; + this.connectionRequired = main.connectionRequired; + this.retryCount = main.retryCount; + } + OperationAndData( BackgroundOperation<T> operation, T data, @@ -59,6 +69,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper { this.errorCallback = errorCallback; this.context = context; this.connectionRequired = connectionRequired; + this.retryCount = new AtomicInteger(0); reset(); } diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java index 325053aa..73ba8a54 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java @@ -229,12 +229,7 @@ public class SetDataBuilderImpl BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>() { @Override public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception { - try { - client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext()); - } catch (KeeperException e) { - // ignore - client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e); - } + client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext()); } @Override @@ -242,7 +237,7 @@ public class SetDataBuilderImpl return CuratorEventType.SET_DATA; } }; - client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null)); + client.queueOperation(new OperationAndData<>(operation, mainOperationAndData)); } @Override diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java index ea7fc048..d28b814e 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.RetryPolicy; import org.apache.curator.RetrySleeper; 
import org.apache.curator.framework.AuthInfo; @@ -60,6 +61,7 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; @@ -917,6 +919,16 @@ public class TestFramework extends BaseClassForTests { BackgroundOperation<?> create(CuratorFramework client, CompletableFuture<CuratorEvent> future) throws Exception; } + private static class CountingCompletableFuture<T> extends CompletableFuture<T> { + private final AtomicInteger completes = new AtomicInteger(); + + @Override + public boolean complete(T value) { + completes.incrementAndGet(); + return super.complete(value); + } + } + private void testBackgroundOperationWithConcurrentCloseAndChaosStalls( BackgroundOperationFactory operationFactory, long maxRuns, long[] millisStalls) throws Exception { AlwaysRetry alwaysRetry = new AlwaysRetry(2); @@ -924,7 +936,7 @@ public class TestFramework extends BaseClassForTests { client.start(); try { // given: error background request with always-retry policy - CompletableFuture<CuratorEvent> future = new CompletableFuture<>(); + CountingCompletableFuture<CuratorEvent> future = new CountingCompletableFuture<>(); BackgroundOperation<?> operation = operationFactory.create(client, future); // These chaos steps create chances to run into concurrent contentions. 
@@ -948,6 +960,7 @@ public class TestFramework extends BaseClassForTests { assertThat(event.getResultCode()).isEqualTo(KeeperException.Code.SESSIONEXPIRED.intValue()); assertThat(event.getType()).isSameAs(operation.getBackgroundEventType()); assertThat(event.getContext()).isSameAs(future); + assertThat(future.completes.get()).isEqualTo(1); } finally { CloseableUtils.closeQuietly(client); } @@ -957,6 +970,7 @@ public class TestFramework extends BaseClassForTests { throws Exception { testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {20, -1, 5}); testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {10}); + testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {200}); testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, 2, new long[] {20}); } @@ -978,6 +992,67 @@ public class TestFramework extends BaseClassForTests { }); } + @Test + public void testBackgroundCreateSetDataIfExistsWithConcurrentClose() throws Exception { + AtomicBoolean retry = new AtomicBoolean(); + testBackgroundOperationWithConcurrentClose((client, future) -> { + if (retry.compareAndSet(false, true)) { + try { + client.create().forPath("/exist-path"); + } catch (KeeperException ex) { + throw new IllegalStateException(ex); + } + } + CreateBuilder create = client.create(); + create.orSetData(Integer.MAX_VALUE) + .inBackground((ignored, event) -> future.complete(event), future) + .forPath("/exist-path"); + return (BackgroundOperation<?>) create; + }); + } + + @Test + public void testBackgroundCreateIdempotentWithConcurrentClose() throws Exception { + AtomicBoolean retry = new AtomicBoolean(); + testBackgroundOperationWithConcurrentClose((client, future) -> { + if (retry.compareAndSet(false, true)) { + try { + client.create().forPath("/exist-path", "some-data".getBytes()); + } catch (KeeperException ex) { + throw new IllegalStateException(ex); + } + } + CreateBuilder 
create = client.create(); + create.idempotent() + .inBackground((ignored, event) -> future.complete(event), future) + .forPath("/exist-path", "different-data".getBytes()); + return (BackgroundOperation<?>) create; + }); + } + + @Test + public void testBackgroundCreateParentsIfNeedWithConcurrentClose() throws Exception { + AtomicBoolean retry = new AtomicBoolean(); + testBackgroundOperationWithConcurrentClose((client, future) -> { + if (retry.compareAndSet(false, true)) { + try { + // Disable CREATE child permission for grandparent path. + client.create() + .withACL(Collections.singletonList( + new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE))) + .forPath("/grandparent"); + } catch (KeeperException ex) { + throw new IllegalStateException(ex); + } + } + CreateBuilder create = client.create(); + create.creatingParentsIfNeeded() + .inBackground((ignored, event) -> future.complete(event), future) + .forPath("/grandparent/parent/child"); + return (BackgroundOperation<?>) create; + }); + } + @Test public void testBackgroundDeleteWithConcurrentClose() throws Exception { testBackgroundOperationWithConcurrentClose((client, future) -> { @@ -988,6 +1063,31 @@ public class TestFramework extends BaseClassForTests { }); } + @Test + public void testBackgroundDeleteNotEmptyAndACLWithConcurrentClose() throws Exception { + AtomicBoolean retry = new AtomicBoolean(); + testBackgroundOperationWithConcurrentClose((client, future) -> { + if (retry.compareAndSet(false, true)) { + try (CuratorFramework authedClient = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .authorization("digest", "me1:pass1".getBytes()) + .retryPolicy(new RetryForever(2)) + .build()) { + client.create().forPath("/not-empty-path"); + authedClient.start(); + authedClient.create().withACL(ZooDefs.Ids.CREATOR_ALL_ACL).forPath("/not-empty-path/child"); + } catch (KeeperException ex) { + throw new IllegalStateException(ex); + } + } + DeleteBuilder delete = client.delete(); + 
delete.deletingChildrenIfNeeded() + .inBackground((ignored, event) -> future.complete(event), future) + .forPath("/not-empty-path"); + return (BackgroundOperation<?>) delete; + }); + } + @Test public void testBackgroundExistsWithConcurrentClose() throws Exception { testBackgroundOperationWithConcurrentClose((client, future) -> { @@ -1018,6 +1118,22 @@ public class TestFramework extends BaseClassForTests { }); } + @Test + public void testBackgroundSetDataIdempotentWithConcurrentClose() throws Exception { + AtomicBoolean retry = new AtomicBoolean(); + testBackgroundOperationWithConcurrentClose((client, future) -> { + if (retry.compareAndSet(false, true)) { + client.create().forPath("/bad-version-path", "version1".getBytes()); + } + SetDataBuilder setData = client.setData(); + setData.idempotent() + .withVersion(333) + .inBackground((ignored, event) -> future.complete(event), future) + .forPath("/bad-version-path"); + return (BackgroundOperation<?>) setData; + }); + } + @Test public void testBackgroundChildrenWithConcurrentClose() throws Exception { testBackgroundOperationWithConcurrentClose((client, future) -> {