This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 3e6a551dba TCM: Catch up committing node on rejection 3e6a551dba is described below commit 3e6a551dbab6ecdc97b99f9ec3118316bfaf1802 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Jan 11 14:18:46 2024 +0100 TCM: Catch up committing node on rejection Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19260 --- .../cassandra/tcm/AbstractLocalProcessor.java | 10 +++-- src/java/org/apache/cassandra/tcm/Commit.java | 47 +++++++++++++++++++--- .../org/apache/cassandra/tcm/RemoteProcessor.java | 14 +++++-- ...ationSmokeTest.java => LogReplicationTest.java} | 38 ++++++++++++++++- 4 files changed, 94 insertions(+), 15 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 2d00085f3d..a72b5b664f 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -18,6 +18,7 @@ package org.apache.cassandra.tcm; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.slf4j.Logger; @@ -79,7 +80,7 @@ public abstract class AbstractLocalProcessor implements Processor { return maybeFailure(entryId, lastKnown, - () -> new Commit.Result.Failure(result.rejected().code, result.rejected().reason, true)); + () -> Commit.Result.rejected(result.rejected().code, result.rejected().reason, toLogState(lastKnown))); } continue; @@ -118,9 +119,10 @@ public abstract class AbstractLocalProcessor implements Processor retryPolicy.maybeSleep(); } } - return new Commit.Result.Failure(SERVER_ERROR, - String.format("Could not perform commit using the following retry stategy: %s", retryPolicy.tries), - false); + return Commit.Result.failed(SERVER_ERROR, + String.format("Could not perform commit after %d/%d tries. 
Time remaining: %dms", + retryPolicy.tries, retryPolicy.maxTries, + TimeUnit.NANOSECONDS.toMillis(retryPolicy.remainingNanos()))); } public Commit.Result maybeFailure(Entry.Id entryId, Epoch lastKnown, Supplier<Commit.Result.Failure> orElse) diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java index f0146847f1..8871efa5b2 100644 --- a/src/java/org/apache/cassandra/tcm/Commit.java +++ b/src/java/org/apache/cassandra/tcm/Commit.java @@ -119,6 +119,9 @@ public class Commit static volatile Result.Serializer resultSerializerCache; public interface Result { + IVersionedSerializer<Result> defaultMessageSerializer = new Serializer(NodeVersion.CURRENT.serializationVersion()); + + LogState logState(); boolean isSuccess(); boolean isFailure(); @@ -131,7 +134,6 @@ public class Commit { return (Failure) this; } - IVersionedSerializer<Result> defaultMessageSerializer = new Serializer(NodeVersion.CURRENT.serializationVersion()); static IVersionedSerializer<Result> messageSerializer(Version version) { @@ -163,6 +165,12 @@ public class Commit '}'; } + @Override + public LogState logState() + { + return logState; + } + public boolean isSuccess() { return true; @@ -174,6 +182,16 @@ public class Commit } } + static Failure rejected(ExceptionCode exceptionCode, String reason, LogState logState) + { + return new Failure(exceptionCode, reason, logState, true); + } + + static Failure failed(ExceptionCode exceptionCode, String message) + { + return new Failure(exceptionCode, message, LogState.EMPTY, false); + } + final class Failure implements Result { public final ExceptionCode code; @@ -181,8 +199,9 @@ public class Commit // Rejection means that we were able to linearize the operation, // but it was rejected by the internal logic of the transformation. 
public final boolean rejected; + public final LogState logState; - public Failure(ExceptionCode code, String message, boolean rejected) + private Failure(ExceptionCode code, String message, LogState logState, boolean rejected) { if (message == null) message = ""; @@ -190,6 +209,7 @@ public class Commit // TypeSizes#sizeOf encoder only allows strings that are up to Short.MAX_VALUE bytes large this.message = message.substring(0, Math.min(message.length(), Short.MAX_VALUE)); this.rejected = rejected; + this.logState = logState; } @Override @@ -202,6 +222,12 @@ public class Commit '}'; } + @Override + public LogState logState() + { + return logState; + } + public boolean isSuccess() { return false; @@ -233,7 +259,7 @@ public class Commit { out.writeByte(SUCCESS); out.writeUnsignedVInt32(serializationVersion.asInt()); - LogState.metadataSerializer.serialize(t.success().logState, out, serializationVersion); + LogState.metadataSerializer.serialize(t.logState(), out, serializationVersion); Epoch.serializer.serialize(t.success().epoch, out, serializationVersion); } else @@ -243,6 +269,8 @@ public class Commit out.writeByte(failure.rejected ? 
REJECTED : FAILED); out.writeUnsignedVInt32(failure.code.value); out.writeUTF(failure.message); + out.writeUnsignedVInt32(serializationVersion.asInt()); + LogState.metadataSerializer.serialize(t.logState(), out, serializationVersion); } } @@ -259,8 +287,13 @@ public class Commit } else { - return new Failure(ExceptionCode.fromValue(in.readUnsignedVInt32()), - in.readUTF(), + ExceptionCode exceptionCode = ExceptionCode.fromValue(in.readUnsignedVInt32()); + String message = in.readUTF(); + Version deserializationVersion = Version.fromInt(in.readUnsignedVInt32()); + LogState delta = LogState.metadataSerializer.deserialize(in, deserializationVersion); + return new Failure(exceptionCode, + message, + delta, b == REJECTED); } } @@ -272,7 +305,7 @@ public class Commit if (t instanceof Success) { size += VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt()); - size += LogState.metadataSerializer.serializedSize(t.success().logState, serializationVersion); + size += LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion); size += Epoch.serializer.serializedSize(t.success().epoch, serializationVersion); } else @@ -280,6 +313,8 @@ public class Commit assert t instanceof Failure; size += VIntCoding.computeUnsignedVIntSize(((Failure) t).code.value); size += TypeSizes.sizeof(((Failure)t).message); + size += VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt()); + size += LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion); } return size; } diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 06772a1ec8..260d151419 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -78,20 +78,26 @@ public final class RemoteProcessor implements Processor new CandidateIterator(candidates(false)), retryPolicy); + log.append(result.logState()); + if (result.isSuccess()) { 
Commit.Result.Success success = result.success(); - log.append(success.logState); log.awaitAtLeast(success.epoch); } + else + { + log.waitForHighestConsecutive(); + } return result; } catch (Exception e) { - return new Commit.Result.Failure(SERVER_ERROR, e.getMessage() == null - ? e.getClass().toString() - : e.getMessage(), false); + return Commit.Result.failed(SERVER_ERROR, + e.getMessage() == null + ? e.getClass().toString() + : e.getMessage()); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java similarity index 68% rename from test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java rename to test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java index 3d0b9a1894..fe35274162 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java @@ -24,9 +24,11 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.TestBaseImpl; @@ -42,7 +44,7 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class LogReplicationSmokeTest extends TestBaseImpl +public class LogReplicationTest extends TestBaseImpl { @Test public void testRequestingPeerWatermarks() throws Throwable @@ -73,6 +75,40 @@ public class LogReplicationSmokeTest extends TestBaseImpl } } + @Test + public void testCatchUpOnRejection() throws 
Throwable + { + try (Cluster cluster = builder().withNodes(3) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .start()) + { + init(cluster); + IInvokableInstance cmsNode = cluster.get(1); + ClusterUtils.waitForCMSToQuiesce(cluster, cmsNode); + + cluster.coordinator(1).execute("CREATE KEYSPACE only_once WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};", + ConsistencyLevel.ONE); + + long cmsEpoch = cluster.get(1).callsOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()).call(); + long epochBefore = cluster.get(2).callsOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()).call(); + Assert.assertTrue(cmsEpoch > epochBefore); + // should get rejected + try + { + cluster.coordinator(2).execute("CREATE KEYSPACE only_once WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};", + ConsistencyLevel.ONE); + Assert.fail("Creation should have failed"); + } + catch (Throwable t) + { + Assert.assertTrue(t.getMessage().contains("Cannot add existing keyspace")); + System.out.println("t.getMessage() = " + t.getMessage()); + } + long epochAfter = cluster.get(2).callsOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()).call(); + Assert.assertTrue(epochAfter > epochBefore); + } + } + private int getConsistentValue(Cluster cluster) { Set<Integer> values = new HashSet<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org