This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc45bb5876 Allow threads waiting for the log follower to be interrupted
dc45bb5876 is described below

commit dc45bb5876aafa2ce7dcfe6a3b7de0f6a9a35fda
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Thu Jul 11 19:40:55 2024 +0100

    Allow threads waiting for the log follower to be interrupted
    
    Patch by Sam Tunnicliffe and David Capwell; reviewed by Alex Petrov for
    CASSANDRA-19761
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/tcm/RemoteProcessor.java  |   4 +-
 .../org/apache/cassandra/tcm/log/LocalLog.java     |  22 +++--
 .../cassandra/utils/JVMStabilityInspector.java     |   3 +
 .../distributed/test/tcm/CMSShutdownTest.java      | 104 +++++++++++++++++++++
 5 files changed, 125 insertions(+), 9 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 72d6e8f1e9..7c6c10cd50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Allow threads waiting for the metadata log follower to be interrupted (CASSANDRA-19761)
  * Support dictionary lookup for CassandraPasswordValidator (CASSANDRA-19762)
  * Disallow denylisting keys in system_cluster_metadata (CASSANDRA-19713)
  * Fix gossip status after replacement (CASSANDRA-19712)
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index c267d140d4..79f0b7cf7d 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -152,7 +152,7 @@ public final class RemoteProcessor implements Processor
     {
         try
         {
-            return fetchLogAndWaitInternal(candidateIterator, log).awaitUninterruptibly().get();
+            return fetchLogAndWaitInternal(candidateIterator, log).await().get();
         }
         catch (InterruptedException | ExecutionException e)
         {
@@ -191,7 +191,7 @@ public final class RemoteProcessor implements Processor
         {
             Promise<RSP> promise = new AsyncPromise<>();
             sendWithCallbackAsync(promise, verb, request, candidates, retryPolicy);
-            return promise.awaitUninterruptibly().get();
+            return promise.await().get();
         }
         catch (InterruptedException | ExecutionException e)
         {
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0307e49048..a5b744fc17 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -370,6 +370,8 @@ public abstract class LocalLog implements Closeable
      */
     public void append(LogState logState)
     {
+        if (logState.isEmpty())
+            return;
         logger.debug("Appending log state with snapshot to the pending buffer: {}", logState);
         // If we receive a base state (snapshot), we need to construct a synthetic ForceSnapshot transformation that will serve as
         // a base for application of the rest of the entries. If the log state contains any additional transformations that follow
@@ -403,13 +405,14 @@ public abstract class LocalLog implements Closeable
         {
             runOnce(null);
         }
-        catch (InterruptedException | TimeoutException e)
+        catch (TimeoutException e)
         {
-            throw new RuntimeException("Should not have happened, since we await uninterruptibly", e);
+            // This should not happen as no duration was specified in the call to runOnce
+            throw new RuntimeException("Timed out waiting for log follower to run", e);
         }
     }
 
-    abstract void runOnce(DurationSpec durationSpec) throws InterruptedException, TimeoutException;
+    abstract void runOnce(DurationSpec durationSpec) throws TimeoutException;
     abstract void processPending();
 
     private Entry peek()
@@ -664,8 +667,11 @@ public abstract class LocalLog implements Closeable
         }
 
         @Override
-        public void runOnce(DurationSpec duration) throws InterruptedException, TimeoutException
+        public void runOnce(DurationSpec duration) throws TimeoutException
         {
+            if (executor.isTerminated())
+                throw new IllegalStateException("Global log follower has shutdown");
+
             Condition ours = Condition.newOneTimeCondition();
             for (int i = 0; i < 2; i++)
             {
@@ -678,9 +684,10 @@ public abstract class LocalLog implements Closeable
                 {
                     if (duration == null)
                     {
-                        current.awaitUninterruptibly();
+
+                        current.awaitThrowUncheckedOnInterrupt();
                     }
-                    else if (!current.await(duration.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
+                    else if (!current.awaitThrowUncheckedOnInterrupt(duration.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
                     {
                         throw new TimeoutException(String.format("Timed out waiting for follower to run at least once. " +
                                                                  "Pending is %s and current is now at epoch %s.",
@@ -707,7 +714,7 @@ public abstract class LocalLog implements Closeable
                 if (runnable.subscriber.compareAndSet(null, ours))
                 {
                     runnable.logNotifier.signalAll();
-                    ours.awaitUninterruptibly();
+                    ours.awaitThrowUncheckedOnInterrupt();
                     return;
                 }
             }
@@ -727,6 +734,7 @@ public abstract class LocalLog implements Closeable
             Condition condition = runnable.subscriber.get();
             if (condition != null)
                 condition.signalAll();
+
             runnable.logNotifier.signalAll();
             try
             {
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index a396ef9947..983e75f252 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -141,6 +141,9 @@ public final class JVMStabilityInspector
         if (t instanceof InterruptedException)
             throw new UncheckedInterruptedException((InterruptedException) t);
 
+        if (t instanceof UncheckedInterruptedException)
+            throw (UncheckedInterruptedException)t;
+
         if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.die)
             if (t instanceof FSError || t instanceof CorruptSSTableException)
                 isUnstable = true;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java
new file mode 100644
index 0000000000..736f8335e9
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.PaxosBackedProcessor;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
+import org.apache.cassandra.utils.Shared;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+public class CMSShutdownTest extends TestBaseImpl
+{
+    @Test
+    public void shutdownCMSCoincidingWithUnsuccessfulCommit() throws Exception
+    {
+        // This test simulates a CMS node attempting to commit an entry to the log but being unable
+        // to obtain consensus from other CMS members while it is also shutting down itself.
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.values()))
+                                      .withInstanceInitializer(BBHelper::install)
+                                      .start())
+        {
+            cluster.get(1).runOnInstance(CommitHelper::scheduleCommits);
+            State.latch.await();
+        }
+    }
+
+    private static class CommitHelper
+    {
+        public static void commitTransformations()
+        {
+            // Continuously attempt to commit a log entry, meanwhile counting down the
+            // latch ensures that every commit will fail as if unable to obtain consensus
+            // from other CMS members
+            State.latch.countDown();
+            for (; ;)
+                ClusterMetadataService.instance().commit(TriggerSnapshot.instance);
+        }
+
+        public static void scheduleCommits()
+        {
+            Stage.MISC.execute(CommitHelper::commitTransformations);
+        }
+    }
+
+    @Shared
+    public static class State
+    {
+        public static final CountDownLatch latch = new CountDownLatch(1);
+    }
+
+    public static class BBHelper
+    {
+        static void install(ClassLoader cl, int node)
+        {
+            if (node != 1) return;
+            new ByteBuddy().rebase(PaxosBackedProcessor.class)
+                           .method(named("tryCommitOne"))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch, @SuperCall Callable<Boolean> call) throws Exception
+        {
+            if (State.latch.getCount() == 1)
+                return call.call();
+            return false;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to