This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e6a551dba TCM: Catch up committing node on rejection
3e6a551dba is described below

commit 3e6a551dbab6ecdc97b99f9ec3118316bfaf1802
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Jan 11 14:18:46 2024 +0100

    TCM: Catch up committing node on rejection
    
    Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19260
---
 .../cassandra/tcm/AbstractLocalProcessor.java      | 10 +++--
 src/java/org/apache/cassandra/tcm/Commit.java      | 47 +++++++++++++++++++---
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 14 +++++--
 ...ationSmokeTest.java => LogReplicationTest.java} | 38 ++++++++++++++++-
 4 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 2d00085f3d..a72b5b664f 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.tcm;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
@@ -79,7 +80,7 @@ public abstract class AbstractLocalProcessor implements 
Processor
                 {
                     return maybeFailure(entryId,
                                         lastKnown,
-                                        () -> new 
Commit.Result.Failure(result.rejected().code, result.rejected().reason, true));
+                                        () -> 
Commit.Result.rejected(result.rejected().code, result.rejected().reason, 
toLogState(lastKnown)));
                 }
 
                 continue;
@@ -118,9 +119,10 @@ public abstract class AbstractLocalProcessor implements 
Processor
                 retryPolicy.maybeSleep();
             }
         }
-        return new Commit.Result.Failure(SERVER_ERROR,
-                                         String.format("Could not perform 
commit using the following retry stategy: %s", retryPolicy.tries),
-                                         false);
+        return Commit.Result.failed(SERVER_ERROR,
+                                    String.format("Could not perform commit 
after %d/%d tries. Time remaining: %dms",
+                                                  retryPolicy.tries, 
retryPolicy.maxTries,
+                                                  
TimeUnit.NANOSECONDS.toMillis(retryPolicy.remainingNanos())));
     }
 
     public Commit.Result maybeFailure(Entry.Id entryId, Epoch lastKnown, 
Supplier<Commit.Result.Failure> orElse)
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java
index f0146847f1..8871efa5b2 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -119,6 +119,9 @@ public class Commit
     static volatile Result.Serializer resultSerializerCache;
     public interface Result
     {
+        IVersionedSerializer<Result> defaultMessageSerializer = new 
Serializer(NodeVersion.CURRENT.serializationVersion());
+
+        LogState logState();
         boolean isSuccess();
         boolean isFailure();
 
@@ -131,7 +134,6 @@ public class Commit
         {
             return (Failure) this;
         }
-        IVersionedSerializer<Result> defaultMessageSerializer = new 
Serializer(NodeVersion.CURRENT.serializationVersion());
 
         static IVersionedSerializer<Result> messageSerializer(Version version)
         {
@@ -163,6 +165,12 @@ public class Commit
                        '}';
             }
 
+            @Override
+            public LogState logState()
+            {
+                return logState;
+            }
+
             public boolean isSuccess()
             {
                 return true;
@@ -174,6 +182,16 @@ public class Commit
             }
         }
 
+        static Failure rejected(ExceptionCode exceptionCode, String reason, 
LogState logState)
+        {
+            return new Failure(exceptionCode, reason, logState, true);
+        }
+
+        static Failure failed(ExceptionCode exceptionCode, String message)
+        {
+            return new Failure(exceptionCode, message, LogState.EMPTY, false);
+        }
+
         final class Failure implements Result
         {
             public final ExceptionCode code;
@@ -181,8 +199,9 @@ public class Commit
             // Rejection means that we were able to linearize the operation,
             // but it was rejected by the internal logic of the transformation.
             public final boolean rejected;
+            public final LogState logState;
 
-            public Failure(ExceptionCode code, String message, boolean 
rejected)
+            private Failure(ExceptionCode code, String message, LogState 
logState, boolean rejected)
             {
                 if (message == null)
                     message = "";
@@ -190,6 +209,7 @@ public class Commit
                 // TypeSizes#sizeOf encoder only allows strings that are up to 
Short.MAX_VALUE bytes large
                 this.message =  message.substring(0, 
Math.min(message.length(), Short.MAX_VALUE));
                 this.rejected = rejected;
+                this.logState = logState;
             }
 
             @Override
@@ -202,6 +222,12 @@ public class Commit
                        '}';
             }
 
+            @Override
+            public LogState logState()
+            {
+                return logState;
+            }
+
             public boolean isSuccess()
             {
                 return false;
@@ -233,7 +259,7 @@ public class Commit
                 {
                     out.writeByte(SUCCESS);
                     out.writeUnsignedVInt32(serializationVersion.asInt());
-                    
LogState.metadataSerializer.serialize(t.success().logState, out, 
serializationVersion);
+                    LogState.metadataSerializer.serialize(t.logState(), out, 
serializationVersion);
                     Epoch.serializer.serialize(t.success().epoch, out, 
serializationVersion);
                 }
                 else
@@ -243,6 +269,8 @@ public class Commit
                     out.writeByte(failure.rejected ? REJECTED : FAILED);
                     out.writeUnsignedVInt32(failure.code.value);
                     out.writeUTF(failure.message);
+                    out.writeUnsignedVInt32(serializationVersion.asInt());
+                    LogState.metadataSerializer.serialize(t.logState(), out, 
serializationVersion);
                 }
             }
 
@@ -259,8 +287,13 @@ public class Commit
                 }
                 else
                 {
-                    return new 
Failure(ExceptionCode.fromValue(in.readUnsignedVInt32()),
-                                       in.readUTF(),
+                    ExceptionCode exceptionCode = 
ExceptionCode.fromValue(in.readUnsignedVInt32());
+                    String message = in.readUTF();
+                    Version deserializationVersion = 
Version.fromInt(in.readUnsignedVInt32());
+                    LogState delta = 
LogState.metadataSerializer.deserialize(in, deserializationVersion);
+                    return new Failure(exceptionCode,
+                                       message,
+                                       delta,
                                        b == REJECTED);
                 }
             }
@@ -272,7 +305,7 @@ public class Commit
                 if (t instanceof Success)
                 {
                     size += 
VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt());
-                    size += 
LogState.metadataSerializer.serializedSize(t.success().logState, 
serializationVersion);
+                    size += 
LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion);
                     size += Epoch.serializer.serializedSize(t.success().epoch, 
serializationVersion);
                 }
                 else
@@ -280,6 +313,8 @@ public class Commit
                     assert t instanceof Failure;
                     size += VIntCoding.computeUnsignedVIntSize(((Failure) 
t).code.value);
                     size += TypeSizes.sizeof(((Failure)t).message);
+                    size += 
VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt());
+                    size += 
LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion);
                 }
                 return size;
             }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 06772a1ec8..260d151419 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -78,20 +78,26 @@ public final class RemoteProcessor implements Processor
                                                     new 
CandidateIterator(candidates(false)),
                                                     retryPolicy);
 
+            log.append(result.logState());
+
             if (result.isSuccess())
             {
                 Commit.Result.Success success = result.success();
-                log.append(success.logState);
                 log.awaitAtLeast(success.epoch);
             }
+            else
+            {
+                log.waitForHighestConsecutive();
+            }
 
             return result;
         }
         catch (Exception e)
         {
-            return new Commit.Result.Failure(SERVER_ERROR, e.getMessage() == 
null
-                                                           ? 
e.getClass().toString()
-                                                           : e.getMessage(), 
false);
+            return Commit.Result.failed(SERVER_ERROR,
+                                        e.getMessage() == null
+                                        ? e.getClass().toString()
+                                        : e.getMessage());
         }
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java
similarity index 68%
rename from test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java
index 3d0b9a1894..fe35274162 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java
@@ -24,9 +24,11 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -42,7 +44,7 @@ import static 
org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class LogReplicationSmokeTest extends TestBaseImpl
+public class LogReplicationTest extends TestBaseImpl
 {
     @Test
     public void testRequestingPeerWatermarks() throws Throwable
@@ -73,6 +75,40 @@ public class LogReplicationSmokeTest extends TestBaseImpl
         }
     }
 
+    @Test
+    public void testCatchUpOnRejection() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3)
+                                        .withConfig(config -> 
config.with(GOSSIP).with(NETWORK))
+                                        .start())
+        {
+            init(cluster);
+            IInvokableInstance cmsNode = cluster.get(1);
+            ClusterUtils.waitForCMSToQuiesce(cluster, cmsNode);
+
+            cluster.coordinator(1).execute("CREATE KEYSPACE only_once WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
+                                           ConsistencyLevel.ONE);
+
+            long cmsEpoch = cluster.get(1).callsOnInstance(() -> 
ClusterMetadata.current().epoch.getEpoch()).call();
+            long epochBefore = cluster.get(2).callsOnInstance(() -> 
ClusterMetadata.current().epoch.getEpoch()).call();
+            Assert.assertTrue(cmsEpoch > epochBefore);
+            // should get rejected
+            try
+            {
+                cluster.coordinator(2).execute("CREATE KEYSPACE only_once WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
+                                               ConsistencyLevel.ONE);
+                Assert.fail("Creation should have failed");
+            }
+            catch (Throwable t)
+            {
+                Assert.assertTrue(t.getMessage().contains("Cannot add existing 
keyspace"));
+                System.out.println("t.getMessage() = " + t.getMessage());
+            }
+            long epochAfter = cluster.get(2).callsOnInstance(() -> 
ClusterMetadata.current().epoch.getEpoch()).call();
+            Assert.assertTrue(epochAfter > epochBefore);
+        }
+    }
+
     private int getConsistentValue(Cluster cluster)
     {
         Set<Integer> values = new HashSet<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to