This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new a5b8c06bb9 When jvm-dtest is shutting down an instance TCM retries block the shutdown causing the test to fail a5b8c06bb9 is described below commit a5b8c06bb925905719261b1f449fffb049f54d1b Author: David Capwell <dcapw...@apache.org> AuthorDate: Tue Apr 2 22:18:50 2024 -0700 When jvm-dtest is shutting down an instance TCM retries block the shutdown causing the test to fail patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19514 --- .../apache/cassandra/concurrent/Shutdownable.java | 14 +++++++- .../cassandra/service/accord/AccordService.java | 10 ++++++ .../apache/cassandra/tcm/EpochAwareDebounce.java | 40 +++++++++++++++++----- .../org/apache/cassandra/tcm/RemoteProcessor.java | 2 ++ .../cassandra/distributed/impl/Instance.java | 7 ++++ 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/concurrent/Shutdownable.java b/src/java/org/apache/cassandra/concurrent/Shutdownable.java index 185875b791..a72253fc87 100644 --- a/src/java/org/apache/cassandra/concurrent/Shutdownable.java +++ b/src/java/org/apache/cassandra/concurrent/Shutdownable.java @@ -19,7 +19,9 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.Shared; import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @@ -29,6 +31,11 @@ public interface Shutdownable { boolean isTerminated(); + default boolean isShutdown() + { + return isTerminated(); + } + /** * Shutdown once any remaining work has completed (however this is defined for the implementation). */ @@ -42,5 +49,10 @@ public interface Shutdownable /** * Await termination of this object, i.e. the cessation of all current and future work. 
*/ - public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException; + boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException; + + default void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + ExecutorUtils.shutdownAndWait(timeout, unit, this); + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 9a44da4538..6e136029a0 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -35,7 +35,9 @@ import com.google.common.primitives.Ints; import accord.coordinate.TopologyMismatch; import accord.impl.CoordinateDurabilityScheduling; import org.apache.cassandra.cql3.statements.RequestValidations; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.service.accord.api.*; import org.apache.cassandra.utils.*; @@ -241,6 +243,14 @@ public class AccordService implements IAccordService, Shutdownable } AccordService as = new AccordService(AccordTopology.tcmIdToAccord(tcmId)); as.startup(); + if (StorageService.instance.isReplacingSameAddress()) + { + // when replacing another node but using the same ip the hostId will also match, this causes no TCM transactions + // to be committed... 
+ // In order to boot up correctly, we need to pull in the current epoch + ClusterMetadata current = ClusterMetadata.current(); + as.configurationService().notifyPostCommit(current, current, false); + } instance = as; } diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java index a1cc6e5a95..41e09073f4 100644 --- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java +++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java @@ -20,12 +20,11 @@ package org.apache.cassandra.tcm; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.apache.cassandra.concurrent.ExecutorFactory; import org.apache.cassandra.concurrent.ExecutorPlus; -import org.apache.cassandra.utils.ExecutorUtils; +import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; @@ -35,7 +34,7 @@ import org.apache.cassandra.utils.concurrent.Future; * comes in, we create a new future. If a request for a newer epoch comes in, we simply * swap out the current future reference for a new one which is requesting the newer epoch.
*/ -public class EpochAwareDebounce<T> +public class EpochAwareDebounce<T> implements Shutdownable { public static final EpochAwareDebounce<ClusterMetadata> instance = new EpochAwareDebounce<>(); @@ -74,6 +73,36 @@ public class EpochAwareDebounce<T> } } + @Override + public boolean isTerminated() + { + return executor.isTerminated(); + } + + @Override + public boolean isShutdown() + { + return executor.isShutdown(); + } + + @Override + public void shutdown() + { + executor.shutdown(); + } + + @Override + public Object shutdownNow() + { + return executor.shutdownNow(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException + { + return executor.awaitTermination(timeout, units); + } + private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T> { private final Epoch epoch; @@ -82,9 +111,4 @@ public class EpochAwareDebounce<T> this.epoch = epoch; } } - - public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException - { - ExecutorUtils.shutdownAndWait(timeout, unit, executor); - } } diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 6a68407810..e82f2eaaf1 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -190,6 +190,8 @@ public final class RemoteProcessor implements Processor { if (promise.isCancelled() || promise.isDone()) return; + if (EpochAwareDebounce.instance.isShutdown()) + promise.tryFailure(new IllegalStateException("Unable to retry as we are shutting down")); if (!candidates.hasNext()) promise.tryFailure(new IllegalStateException(String.format("Ran out of candidates while sending %s: %s", verb, candidates))); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 1e4335707a..b48d3199a9 100644 
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -885,6 +885,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance { Future<?> future = async((ExecutorService executor) -> { Throwable error = null; + inInstancelogger.warn("Shutting down in thread {}", Thread.currentThread().getName()); CompactionManager.instance.forceShutdown(); @@ -1225,6 +1226,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } })); } + // This variable is not used by the code; it is here for when you run in a debugger. + // When shutdown gets blocked we need to be able to trace which future is blocked, so this idx + // helps map the location. The reason we can't leverage it here is that the timeout logic is higher up, so + // 'idx' is really only useful in a debugger. + int idx = 0; for (Future<Throwable> future : results) { try @@ -1237,6 +1243,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance { accumulate = Throwables.merge(accumulate, t); } + idx++; } return accumulate; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org