This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new dc45bb5876 Allow threads waiting for the log follower to be interrupted dc45bb5876 is described below commit dc45bb5876aafa2ce7dcfe6a3b7de0f6a9a35fda Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Thu Jul 11 19:40:55 2024 +0100 Allow threads waiting for the log follower to be interrupted Patch by Sam Tunnicliffe and David Capwell; reviewed by Alex Petrov for CASSANDRA-19761 --- CHANGES.txt | 1 + .../org/apache/cassandra/tcm/RemoteProcessor.java | 4 +- .../org/apache/cassandra/tcm/log/LocalLog.java | 22 +++-- .../cassandra/utils/JVMStabilityInspector.java | 3 + .../distributed/test/tcm/CMSShutdownTest.java | 104 +++++++++++++++++++++ 5 files changed, 125 insertions(+), 9 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 72d6e8f1e9..7c6c10cd50 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Allow threads waiting for the metadata log follower to be interrupted (CASSANDRA-19761) * Support dictionary lookup for CassandraPasswordValidator (CASSANDRA-19762) * Disallow denylisting keys in system_cluster_metadata (CASSANDRA-19713) * Fix gossip status after replacement (CASSANDRA-19712) diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index c267d140d4..79f0b7cf7d 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -152,7 +152,7 @@ public final class RemoteProcessor implements Processor { try { - return fetchLogAndWaitInternal(candidateIterator, log).awaitUninterruptibly().get(); + return fetchLogAndWaitInternal(candidateIterator, log).await().get(); } catch (InterruptedException | ExecutionException e) { @@ -191,7 +191,7 @@ public final class RemoteProcessor implements Processor { Promise<RSP> promise = new AsyncPromise<>(); sendWithCallbackAsync(promise, verb, request, candidates, retryPolicy); - return 
promise.awaitUninterruptibly().get(); + return promise.await().get(); } catch (InterruptedException | ExecutionException e) { diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 0307e49048..a5b744fc17 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -370,6 +370,8 @@ public abstract class LocalLog implements Closeable */ public void append(LogState logState) { + if (logState.isEmpty()) + return; logger.debug("Appending log state with snapshot to the pending buffer: {}", logState); // If we receive a base state (snapshot), we need to construct a synthetic ForceSnapshot transformation that will serve as // a base for application of the rest of the entries. If the log state contains any additional transformations that follow @@ -403,13 +405,14 @@ public abstract class LocalLog implements Closeable { runOnce(null); } - catch (InterruptedException | TimeoutException e) + catch (TimeoutException e) { - throw new RuntimeException("Should not have happened, since we await uninterruptibly", e); + // This should not happen as no duration was specified in the call to runOnce + throw new RuntimeException("Timed out waiting for log follower to run", e); } } - abstract void runOnce(DurationSpec durationSpec) throws InterruptedException, TimeoutException; + abstract void runOnce(DurationSpec durationSpec) throws TimeoutException; abstract void processPending(); private Entry peek() @@ -664,8 +667,11 @@ public abstract class LocalLog implements Closeable } @Override - public void runOnce(DurationSpec duration) throws InterruptedException, TimeoutException + public void runOnce(DurationSpec duration) throws TimeoutException { + if (executor.isTerminated()) + throw new IllegalStateException("Global log follower has shutdown"); + Condition ours = Condition.newOneTimeCondition(); for (int i = 0; i < 2; i++) { @@ -678,9 +684,10 @@ public 
abstract class LocalLog implements Closeable { if (duration == null) { - current.awaitUninterruptibly(); + + current.awaitThrowUncheckedOnInterrupt(); } - else if (!current.await(duration.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)) + else if (!current.awaitThrowUncheckedOnInterrupt(duration.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)) { throw new TimeoutException(String.format("Timed out waiting for follower to run at least once. " + "Pending is %s and current is now at epoch %s.", @@ -707,7 +714,7 @@ public abstract class LocalLog implements Closeable if (runnable.subscriber.compareAndSet(null, ours)) { runnable.logNotifier.signalAll(); - ours.awaitUninterruptibly(); + ours.awaitThrowUncheckedOnInterrupt(); return; } } @@ -727,6 +734,7 @@ public abstract class LocalLog implements Closeable Condition condition = runnable.subscriber.get(); if (condition != null) condition.signalAll(); + runnable.logNotifier.signalAll(); try { diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index a396ef9947..983e75f252 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -141,6 +141,9 @@ public final class JVMStabilityInspector if (t instanceof InterruptedException) throw new UncheckedInterruptedException((InterruptedException) t); + if (t instanceof UncheckedInterruptedException) + throw (UncheckedInterruptedException)t; + if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.die) if (t instanceof FSError || t instanceof CorruptSSTableException) isUnstable = true; diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java new file mode 100644 index 0000000000..736f8335e9 --- /dev/null +++ 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSShutdownTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import org.junit.Test; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.PaxosBackedProcessor; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.transformations.TriggerSnapshot; +import org.apache.cassandra.utils.Shared; + +import static net.bytebuddy.matcher.ElementMatchers.named; + +public class CMSShutdownTest extends TestBaseImpl +{ + @Test + public 
void shutdownCMSCoincidingWithUnsuccessfulCommit() throws Exception + { + // This test simulates a CMS node attempting to commit an entry to the log but being unable + // to obtain consensus from other CMS members while it is also shutting down itself. + try (Cluster cluster = Cluster.build(2) + .withConfig(c -> c.with(Feature.values())) + .withInstanceInitializer(BBHelper::install) + .start()) + { + cluster.get(1).runOnInstance(CommitHelper::scheduleCommits); + State.latch.await(); + } + } + + private static class CommitHelper + { + public static void commitTransformations() + { + // Continuously attempt to commit a log entry, meanwhile counting down the + // latch ensures that every commit will fail as if unable to obtain consensus + // from other CMS members + State.latch.countDown(); + for (; ;) + ClusterMetadataService.instance().commit(TriggerSnapshot.instance); + } + + public static void scheduleCommits() + { + Stage.MISC.execute(CommitHelper::commitTransformations); + } + } + + @Shared + public static class State + { + public static final CountDownLatch latch = new CountDownLatch(1); + } + + public static class BBHelper + { + static void install(ClassLoader cl, int node) + { + if (node != 1) return; + new ByteBuddy().rebase(PaxosBackedProcessor.class) + .method(named("tryCommitOne")) + .intercept(MethodDelegation.to(BBHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + public static boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch, @SuperCall Callable<Boolean> call) throws Exception + { + if (State.latch.getCount() == 1) + return call.call(); + return false; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org