This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67139d5c33 Raise priority of TCM internode messages during critical 
operations
67139d5c33 is described below

commit 67139d5c334e58fdc8d9f09f9288155448666cd3
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Tue Mar 26 11:52:17 2024 +0100

    Raise priority of TCM internode messages during critical operations
    
    Patch by Alex Petrov; reviewed by Sam Tunnicliffe and marcuse for 
CASSANDRA-19517.
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/net/Message.java     | 21 +++++-
 src/java/org/apache/cassandra/net/MessageFlag.java |  4 +-
 .../org/apache/cassandra/net/MessagingService.java |  8 +++
 .../apache/cassandra/net/OutboundConnections.java  |  7 +-
 src/java/org/apache/cassandra/net/Verb.java        | 30 ++++-----
 .../org/apache/cassandra/schema/Keyspaces.java     |  9 +++
 .../org/apache/cassandra/service/paxos/Paxos.java  |  5 ++
 .../cassandra/service/paxos/PaxosCommit.java       |  8 ++-
 .../service/paxos/PaxosCommitAndPrepare.java       |  4 +-
 .../cassandra/service/paxos/PaxosPrepare.java      |  2 +-
 .../service/paxos/PaxosPrepareRefresh.java         |  2 +-
 .../cassandra/service/paxos/PaxosPropose.java      |  2 +-
 .../cassandra/service/paxos/PaxosRepair.java       |  3 +-
 .../service/paxos/cleanup/PaxosCleanup.java        | 11 ++--
 .../paxos/cleanup/PaxosCleanupComplete.java        |  8 ++-
 .../service/paxos/cleanup/PaxosCleanupRequest.java | 12 ++--
 .../service/paxos/cleanup/PaxosCleanupSession.java |  6 +-
 .../paxos/cleanup/PaxosFinishPrepareCleanup.java   |  4 +-
 .../paxos/cleanup/PaxosStartPrepareCleanup.java    |  4 +-
 .../cassandra/distributed/impl/Instance.java       | 13 ++++
 .../test/ring/CMSUrgentMessagesTest.java           | 74 ++++++++++++++++++++++
 22 files changed, 191 insertions(+), 47 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 12787c4fb3..fb1ae2f8bd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Raise priority of TCM internode messages during critical operations 
(CASSANDRA-19517)
  * Add nodetool command to unregister LEFT nodes (CASSANDRA-19581)
  * Add cluster metadata id to gossip syn messages (CASSANDRA-19613)
  * Reduce heap usage occupied by the metrics (CASSANDRA-19567)
diff --git a/src/java/org/apache/cassandra/net/Message.java 
b/src/java/org/apache/cassandra/net/Message.java
index b0a0b9c48d..7d7799a186 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -217,6 +217,15 @@ public class Message<T>
         return outWithParam(nextId(), verb, expiresAtNanos, payload, 0, null, 
null);
     }
 
+    public static <T> Message<T> out(Verb verb, T payload, boolean isUrgent)
+    {
+        assert !verb.isResponse();
+        if (isUrgent)
+            return outWithFlag(verb, payload,  MessageFlag.URGENT);
+        else
+            return out(verb, payload);
+    }
+
     public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag 
flag)
     {
         assert !verb.isResponse();
@@ -305,7 +314,10 @@ public class Message<T>
     /** Builds a response Message with provided payload, and all the right 
fields inferred from request Message */
     public <T> Message<T> responseWith(T payload)
     {
-        return outWithParam(id(), verb().responseVerb, expiresAtNanos(), 
payload, null, null);
+        Message<T> msg = outWithParam(id(), verb().responseVerb, 
expiresAtNanos(), payload, null, null);
+        if (header.hasFlag(MessageFlag.URGENT))
+            msg = msg.withFlag(MessageFlag.URGENT);
+        return msg;
     }
 
     /** Builds a response Message with no payload, and all the right fields 
inferred from request Message */
@@ -485,6 +497,11 @@ public class Message<T>
             this.params = params;
         }
 
+        public boolean hasFlag(MessageFlag messageFlag)
+        {
+            return messageFlag.isIn(flags);
+        }
+
         Header withFrom(InetAddressAndPort from)
         {
             return new Header(id, epoch, verb, from, createdAtNanos, 
expiresAtNanos, flags, params);
@@ -936,7 +953,7 @@ public class Message<T>
             serializeParams(header.params, out, version);
         }
 
-        private Header deserializeHeader(DataInputPlus in, InetAddressAndPort 
peer, int version) throws IOException
+        public Header deserializeHeader(DataInputPlus in, InetAddressAndPort 
peer, int version) throws IOException
         {
             long id = in.readUnsignedVInt();
             Epoch epoch = Epoch.EMPTY;
diff --git a/src/java/org/apache/cassandra/net/MessageFlag.java 
b/src/java/org/apache/cassandra/net/MessageFlag.java
index 441b06b6a3..1c2db557c3 100644
--- a/src/java/org/apache/cassandra/net/MessageFlag.java
+++ b/src/java/org/apache/cassandra/net/MessageFlag.java
@@ -29,7 +29,9 @@ public enum MessageFlag
     /** track repaired data - see CASSANDRA-14145 */
     TRACK_REPAIRED_DATA  (1),
     /** allow creating warnings or aborting queries based off query - see 
CASSANDRA-16850 */
-    TRACK_WARNINGS(2);
+    TRACK_WARNINGS(2),
+    /** whether this message should be sent on an URGENT channel despite its 
Verb default priority */
+    URGENT(3);
 
     private final int id;
 
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index d1e2f7b260..47c4830d20 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -494,6 +494,14 @@ public class MessagingService extends 
MessagingServiceMBeanImpl implements Messa
         return future;
     }
 
+    public void respondWithFailure(RequestFailureReason reason, Message<?> 
message)
+    {
+        Message<?> r = Message.failureResponse(message.id(), 
message.expiresAtNanos(), reason);
+        if (message.header.hasFlag(MessageFlag.URGENT)) // inherit urgency from the request, as responseWith does
+            r = r.withFlag(MessageFlag.URGENT);
+        send(r, message.respondTo());
+    }
+
     public void send(Message message, InetAddressAndPort to, ConnectionType 
specifyConnection)
     {
         if (isShuttingDown)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java 
b/src/java/org/apache/cassandra/net/OutboundConnections.java
index aacc2b4473..80c66e0595 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnections.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnections.java
@@ -229,9 +229,10 @@ public class OutboundConnections
             return LARGE_MESSAGES;
         }
 
-        return msg.verb().priority == Verb.Priority.P0
-               ? URGENT_MESSAGES
-               : SMALL_MESSAGES;
+        if (msg.verb().priority == Verb.Priority.P0 || 
msg.header.hasFlag(MessageFlag.URGENT))
+            return URGENT_MESSAGES;
+        else
+            return SMALL_MESSAGES;
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index a307185ecc..17c4550fc0 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -224,21 +224,21 @@ public enum Verb
     PAXOS2_CLEANUP_COMPLETE_REQ      (48, P2, repairTimeout, PAXOS_REPAIR,     
 () -> PaxosCleanupComplete.serializer,         () -> 
PaxosCleanupComplete.verbHandler,                      
PAXOS2_CLEANUP_COMPLETE_RSP      ),
 
     // transactional cluster metadata
-    TCM_COMMIT_RSP         (801, P1, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::commitResultSerializer,         () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_COMMIT_REQ         (802, P1, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::commitSerializer,               () -> 
commitRequestHandler(),               TCM_COMMIT_RSP         ),
-    TCM_FETCH_CMS_LOG_RSP  (803, P1, rpcTimeout,      FETCH_LOG,            
MessageSerializers::logStateSerializer,             () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_FETCH_CMS_LOG_REQ  (804, P1, rpcTimeout,      FETCH_LOG,            () 
-> FetchCMSLog.serializer,                       () -> 
fetchLogRequestHandler(),             TCM_FETCH_CMS_LOG_RSP  ),
-    TCM_REPLICATION        (805, P1, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::logStateSerializer,             () -> replicationHandler()  
                                       ),
-    TCM_NOTIFY_RSP         (806, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Epoch.messageSerializer,                      () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_NOTIFY_REQ         (807, P1, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::logStateSerializer,             () -> logNotifyHandler(),   
                TCM_NOTIFY_RSP         ),
-    TCM_CURRENT_EPOCH_REQ  (808, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Epoch.messageSerializer,                      () -> 
currentEpochRequestHandler(),         TCM_NOTIFY_RSP         ),
-    TCM_INIT_MIG_RSP       (809, P1, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::metadataHolderSerializer,       () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_INIT_MIG_REQ       (810, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.prepareHandler,     TCM_INIT_MIG_RSP       ),
-    TCM_ABORT_MIG          (811, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.abortHandler,       TCM_INIT_MIG_RSP       ),
-    TCM_DISCOVER_RSP       (812, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Discovery.serializer,                         () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_DISCOVER_REQ       (813, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
Discovery.instance.requestHandler,    TCM_DISCOVER_RSP       ),
-    TCM_FETCH_PEER_LOG_RSP (818, P1, rpcTimeout,      FETCH_LOG,            
MessageSerializers::logStateSerializer,             () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_FETCH_PEER_LOG_REQ (819, P1, rpcTimeout,      FETCH_LOG,            () 
-> FetchPeerLog.serializer,                      () -> 
FetchPeerLog.Handler.instance,        TCM_FETCH_PEER_LOG_RSP ),
+    TCM_COMMIT_RSP         (801, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::commitResultSerializer,         () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_COMMIT_REQ         (802, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::commitSerializer,               () -> 
commitRequestHandler(),               TCM_COMMIT_RSP         ),
+    TCM_FETCH_CMS_LOG_RSP  (803, P0, rpcTimeout,      FETCH_LOG,            
MessageSerializers::logStateSerializer,             () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_FETCH_CMS_LOG_REQ  (804, P0, rpcTimeout,      FETCH_LOG,            () 
-> FetchCMSLog.serializer,                       () -> 
fetchLogRequestHandler(),             TCM_FETCH_CMS_LOG_RSP  ),
+    TCM_REPLICATION        (805, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::logStateSerializer,             () -> replicationHandler()  
                                       ),
+    TCM_NOTIFY_RSP         (806, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Epoch.messageSerializer,                      () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_NOTIFY_REQ         (807, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::logStateSerializer,             () -> logNotifyHandler(),   
                TCM_NOTIFY_RSP         ),
+    TCM_CURRENT_EPOCH_REQ  (808, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Epoch.messageSerializer,                      () -> 
currentEpochRequestHandler(),         TCM_NOTIFY_RSP         ),
+    TCM_INIT_MIG_RSP       (809, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::metadataHolderSerializer,       () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_INIT_MIG_REQ       (810, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.prepareHandler,     TCM_INIT_MIG_RSP       ),
+    TCM_ABORT_MIG          (811, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.abortHandler,       TCM_INIT_MIG_RSP       ),
+    TCM_DISCOVER_RSP       (812, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Discovery.serializer,                         () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_DISCOVER_REQ       (813, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
Discovery.instance.requestHandler,    TCM_DISCOVER_RSP       ),
+    TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout,      FETCH_LOG,            
MessageSerializers::logStateSerializer,             () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout,      FETCH_LOG,            () 
-> FetchPeerLog.serializer,                      () -> 
FetchPeerLog.Handler.instance,        TCM_FETCH_PEER_LOG_RSP ),
 
     INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> 
NoPayload.serializer,             () -> ResponseVerbHandler.instance            
                      ),
     INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> 
DataMovement.serializer,          () -> DataMovementVerbHandler.instance, 
INITIATE_DATA_MOVEMENTS_RSP ),
diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java 
b/src/java/org/apache/cassandra/schema/Keyspaces.java
index 3f5431da1a..151d5a5db0 100644
--- a/src/java/org/apache/cassandra/schema/Keyspaces.java
+++ b/src/java/org/apache/cassandra/schema/Keyspaces.java
@@ -121,6 +121,15 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         return tables.get(id);
     }
 
+    public KeyspaceMetadata getContainingKeyspaceMetadata(TableId tableId)
+    {
+        TableMetadata tableMetadata = getTableOrViewNullable(tableId);
+        if (tableMetadata == null)
+            throw new IllegalStateException("Can't find table " + tableId);
+
+        return keyspaces.get(tableMetadata.keyspace);
+    }
+
     public boolean isEmpty()
     {
         return keyspaces.isEmpty();
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java 
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index fed3cc0785..1a76cfd972 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -535,6 +535,11 @@ public class Paxos
         {
             throw new UnsupportedOperationException();
         }
+
+        public boolean isUrgent()
+        {
+            return keyspace.getMetadata().params.replication.isMeta();
+        }
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java
index 1821ec4004..943b04c30c 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosCommit.java
@@ -175,9 +175,11 @@ public class PaxosCommit<OnDone extends Consumer<? super 
PaxosCommit.Status>> ex
     void start(Participants participants, boolean async)
     {
         boolean executeOnSelf = false;
-        Message<Agreed> commitMessage = Message.out(PAXOS_COMMIT_REQ, commit);
-        Message<Mutation> mutationMessage = ENABLE_DC_LOCAL_COMMIT && 
consistencyForConsensus.isDatacenterLocal()
-                                            ? 
Message.out(PAXOS2_COMMIT_REMOTE_REQ, commit.makeMutation()) : null;
+        Message<Agreed> commitMessage = Message.out(PAXOS_COMMIT_REQ, commit, 
participants.isUrgent());
+
+        Message<Mutation> mutationMessage = null;
+        if (ENABLE_DC_LOCAL_COMMIT && 
consistencyForConsensus.isDatacenterLocal())
+            mutationMessage = Message.out(PAXOS2_COMMIT_REMOTE_REQ, 
commit.makeMutation(), participants.isUrgent());
 
         for (int i = 0, mi = participants.allLive.size(); i < mi ; ++i)
             executeOnSelf |= isSelfOrSend(commitMessage, mutationMessage, 
participants.allLive.endpoint(i));
diff --git 
a/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java
index 7a9dba7b5a..7046dfbb37 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosCommitAndPrepare.java
@@ -49,8 +49,8 @@ public class PaxosCommitAndPrepare
         PaxosPrepare prepare = new PaxosPrepare(participants, request, 
acceptEarlyReadSuccess, null);
 
         Tracing.trace("Committing {}; Preparing {}", commit.ballot, ballot);
-        Message<Request> message = Message.out(PAXOS2_COMMIT_AND_PREPARE_REQ, 
request);
-//                
.permitsArtificialDelay(participants.consistencyForConsensus);
+        Message<Request> message = Message.out(PAXOS2_COMMIT_AND_PREPARE_REQ, 
request, participants.isUrgent());
+
         start(prepare, participants, message, RequestHandler::execute);
         return prepare;
     }
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
index 7d0c114299..78f8a7a903 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
@@ -358,7 +358,7 @@ public class PaxosPrepare extends 
PaxosRequestCallback<PaxosPrepare.Response> im
     private static PaxosPrepare prepareWithBallotInternal(Participants 
participants, Request request, boolean acceptEarlyReadPermission, 
Consumer<Status> onDone)
     {
         PaxosPrepare prepare = new PaxosPrepare(participants, request, 
acceptEarlyReadPermission, onDone);
-        Message<Request> message = Message.out(PAXOS2_PREPARE_REQ, request);
+        Message<Request> message = Message.out(PAXOS2_PREPARE_REQ, request, 
participants.isUrgent());
         start(prepare, participants, message, RequestHandler::execute);
         return prepare;
     }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
index 19aff7428a..925daaf9dd 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
@@ -75,7 +75,7 @@ public class PaxosPrepareRefresh implements 
RequestCallbackWithFailure<PaxosPrep
     public PaxosPrepareRefresh(Ballot prepared, Paxos.Participants 
participants, Committed latestCommitted, Callbacks callbacks)
     {
         this.callbacks = callbacks;
-        this.send = Message.out(PAXOS2_PREPARE_REFRESH_REQ, new 
Request(prepared, latestCommitted));
+        this.send = Message.out(PAXOS2_PREPARE_REFRESH_REQ, new 
Request(prepared, latestCommitted), participants.isUrgent());
     }
 
     void refresh(List<InetAddressAndPort> refresh)
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java
index 57d3459f40..db702af7d4 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPropose.java
@@ -207,7 +207,7 @@ public class PaxosPropose<OnDone extends Consumer<? super 
PaxosPropose.Status>>
 
     void start(Paxos.Participants participants)
     {
-        Message<Request> message = Message.out(PAXOS2_PROPOSE_REQ, new 
Request(proposal));
+        Message<Request> message = Message.out(PAXOS2_PROPOSE_REQ, new 
Request(proposal), participants.isUrgent());
 
         boolean executeOnSelf = false;
         for (int i = 0, size = participants.sizeOfPoll(); i < size ; ++i)
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
index cc6bc72325..0e3a732185 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
@@ -288,7 +288,8 @@ public class PaxosRepair extends AbstractPaxosRepair
 
         public void run()
         {
-            Message<Request> message = Message.out(PAXOS2_REPAIR_REQ, new 
Request(partitionKey(), table));
+            Message<Request> message = Message.out(PAXOS2_REPAIR_REQ, new 
Request(partitionKey(), table), participants.isUrgent());
+
             for (int i = 0, size = participants.sizeOfPoll(); i < size ; ++i)
                 MessagingService.instance().sendWithCallback(message, 
participants.voter(i), this);
         }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
index feaa64bd1e..331adef64f 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -55,6 +56,7 @@ public class PaxosCleanup extends AsyncFuture<Void> 
implements Runnable
     private final Collection<Range<Token>> ranges;
     private final boolean skippedReplicas;
     private final Executor executor;
+    private final boolean isUrgent;
 
     // references kept for debugging
     private PaxosStartPrepareCleanup startPrepare;
@@ -70,6 +72,7 @@ public class PaxosCleanup extends AsyncFuture<Void> 
implements Runnable
         this.ranges = ranges;
         this.skippedReplicas = skippedReplicas;
         this.executor = executor;
+        this.isUrgent = 
Keyspace.open(table.keyspace).getMetadata().params.replication.isMeta();
     }
 
     private <T> void addCallback(Future<T> future, Consumer<T> onComplete)
@@ -87,28 +90,28 @@ public class PaxosCleanup extends AsyncFuture<Void> 
implements Runnable
     public void run()
     {
         EndpointState localEpState = 
ctx.gossiper().getEndpointStateForEndpoint(ctx.broadcastAddressAndPort());
-        startPrepare = PaxosStartPrepareCleanup.prepare(ctx, table.id, 
endpoints, localEpState, ranges);
+        startPrepare = PaxosStartPrepareCleanup.prepare(ctx, table.id, 
endpoints, localEpState, ranges, isUrgent);
         addCallback(startPrepare, this::finishPrepare);
     }
 
     private void finishPrepare(PaxosCleanupHistory result)
     {
         ctx.nonPeriodicTasks().schedule(() -> {
-            finishPrepare = PaxosFinishPrepareCleanup.finish(ctx, endpoints, 
result);
+            finishPrepare = PaxosFinishPrepareCleanup.finish(ctx, endpoints, 
isUrgent, result);
             addCallback(finishPrepare, (v) -> startSession(result.highBound));
         }, Math.min(getCasContentionTimeout(MILLISECONDS), 
getWriteRpcTimeout(MILLISECONDS)), MILLISECONDS);
     }
 
     private void startSession(Ballot lowBound)
     {
-        session = new PaxosCleanupSession(ctx, endpoints, table.id, ranges);
+        session = new PaxosCleanupSession(ctx, endpoints, table.id, ranges, 
isUrgent);
         addCallback(session, (v) -> finish(lowBound));
         executor.execute(session);
     }
 
     private void finish(Ballot lowBound)
     {
-        complete = new PaxosCleanupComplete(ctx, endpoints, table.id, ranges, 
lowBound, skippedReplicas);
+        complete = new PaxosCleanupComplete(ctx, endpoints, table.id, ranges, 
lowBound, skippedReplicas, isUrgent);
         addCallback(complete, this::trySuccess);
         executor.execute(complete);
     }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
index 8742af84e0..c0c9a7e1d4 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
@@ -52,8 +52,9 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> 
implements RequestCa
     final Ballot lowBound;
     final boolean skippedReplicas;
     private final SharedContext ctx;
+    private final boolean isUrgent;
 
-    PaxosCleanupComplete(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, 
boolean skippedReplicas)
+    PaxosCleanupComplete(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, 
boolean skippedReplicas, boolean isUrgent)
     {
         this.ctx = ctx;
         this.waitingResponse = new HashSet<>(endpoints);
@@ -61,13 +62,16 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> 
implements RequestCa
         this.ranges = ranges;
         this.lowBound = lowBound;
         this.skippedReplicas = skippedReplicas;
+        this.isUrgent = isUrgent;
     }
 
     public synchronized void run()
     {
         Request request = !skippedReplicas ? new Request(tableId, lowBound, 
ranges)
                                            : new Request(tableId, 
Ballot.none(), Collections.emptyList());
-        Message<Request> message = Message.out(PAXOS2_CLEANUP_COMPLETE_REQ, 
request);
+
+        Message<Request> message = Message.out(PAXOS2_CLEANUP_COMPLETE_REQ, 
request, isUrgent);
+
         for (InetAddressAndPort endpoint : waitingResponse)
             ctx.messaging().sendWithCallback(message, endpoint, this);
     }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
index 2eaeaf2537..6d3fb731ea 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -75,6 +76,7 @@ public class PaxosCleanupRequest
         return in -> {
             PaxosCleanupRequest request = in.payload;
 
+            boolean isUrgent = in.header.hasFlag(MessageFlag.URGENT);
             if (!PaxosCleanup.isInRangeAndShouldProcess(ctx, request.ranges, 
request.tableId))
             {
                 // Try catching up, in case it's us
@@ -82,24 +84,24 @@ public class PaxosCleanupRequest
 
                 String msg = String.format("Rejecting cleanup request %s from 
%s. Some ranges are not replicated (%s)",
                                            request.session, in.from(), 
request.ranges);
-                Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
msg));
+                Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
msg), isUrgent);
                 ctx.messaging().send(response, in.respondTo());
                 return;
             }
 
             PaxosCleanupLocalCoordinator coordinator = 
PaxosCleanupLocalCoordinator.create(ctx, request);
 
-            coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>()
+            coordinator.addCallback(new FutureCallback<>()
             {
                 public void onSuccess(@Nullable PaxosCleanupResponse finished)
                 {
-                    Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow());
+                    Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow(), isUrgent);
                     ctx.messaging().send(response, in.respondTo());
                 }
 
                 public void onFailure(Throwable throwable)
                 {
-                    Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
throwable.getMessage()));
+                    Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
throwable.getMessage()), isUrgent);
                     ctx.messaging().send(response, in.respondTo());
                 }
             });
@@ -112,7 +114,7 @@ public class PaxosCleanupRequest
     }
     public static final IVerbHandler<PaxosCleanupRequest> verbHandler = 
createVerbHandler(SharedContext.Global.instance);
 
-    public static final IVersionedSerializer<PaxosCleanupRequest> serializer = 
new IVersionedSerializer<PaxosCleanupRequest>()
+    public static final IVersionedSerializer<PaxosCleanupRequest> serializer = 
new IVersionedSerializer<>()
     {
         public void serialize(PaxosCleanupRequest completer, DataOutputPlus 
out, int version) throws IOException
         {
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java
index 681f67a56c..80f571cd26 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java
@@ -98,15 +98,17 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
     private final TableId tableId;
     private final Collection<Range<Token>> ranges;
     private final Queue<InetAddressAndPort> pendingCleanups = new 
ConcurrentLinkedQueue<>();
+    private final boolean isUrgent;
     private InetAddressAndPort inProgress = null;
     private volatile long lastMessageSentNanos;
     private ScheduledFuture<?> timeout;
 
-    PaxosCleanupSession(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableId tableId, Collection<Range<Token>> ranges)
+    PaxosCleanupSession(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableId tableId, Collection<Range<Token>> ranges, boolean isUrgent)
     {
         this.ctx = ctx;
         this.tableId = tableId;
         this.ranges = ranges;
+        this.isUrgent = isUrgent;
 
         pendingCleanups.addAll(endpoints);
         lastMessageSentNanos = ctx.clock().nanoTime();
@@ -125,7 +127,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
     {
         lastMessageSentNanos = ctx.clock().nanoTime();
         PaxosCleanupRequest completer = new PaxosCleanupRequest(session, 
tableId, ranges);
-        Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, 
completer);
+        Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, 
completer, isUrgent);
         ctx.messaging().sendWithCallback(msg, endpoint, this);
     }
 
diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
index b3104788d2..07b1bbe334 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
@@ -38,12 +38,12 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ
         this.waitingResponse = new HashSet<>(endpoints);
     }
 
-    public static PaxosFinishPrepareCleanup finish(SharedContext ctx, Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result)
+    public static PaxosFinishPrepareCleanup finish(SharedContext ctx, Collection<InetAddressAndPort> endpoints, boolean isUrgent, PaxosCleanupHistory result)
     {
         PaxosFinishPrepareCleanup callback = new PaxosFinishPrepareCleanup(endpoints);
         synchronized (callback)
         {
-            Message<PaxosCleanupHistory> message = Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result);
+            Message<PaxosCleanupHistory> message = Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result, isUrgent);
             for (InetAddressAndPort endpoint : endpoints)
                 ctx.messaging().sendWithCallback(message, endpoint, callback);
         }
diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
index 8f631bcbed..41735526a4 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
@@ -77,12 +77,12 @@ public class PaxosStartPrepareCleanup extends AsyncFuture<PaxosCleanupHistory> i
      * prepare message to prevent racing with gossip dissemination and guarantee that every repair participant is aware
      * of the pending ring change during repair.
      */
-    public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges)
+    public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges, boolean isUrgent)
     {
         PaxosStartPrepareCleanup callback = new PaxosStartPrepareCleanup(tableId, endpoints);
         synchronized (callback)
         {
-            Message<Request> message = Message.out(PAXOS2_CLEANUP_START_PREPARE_REQ, new Request(tableId, localEpState, ranges));
+            Message<Request> message = Message.out(PAXOS2_CLEANUP_START_PREPARE_REQ, new Request(tableId, localEpState, ranges), isUrgent);
             for (InetAddressAndPort endpoint : endpoints)
                 ctx.messaging().sendWithCallback(message, endpoint, callback);
         }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index a2b225ca0b..e53f7c724a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -486,6 +486,19 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }
     }
 
+    @VisibleForTesting
+    public static Message.Header deserializeHeader(IMessage message)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(message.bytes()))
+        {
+            return Message.serializer.deserializeHeader(in, toCassandraInetAddressAndPort(message.from()), message.version());
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException("Can not deserialize heaader " + message, t);
+        }
+    }
+
     @Override
     public void receiveMessage(IMessage message)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/CMSUrgentMessagesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSUrgentMessagesTest.java
new file mode 100644
index 0000000000..8a718cf013
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/CMSUrgentMessagesTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.distributed.test.log.FuzzTestBase;
+import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+
+public class CMSUrgentMessagesTest extends FuzzTestBase
+{
+    @Test
+    public void allPaxosMessagesAreUrgentTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            List<Throwable> thrown = new CopyOnWriteArrayList<>();
+            cluster.filters()
+                   .allVerbs()
+                   .messagesMatching((from, to, msg) -> {
+                       Verb verb = Verb.fromId(msg.verb());
+                       if (!verb.toString().contains("PAXOS2"))
+                           return false;
+
+                       try
+                       {
+                           boolean hasFlag = cluster.get(1).callOnInstance(() -> Instance.deserializeHeader(msg).hasFlag(MessageFlag.URGENT));
+                           assert hasFlag : String.format("%s does not have URGENT flag set: %s", verb, msg);
+                       }
+                       catch (Throwable t)
+                       {
+                           thrown.add(t);
+                       }
+                       return false;
+                   })
+                   .drop();
+
+            for (int idx : new int[]{ 2, 3 })
+                cluster.get(idx).runOnInstance(() -> AddToCMS.initiate());
+
+            if (!thrown.isEmpty())
+            {
+                Throwable t = new AssertionError("Caught exceptions");
+                for (Throwable throwable : thrown)
+                    t.addSuppressed(throwable);
+                throw t;
+            }
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to