This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new a5b8c06bb9 When jvm-dtest is shutting down an instance, TCM retries block the shutdown, causing the test to fail
a5b8c06bb9 is described below

commit a5b8c06bb925905719261b1f449fffb049f54d1b
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Tue Apr 2 22:18:50 2024 -0700

    When jvm-dtest is shutting down an instance, TCM retries block the shutdown, causing the test to fail
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19514
---
 .../apache/cassandra/concurrent/Shutdownable.java  | 14 +++++++-
 .../cassandra/service/accord/AccordService.java    | 10 ++++++
 .../apache/cassandra/tcm/EpochAwareDebounce.java   | 40 +++++++++++++++++-----
 .../org/apache/cassandra/tcm/RemoteProcessor.java  |  2 ++
 .../cassandra/distributed/impl/Instance.java       |  7 ++++
 5 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/src/java/org/apache/cassandra/concurrent/Shutdownable.java 
b/src/java/org/apache/cassandra/concurrent/Shutdownable.java
index 185875b791..a72253fc87 100644
--- a/src/java/org/apache/cassandra/concurrent/Shutdownable.java
+++ b/src/java/org/apache/cassandra/concurrent/Shutdownable.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.Shared;
 
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@@ -29,6 +31,11 @@ public interface Shutdownable
 {
     boolean isTerminated();
 
+    default boolean isShutdown()
+    {
+        return isTerminated();
+    }
+
     /**
      * Shutdown once any remaining work has completed (however this is defined 
for the implementation).
      */
@@ -42,5 +49,10 @@ public interface Shutdownable
     /**
      * Await termination of this object, i.e. the cessation of all current and 
future work.
      */
-    public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException;
+    boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException;
+
+    default void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
+    {
+        ExecutorUtils.shutdownAndWait(timeout, unit, this);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 9a44da4538..6e136029a0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -35,7 +35,9 @@ import com.google.common.primitives.Ints;
 import accord.coordinate.TopologyMismatch;
 import accord.impl.CoordinateDurabilityScheduling;
 import org.apache.cassandra.cql3.statements.RequestValidations;
+import org.apache.cassandra.service.StorageService;
 import 
org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.service.accord.api.*;
 import org.apache.cassandra.utils.*;
@@ -241,6 +243,14 @@ public class AccordService implements IAccordService, 
Shutdownable
         }
         AccordService as = new 
AccordService(AccordTopology.tcmIdToAccord(tcmId));
         as.startup();
+        if (StorageService.instance.isReplacingSameAddress())
+        {
+            // when replacing another node but using the same ip the hostId 
will also match, this causes no TCM transactions
+            // to be committed...
+            // In order to bootup correctly, need to pull in the current epoch
+            ClusterMetadata current = ClusterMetadata.current();
+            as.configurationService().notifyPostCommit(current, current, 
false);
+        }
         instance = as;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java 
b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index a1cc6e5a95..41e09073f4 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -20,12 +20,11 @@ package org.apache.cassandra.tcm;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ExecutorPlus;
-import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -35,7 +34,7 @@ import org.apache.cassandra.utils.concurrent.Future;
  * comes in, we create a new future. If a request for a newer epoch comes in, 
we simply
  * swap out the current future reference for a new one which is requesting the 
newer epoch.
  */
-public class EpochAwareDebounce<T>
+public class EpochAwareDebounce<T> implements Shutdownable
 {
     public static final EpochAwareDebounce<ClusterMetadata> instance = new 
EpochAwareDebounce<>();
 
@@ -74,6 +73,36 @@ public class EpochAwareDebounce<T>
         }
     }
 
+    @Override
+    public boolean isTerminated()
+    {
+        return executor.isTerminated();
+    }
+
+    @Override
+    public boolean isShutdown()
+    {
+        return executor.isShutdown();
+    }
+
+    @Override
+    public void shutdown()
+    {
+        executor.shutdown();
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        return executor.shutdownNow();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
+    {
+        return executor.awaitTermination(timeout, units);
+    }
+
     private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T>
     {
         private final Epoch epoch;
@@ -82,9 +111,4 @@ public class EpochAwareDebounce<T>
             this.epoch = epoch;
         }
     }
-
-    public void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
-    {
-        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
-    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 6a68407810..e82f2eaaf1 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -190,6 +190,8 @@ public final class RemoteProcessor implements Processor
             {
                 if (promise.isCancelled() || promise.isDone())
                     return;
+                if (EpochAwareDebounce.instance.isShutdown())
+                    promise.tryFailure(new IllegalStateException("Unable to 
retry as we are shutting down"));
                 if (!candidates.hasNext())
                     promise.tryFailure(new 
IllegalStateException(String.format("Ran out of candidates while sending %s: 
%s", verb, candidates)));
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1e4335707a..b48d3199a9 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -885,6 +885,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
     {
         Future<?> future = async((ExecutorService executor) -> {
             Throwable error = null;
+            inInstancelogger.warn("Shutting down in thread {}", 
Thread.currentThread().getName());
 
             CompactionManager.instance.forceShutdown();
 
@@ -1225,6 +1226,11 @@ public class Instance extends IsolatedExecutor 
implements IInvokableInstance
                 }
             }));
         }
+        // This is not used code, but it is here for when you run in a 
debugger...
+        // When shutdown gets blocked we need to be able to trace down which 
future is blocked, so this idx
+        // helps map the location... the reason we can't leverage here is the 
timeout logic is higher up, so
+        // 'idx' really only helps out in a debugger...
+        int idx = 0;
         for (Future<Throwable> future : results)
         {
             try
@@ -1237,6 +1243,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
             {
                 accumulate = Throwables.merge(accumulate, t);
             }
+            idx++;
         }
         return accumulate;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to