iamaleksey commented on code in PR #4447:
URL: https://github.com/apache/cassandra/pull/4447#discussion_r2742332360


##########
src/java/org/apache/cassandra/service/paxos/CasForwardResponse.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.CassandraException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.CollectionSerializers;
+
+import static 
org.apache.cassandra.db.SerializationHeader.StableHeaderSerializer.STABLE;
+import static 
org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE;
+
+/**
+ * Response containing the result of a forwarded CAS operation.
+ * Can contain either a successful result or an exception that occurred during 
execution.
+ */
+public class CasForwardResponse
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final RowIterator result;
+    public final CassandraException exception;
+    public final List<String> warnings;
+
+    public CasForwardResponse(RowIterator result, List<String> warnings)
+    {
+        this.result = result;
+        this.exception = null;
+        this.warnings = warnings;
+    }
+
+    public CasForwardResponse(CassandraException exception, List<String> 
warnings)
+    {
+        this.result = null;
+        this.exception = exception;
+        this.warnings = warnings;
+    }
+
+    private CasForwardResponse(RowIterator result, CassandraException 
exception, List<String> warnings)
+    {
+        this.result = result;
+        this.exception = exception;
+        this.warnings = warnings;
+    }
+
+    public boolean isSuccess()
+    {
+        return exception == null;
+    }
+
+    public static class Serializer implements 
IVersionedSerializer<CasForwardResponse>
+    {
+        @Override
+        public void serialize(CasForwardResponse response, DataOutputPlus out, 
int version) throws IOException
+        {
+            out.writeBoolean(response.exception != null);

Review Comment:
   Would be cleaner with three flags, one for each field, instead of this 
nested complexity and asymmetric warnings ser/de logic, I think. Plus, having 
`warnings` be nullable has some messy handling consequences. I'd prefer it be 
an `emptyList()` instead.



##########
src/java/org/apache/cassandra/service/paxos/Paxos2CommitForwardRequest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.service.paxos.Commit.Agreed;
+
+import static org.apache.cassandra.dht.AbstractBounds.tokenSerializer;
+
+/**
+ * Request to forward a Paxos V2 commit operation to a replica coordinator.
+ * This is used when the original coordinator is not a replica but needs to
+ * execute a Paxos commit for a tracked keyspace that requires MutationId 
generation.
+ * 
+ * Contains only the essential data needed by PaxosCommit instead of the full 
Participants object.
+ */
+public class Paxos2CommitForwardRequest
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final Agreed commit;
+    public final ConsistencyLevel consistencyForConsensus;
+    public final ConsistencyLevel consistencyForCommit;
+    public final EndpointsForToken all;
+    public final EndpointsForToken allLive;
+    public final EndpointsForToken allDown;
+    public final int required;
+    public final boolean isUrgent;
+
+    public Paxos2CommitForwardRequest(Agreed commit,
+                                     ConsistencyLevel consistencyForConsensus,
+                                     ConsistencyLevel consistencyForCommit,
+                                     EndpointsForToken all,
+                                     EndpointsForToken allLive,
+                                     EndpointsForToken allDown,
+                                     int required,
+                                     boolean isUrgent)
+    {
+        this.commit = commit;
+        this.consistencyForConsensus = consistencyForConsensus;
+        this.consistencyForCommit = consistencyForCommit;
+        this.all = all;
+        this.allLive = allLive;
+        this.allDown = allDown;
+        this.required = required;
+        this.isUrgent = isUrgent;
+    }
+
+    public static class Serializer implements 
IVersionedSerializer<Paxos2CommitForwardRequest>
+    {
+        @Override
+        public void serialize(Paxos2CommitForwardRequest request, 
DataOutputPlus out, int version) throws IOException
+        {
+            Agreed.serializer.serialize(request.commit, out, version);
+            out.writeByte(request.consistencyForConsensus.code);
+            out.writeByte(request.consistencyForCommit.code);
+
+            // Serialize EndpointsForToken collections
+            serializeEndpoints(request.all, out, version);
+            serializeEndpoints(request.allLive, out, version);
+            serializeEndpoints(request.allDown, out, version);
+
+            out.writeUnsignedVInt32(request.required);
+            out.writeBoolean(request.isUrgent);
+        }
+
+        @Override
+        public Paxos2CommitForwardRequest deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            Agreed commit = Agreed.serializer.deserialize(in, version);
+            ConsistencyLevel consistencyForConsensus = 
ConsistencyLevel.fromCode(in.readUnsignedByte());
+            ConsistencyLevel consistencyForCommit = 
ConsistencyLevel.fromCode(in.readUnsignedByte());
+
+            // Deserialize EndpointsForToken collections using partitioner 
from commit
+            IPartitioner partitioner = commit.metadata().partitioner;
+            EndpointsForToken all = deserializeEndpoints(in, version, 
partitioner);
+            EndpointsForToken allLive = deserializeEndpoints(in, version, 
partitioner);
+            EndpointsForToken allDown = deserializeEndpoints(in, version, 
partitioner);
+
+            int required = in.readUnsignedVInt32();
+            boolean isUrgent = in.readBoolean();
+
+            return new Paxos2CommitForwardRequest(commit, 
consistencyForConsensus, consistencyForCommit,
+                                                  all, allLive, allDown, 
required, isUrgent);
+        }
+
+        @Override
+        public long serializedSize(Paxos2CommitForwardRequest request, int 
version)
+        {
+            long size = Agreed.serializer.serializedSize(request.commit, 
version)
+                        + 1  // consistencyForConsensus.code
+                        + 1; // consistencyForCommit.code
+
+            size += endpointsSerializedSize(request.all, version);
+            size += endpointsSerializedSize(request.allLive, version);
+            size += endpointsSerializedSize(request.allDown, version);
+
+            size += TypeSizes.sizeofUnsignedVInt(request.required);
+            size += TypeSizes.BOOL_SIZE; // isUrgent
+
+            return size;
+        }
+
+        private void serializeEndpoints(EndpointsForToken endpoints, 
DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUnsignedVInt32(endpoints.size());
+            Token token = endpoints.token();
+            Token.compactSerializer.serialize(token, out, version);
+
+            for (Replica replica : endpoints)
+            {
+                
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(replica.endpoint(),
 out, version);

Review Comment:
   Replica.Serializer exists.



##########
src/java/org/apache/cassandra/service/paxos/CasForwardRequest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.cql3.statements.CQL3CasRequest;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.RemoteClientState;
+
+/**
+ * Request to forward a CAS operation to a replica coordinator for tracked 
keyspaces.
+ * Contains the essential information needed to execute the CAS operation on 
the remote coordinator.
+ */
+public class CasForwardRequest
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final String keyspaceName;
+    public final String cfName;
+    public final DecoratedKey key;
+    public final ConsistencyLevel consistencyForPaxos;
+    public final ConsistencyLevel consistencyForCommit;
+    public final long nowInSeconds;
+    public final RemoteClientState clientState;
+    public final CQL3CasRequest casRequest;  // The actual CAS request to 
forward
+
+    /**
+     * Backward compatibility constructor that accepts ClientState.
+     */
+    public CasForwardRequest(String keyspaceName,
+                            String cfName,
+                            DecoratedKey key,
+                            CQL3CasRequest request,
+                            ConsistencyLevel consistencyForPaxos,
+                            ConsistencyLevel consistencyForCommit,
+                            ClientState clientState,
+                            long nowInSeconds)
+    {
+        Preconditions.checkNotNull(keyspaceName, "keyspaceName cannot be 
null");
+        Preconditions.checkNotNull(cfName, "cfName cannot be null");
+        Preconditions.checkNotNull(key, "key cannot be null");
+        Preconditions.checkNotNull(request, "request cannot be null");
+        Preconditions.checkNotNull(consistencyForPaxos, "consistencyForPaxos 
cannot be null");
+        Preconditions.checkNotNull(consistencyForCommit, "consistencyForCommit 
cannot be null");
+        Preconditions.checkNotNull(clientState, "clientState cannot be null");

Review Comment:
   Not sure why we are checking for nulls here and in 
`ConsensusReadForwardRequest` but nowhere else. Either way, you can cut all 
these lines by assigning the return value of `checkNotNull()` directly to the 
fields.



##########
src/java/org/apache/cassandra/db/guardrails/Guardrail.java:
##########
@@ -108,15 +108,30 @@ protected void warn(String message, String 
redactedMessage)
         if (skipNotifying(true))
             return;
 
-        message = decorateMessage(message);
+        String decoratedMessage = decorateMessage(message);
+        String decoratedRedactedMessage = decorateMessage(redactedMessage);
 
-        logger.warn(message);
+        // If we're deferring warnings (during CAS fragment creation), defer 
all warning outputs
+        if (ClientWarn.instance.isDeferring())
+        {
+            // ClientWarn will automatically capture this to the deferred list
+            ClientWarn.instance.warn(decoratedMessage);

Review Comment:
   Can pull this out of the if statement as it's shared with the other branch, 
on line 128.



##########
src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java:
##########
@@ -36,4 +41,38 @@ public WriteTimeoutException(WriteType writeType, 
ConsistencyLevel consistency,
         super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor, 
msg);
         this.writeType = writeType;
     }
+
+    @Override
+    protected void serializeSpecificFields(DataOutputPlus out, int version) 
throws IOException
+    {
+        out.writeByte(consistency.code);
+        out.writeUnsignedVInt32(received);
+        out.writeUnsignedVInt32(blockFor);
+        out.writeUTF(writeType.toString());

Review Comment:
   WriteType is the only place where we ser/de an enum by its string value 
rather than int code.



##########
src/java/org/apache/cassandra/db/guardrails/PasswordGuardrail.java:
##########
@@ -95,5 +95,11 @@ public static class PasswordGuardrailException extends 
GuardrailViolatedExceptio
             super(message);
             this.redactedMessage = redactedMessage;
         }
+
+        PasswordGuardrailException(String message, String redactedMessage, 
Throwable cause)

Review Comment:
   Doesn't override `warn()` and `fail()` implementing deferral, which I think 
is problematic.



##########
src/java/org/apache/cassandra/service/RemoteClientState.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.MD5Digest;
+
+/**
+ * A serializable version of ClientState designed for forwarding operations to 
remote coordinators.
+ *
+ * This class contains only the essential authorization attributes needed for 
forwarded operations,
+ * avoiding the complex serialization challenges of the full AuthenticatedUser 
object.
+ *
+ * Key design principles:
+ * - Contains only serializable authorization state (isSuper, isInternal, etc.)
+ * - Supports guardrail enforcement through isOrdinaryUser() and 
applyGuardrails()
+ * - Throws exceptions for operations requiring the full user object 
(getUser(), authorize())
+ * - Used specifically for CAS and consensus read forwarding in tracked 
keyspaces
+ */
+public class RemoteClientState extends ClientState
+{
+    public static final Serializer serializer = new Serializer();
+
+    // Essential serializable state for authorization context
+    private final boolean isSuperCached;
+    private final String userName; // For logging/error messages (optional)
+
+    /**
+     * Creates a RemoteClientState from a local ClientState for forwarding.
+     * Extracts only the essential authorization attributes.
+     */
+    public static RemoteClientState from(ClientState localState)
+    {
+        if (!(localState instanceof LocalClientState))
+        {
+            throw new IllegalArgumentException("Cannot create 
RemoteClientState from non-LocalClientState: " + localState.getClass());
+        }
+        LocalClientState local = (LocalClientState) localState;
+
+        return new RemoteClientState(
+            localState.isSuper(),
+            localState.isSystem(),
+            localState.applyGuardrails(),
+            localState.getRawKeyspace(),
+            local.getUser() != null ? local.getUser().getName() : null
+        );
+    }
+
+    /**
+     * Private constructor for deserialization and from() method.
+     */
+    private RemoteClientState(boolean isSuper, boolean isInternal, boolean 
applyGuardrails, String keyspace, String userName)
+    {
+        super(isInternal);
+        this.isSuperCached = isSuper;
+        this.userName = userName;
+
+        // Set the cached guardrail and keyspace state in the base class
+        if (!applyGuardrails)
+            pauseGuardrails();
+        if (keyspace != null)
+            setRawKeyspace(keyspace);
+    }
+
+    // ==========================================
+    // Cached user attributes (implemented with cached values)
+    // ==========================================
+
+    @Override
+    public boolean isSuper()
+    {
+        return isSuperCached;
+    }
+
+    // ==========================================
+    // Methods that throw exceptions for remote operations
+    // ==========================================
+
+    @Override
+    public AuthenticatedUser getUser()
+    {
+        throw new UnsupportedOperationException("Cannot access user object in 
remote client state. " +
+                                              "Full user operations must be 
performed on the original coordinator.");
+    }
+
+    @Override
+    public void login(AuthenticatedUser user)
+    {
+        throw new UnsupportedOperationException("Cannot login on remote client 
state.");
+    }
+
+    @Override
+    public void setKeyspace(String ks)
+    {
+        throw new UnsupportedOperationException("Cannot modify keyspace on 
remote client state.");
+    }
+
+    @Override
+    protected Set<Permission> authorize(IResource resource)
+    {
+        throw new UnsupportedOperationException("Cannot perform authorization 
in remote client state. " +
+                                              "Authorization must be performed 
on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void ensurePermission(Permission perm, IResource resource)
+    {
+        throw new UnsupportedOperationException("Cannot perform authorization 
in remote client state. " +
+                                              "Authorization must be performed 
on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void ensurePermission(Permission permission, Function function)
+    {
+        throw new UnsupportedOperationException("Cannot perform authorization 
in remote client state. " +
+                                              "Authorization must be performed 
on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public boolean hasTablePermission(TableMetadata table, Permission perm)
+    {
+        throw new UnsupportedOperationException("Cannot check table 
permissions in remote client state. " +
+                                              "Permission checks must be 
performed on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void validateLogin()
+    {
+        throw new UnsupportedOperationException("Cannot validate login in 
remote client state. " +
+                                              "Login validation must be 
performed on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void ensureNotAnonymous()
+    {
+        throw new UnsupportedOperationException("Cannot validate anonymous 
status in remote client state. " +
+                                              "Anonymous validation must be 
performed on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress()
+    {
+        throw new UnsupportedOperationException("Remote address not available 
in remote client state.");
+    }
+
+    @Override
+    public InetAddress getClientAddress()
+    {
+        throw new UnsupportedOperationException("Client address not available 
in remote client state.");
+    }
+
+    @Override
+    public Optional<String> getDriverName()
+    {
+        throw new UnsupportedOperationException("Driver information not 
available in remote client state.");
+    }
+
+    @Override
+    public Optional<String> getDriverVersion()
+    {
+        throw new UnsupportedOperationException("Driver information not 
available in remote client state.");
+    }
+
+    @Override
+    public Optional<Map<String, String>> getClientOptions()
+    {
+        throw new UnsupportedOperationException("Client options not available 
in remote client state.");
+    }
+
+    @Override
+    public void setDriverName(String driverName)
+    {
+        throw new UnsupportedOperationException("Cannot modify driver 
information in remote client state.");
+    }
+
+    @Override
+    public void setDriverVersion(String driverVersion)
+    {
+        throw new UnsupportedOperationException("Cannot modify driver 
information in remote client state.");
+    }
+
+    @Override
+    public void setClientOptions(Map<String, String> clientOptions)
+    {
+        throw new UnsupportedOperationException("Cannot modify client options 
in remote client state.");
+    }
+
+    @Override
+    public void warnAboutUseWithPreparedStatements(MD5Digest statementId, 
String preparedKeyspace)
+    {
+        throw new UnsupportedOperationException("Cannot issue warnings in 
remote client state. " +
+                                              "Warnings should be issued on 
the original coordinator.");
+    }
+
+    @Override
+    public void warnAboutUneligiblePreparedStatement(MD5Digest statementId)
+    {
+        throw new UnsupportedOperationException("Cannot issue warnings in 
remote client state. " +
+                                              "Warnings should be issued on 
the original coordinator.");
+    }
+
+    /**
+     * Get the user name for logging purposes (if available).
+     */
+    public String getUserNameForLogging()
+    {
+        return userName != null ? userName : "unknown";
+    }
+
+    /**
+     * Serializer for RemoteClientState to enable network transmission.
+     */
+    public static class Serializer implements 
IVersionedSerializer<RemoteClientState>
+    {
+        @Override
+        public void serialize(RemoteClientState state, DataOutputPlus out, int 
version) throws IOException
+        {
+            out.writeBoolean(state.isSuperCached);

Review Comment:
   Five scattered booleans here could all be flags.



##########
src/java/org/apache/cassandra/service/ClientWarn.java:
##########
@@ -87,23 +88,121 @@ public void resetWarnings()
         set(null);
     }
 
+    /**
+     * Start deferring warnings. Any warnings added after this call will be 
stored
+     * separately and can later be committed (inserted at the marked position) 
or discarded.
+     * The insertion point is recorded as the current size of the warnings 
list.
+     * <p>
+     * This is used for CAS operations where we don't want to emit warnings 
until
+     * we know whether the conditions passed.
+     */
+    public void startDeferring()
+    {
+        State state = get();
+        if (state != null)
+            state.startDeferring();
+    }
+
+    /**
+     * Check if we're currently deferring warnings.
+     */
+    public boolean isDeferring()
+    {
+        State state = get();
+        return state != null && state.isDeferring();
+    }
+
+    /**
+     * Commit deferred warnings by inserting them at the recorded insertion 
point.
+     * After this call, deferring mode is disabled.
+     *
+     * @param replayAction optional consumer to receive deferred actions for 
replay (e.g., guardrail diagnostic events)
+     */
+    public void commitDeferredWarnings(Consumer<Runnable> replayAction)
+    {
+        State state = get();
+        if (state != null)
+            state.commitDeferredWarnings(replayAction);
+    }
+
+    /**
+     * Commit deferred warnings by inserting them at the recorded insertion 
point.
+     * After this call, deferring mode is disabled.
+     */
+    public void commitDeferredWarnings()
+    {
+        commitDeferredWarnings(Runnable::run);
+    }
+
+    /**
+     * Discard all deferred warnings without adding them to the main warnings 
list.
+     * After this call, deferring mode is disabled.
+     */
+    public void discardDeferredWarnings()
+    {
+        State state = get();
+        if (state != null)
+            state.discardDeferredWarnings();
+    }
+
+    /**
+     * Store a deferred action to be executed when warnings are committed.
+     * This is used to defer guardrail diagnostic events until we know 
conditions passed.
+     */
+    public void addDeferredAction(Runnable action)
+    {
+        State state = get();
+        if (state != null)
+            state.addDeferredAction(action);
+    }
+
     public static class State
     {
         private boolean collecting = true;
         // This must be a thread-safe list. Even though it's wrapped in a 
ThreadLocal, it's propagated to each thread
         // from shared state, so multiple threads can reference the same State.
         private volatile List<String> warnings;
 
+        // Deferred warnings support for CAS operations
+        // These must also be thread-safe since State can be shared across 
threads
+        private volatile List<String> deferredWarnings;
+        private volatile List<Runnable> deferredActions;
+        private volatile int insertionPoint = -1;
+

Review Comment:
   Looks like Claude copied the existing cursed pattern and then made it even 
more cursed. The formatting is off, but also this class is way too complicated 
for what it does.



##########
src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardResponse.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.CassandraException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.CollectionSerializers;
+
+import static 
org.apache.cassandra.db.SerializationHeader.StableHeaderSerializer.STABLE;
+import static 
org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE;
+
+/**
+ * Response containing the result of a forwarded consensus read operation.
+ * Can contain either a successful result or an exception that occurred during 
execution.
+ *
+ * Since consensus reads are single partition reads, we store a single 
RowIterator
+ * and wrap it as a PartitionIterator when accessed.
+ */
+public class ConsensusReadForwardResponse
+{
+    public static final Serializer serializer = new Serializer();
+
+    // Store as RowIterator internally (single partition) but expose as 
PartitionIterator
+    private final RowIterator rowIterator;
+    public final CassandraException exception;
+    public final List<String> warnings;
+
+    public ConsensusReadForwardResponse(PartitionIterator result, List<String> 
warnings)
+    {
+        // Extract the single partition from the iterator (consensus reads are 
single partition)
+        if (result != null && result.hasNext())
+        {
+            this.rowIterator = result.next();
+        }
+        else
+        {
+            this.rowIterator = null;
+        }
+        this.exception = null;
+        this.warnings = warnings;
+    }
+
+    public ConsensusReadForwardResponse(CassandraException exception, 
List<String> warnings)
+    {
+        this.rowIterator = null;
+        this.exception = exception;
+        this.warnings = warnings;
+    }
+
+    // Private constructor for deserialization
+    private ConsensusReadForwardResponse(RowIterator rowIterator, 
CassandraException exception, List<String> warnings)
+    {
+        this.rowIterator = rowIterator;
+        this.exception = exception;
+        this.warnings = warnings;
+    }
+
+    public boolean isSuccess()
+    {
+        return exception == null;
+    }
+
+    /**
+     * Get the result as a PartitionIterator.
+     */
+    public PartitionIterator getResult()
+    {
+        if (rowIterator == null)
+            return null;
+        return PartitionIterators.singletonIterator(rowIterator);
+    }
+
+    public static class Serializer implements 
IVersionedSerializer<ConsensusReadForwardResponse>
+    {
+        @Override
+        public void serialize(ConsensusReadForwardResponse response, 
DataOutputPlus out, int version) throws IOException
+        {
+            out.writeBoolean(response.exception != null);

Review Comment:
   Same note, duplicating:
   
   Would be cleaner with three flags, one for each field, instead of this 
nested complexity and asymmetric warnings ser/de logic, I think. Plus, having 
`warnings` be nullable has some messy handling consequences. I'd prefer it be 
an `emptyList()` instead.



##########
src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardRequest.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Request to forward a consensus read operation to a replica coordinator.
+ * This is used when the original coordinator is not a replica but needs to
+ * execute a consensus read for a tracked keyspace that requires proper 
coordination.
+ *
+ * Consensus reads only ever contain a single read command.
+ */
+public class ConsensusReadForwardRequest
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final SinglePartitionReadCommand command;
+    public final ConsistencyLevel consistencyLevel;
+
+    public ConsensusReadForwardRequest(SinglePartitionReadCommand command,
+                                     ConsistencyLevel consistencyLevel)
+    {
+        Preconditions.checkNotNull(command, "command cannot be null");
+        Preconditions.checkNotNull(consistencyLevel, "consistencyLevel cannot 
be null");

Review Comment:
   Not sure why we are checking for nulls here and in `CASForwardRequest` but 
nowhere else. Either way, you can cut all these lines by assigning the return 
value of `checkNotNull()`.



##########
src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java:
##########
@@ -580,4 +586,234 @@ public ConsensusAttemptResult toCasResult(TxnResult 
txnResult)
         TxnDataKeyValue partition = 
(TxnDataKeyValue)txnData.get(txnDataName(CAS_READ));
         return casResult(partition != null ? partition.rowIterator(false) : 
null);
     }
+
+    /**
+     * IVersionedSerializer for CQL3CasRequest to enable CAS forwarding 
between coordinators.
+     *
+     */
+    public static class Serializer implements 
IVersionedSerializer<CQL3CasRequest>
+    {
+        public static final Serializer instance = new Serializer();
+
+        @Override
+        public void serialize(CQL3CasRequest request, DataOutputPlus out, int 
version) throws IOException
+        {
+            // Serialize table metadata using compact form
+            request.metadata.id.serializeCompact(out);
+
+            // Serialize partition key
+            DecoratedKey.serializer.serialize(request.key, out, version);
+
+            // Serialize condition columns - serialize statics and regulars 
separately
+            Columns.serializer.serialize(request.conditionColumns.statics, 
out);
+            Columns.serializer.serialize(request.conditionColumns.regulars, 
out);
+
+            // Serialize boolean flags as bit field
+            byte flags = (byte) ((request.updatesRegularRows ? 0x01 : 0) |
+                                (request.updatesStaticRow ? 0x02 : 0) |
+                                (request.hasExists ? 0x04 : 0) |
+                                (request.conditionCheckOnly ? 0x08 : 0));

Review Comment:
   Make up some constants for the flags maybe? It's what we usually do.



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java:
##########
@@ -62,12 +62,28 @@ public boolean equals(Object o)
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         TxnReferenceOperations that = (TxnReferenceOperations) o;
-        return metadata.equals(that.metadata) && Objects.equals(clustering, 
that.clustering) && regulars.equals(that.regulars) && 
statics.equals(that.statics);
+
+        // Special case: EMPTY singleton matches any other empty instance
+        if ((this.metadata == null && this.isEmpty() && that.isEmpty()) ||
+            (that.metadata == null && that.isEmpty() && this.isEmpty()))
+            return true;
+
+        // Otherwise, use strict comparison including metadata
+        return Objects.equals(metadata, that.metadata) &&
+               Objects.equals(clustering, that.clustering) &&
+               regulars.equals(that.regulars) &&
+               statics.equals(that.statics);
     }
 
     @Override
     public int hashCode()
     {
+        // ALL empty instances (regardless of metadata) get the same hash code
+        // to match the equals() behavior
+        if (isEmpty())
+            return Objects.hash("EMPTY_SINGLETON");

Review Comment:
   This is not the best way to do it :)



##########
src/java/org/apache/cassandra/db/SystemKeyspace.java:
##########
@@ -1603,44 +1605,83 @@ public static void savePaxosProposal(Commit proposal)
                             proposal.ballot,
                             PartitionUpdate.toBytes(proposal.update, 
MessagingService.current_version),
                             MessagingService.current_version,
-                            proposal.update.partitionKey().getKey(),
-                            proposal.update.metadata().id.asUUID());
+                            proposal.partitionKey().getKey(),
+                            proposal.metadata().id.asUUID());
         }
     }
 
     public static void savePaxosCommit(Commit commit)
     {
         // We always erase the last proposal (with the commit timestamp to no 
erase more recent proposal in case the commit is old)
         // even though that's really just an optimization  since 
SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
+        ByteBuffer mutationIdBytes = 
mutationIdToByteBuffer(commit.mutation.id());
+
         if (commit instanceof Commit.CommittedWithTTL)
         {
             long localDeletionTime = ((Commit.CommittedWithTTL) 
commit).localDeletionTime;
-            int ttlInSec = legacyPaxosTtlSec(commit.update.metadata());
+            int ttlInSec = legacyPaxosTtlSec(commit.metadata());
             long nowInSec = localDeletionTime - ttlInSec;
-            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND 
TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
? WHERE row_key = ? AND cf_id = ?";
+            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND 
TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?";
             executeInternalWithNowInSec(cql,
                             nowInSec,
                             commit.ballot.unixMicros(),
                             ttlInSec,
                             commit.ballot,
                             PartitionUpdate.toBytes(commit.update, 
MessagingService.current_version),
                             MessagingService.current_version,
-                            commit.update.partitionKey().getKey(),
-                            commit.update.metadata().id.asUUID());
+                            mutationIdBytes,
+                            commit.partitionKey().getKey(),
+                            commit.metadata().id.asUUID());
         }
         else
         {
-            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET 
proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
? WHERE row_key = ? AND cf_id = ?";
+            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET 
proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?";
             executeInternal(cql,
                             commit.ballot.unixMicros(),
                             commit.ballot,
                             PartitionUpdate.toBytes(commit.update, 
MessagingService.current_version),
                             MessagingService.current_version,
-                            commit.update.partitionKey().getKey(),
-                            commit.update.metadata().id.asUUID());
+                            mutationIdBytes,
+                            commit.partitionKey().getKey(),
+                            commit.metadata().id.asUUID());
         }
     }
 
+    /**
+     * Converts a MutationId to a ByteBuffer for storage in the paxos table.
+     * Returns null if the mutation ID is none (which will be stored as null 
in CQL).
+     */
+    private static ByteBuffer mutationIdToByteBuffer(MutationId id)

Review Comment:
   This would be better as an instance method of `MutationId` I believe.



##########
src/java/org/apache/cassandra/service/CASRequest.java:
##########
@@ -48,6 +48,17 @@ public interface CASRequest
      */
     boolean appliesTo(FilteredPartition current) throws 
InvalidRequestException;
 
+    /**
+     * Returns true if this request should only check conditions without 
applying updates.
+     * This is used for deferred guardrail exception handling - when a 
guardrail exception
+     * occurred during request preparation, we still need to check conditions 
first before
+     * throwing the exception.
+     */
+    default boolean isConditionCheckOnly()

Review Comment:
   Doesn't need a default implementation? In fact, I don't think we need 
`CASRequest` at all? The only existing implementation is `CQL3CasRequest`.
   



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java:
##########
@@ -62,12 +62,28 @@ public boolean equals(Object o)
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         TxnReferenceOperations that = (TxnReferenceOperations) o;
-        return metadata.equals(that.metadata) && Objects.equals(clustering, 
that.clustering) && regulars.equals(that.regulars) && 
statics.equals(that.statics);
+
+        // Special case: EMPTY singleton matches any other empty instance
+        if ((this.metadata == null && this.isEmpty() && that.isEmpty()) ||
+            (that.metadata == null && that.isEmpty() && this.isEmpty()))

Review Comment:
   This condition can definitely be simplified.



##########
src/java/org/apache/cassandra/db/SystemKeyspace.java:
##########
@@ -1603,44 +1605,83 @@ public static void savePaxosProposal(Commit proposal)
                             proposal.ballot,
                             PartitionUpdate.toBytes(proposal.update, 
MessagingService.current_version),
                             MessagingService.current_version,
-                            proposal.update.partitionKey().getKey(),
-                            proposal.update.metadata().id.asUUID());
+                            proposal.partitionKey().getKey(),
+                            proposal.metadata().id.asUUID());
         }
     }
 
     public static void savePaxosCommit(Commit commit)
     {
         // We always erase the last proposal (with the commit timestamp to no 
erase more recent proposal in case the commit is old)
         // even though that's really just an optimization  since 
SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
+        ByteBuffer mutationIdBytes = 
mutationIdToByteBuffer(commit.mutation.id());
+
         if (commit instanceof Commit.CommittedWithTTL)
         {
             long localDeletionTime = ((Commit.CommittedWithTTL) 
commit).localDeletionTime;
-            int ttlInSec = legacyPaxosTtlSec(commit.update.metadata());
+            int ttlInSec = legacyPaxosTtlSec(commit.metadata());
             long nowInSec = localDeletionTime - ttlInSec;
-            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND 
TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
? WHERE row_key = ? AND cf_id = ?";
+            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND 
TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?";
             executeInternalWithNowInSec(cql,
                             nowInSec,
                             commit.ballot.unixMicros(),
                             ttlInSec,
                             commit.ballot,
                             PartitionUpdate.toBytes(commit.update, 
MessagingService.current_version),
                             MessagingService.current_version,
-                            commit.update.partitionKey().getKey(),
-                            commit.update.metadata().id.asUUID());
+                            mutationIdBytes,
+                            commit.partitionKey().getKey(),
+                            commit.metadata().id.asUUID());
         }
         else
         {
-            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET 
proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
? WHERE row_key = ? AND cf_id = ?";
+            String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET 
proposal_ballot = null, proposal = null, proposal_version = null, 
most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = 
?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?";
             executeInternal(cql,
                             commit.ballot.unixMicros(),
                             commit.ballot,
                             PartitionUpdate.toBytes(commit.update, 
MessagingService.current_version),
                             MessagingService.current_version,
-                            commit.update.partitionKey().getKey(),
-                            commit.update.metadata().id.asUUID());
+                            mutationIdBytes,
+                            commit.partitionKey().getKey(),
+                            commit.metadata().id.asUUID());
         }
     }
 
+    /**
+     * Converts a MutationId to a ByteBuffer for storage in the paxos table.
+     * Returns null if the mutation ID is none (which will be stored as null 
in CQL).
+     */
+    private static ByteBuffer mutationIdToByteBuffer(MutationId id)
+    {
+        if (id == null || id.isNone())
+            return null;
+
+        ByteBuffer buffer = ByteBuffer.allocate(16);
+        buffer.putLong(id.logId());
+        buffer.putLong(id.sequenceId());
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Converts a ByteBuffer from the paxos table back to a MutationId.
+     * Returns MutationId.none() if the buffer is null or empty.
+     */
+    public static MutationId byteBufferToMutationId(ByteBuffer buffer)

Review Comment:
   This is used elsewhere as well, so I think it would be cleaner as a method of 
`MutationId`.



##########
src/java/org/apache/cassandra/exceptions/CassandraException.java:
##########
@@ -17,29 +17,245 @@
  */
 package org.apache.cassandra.exceptions;
 
+import java.io.IOException;
+
+import org.apache.cassandra.cql3.constraints.ConstraintViolationException;
+import 
org.apache.cassandra.cql3.constraints.InvalidConstraintDefinitionException;
+import org.apache.cassandra.db.KeyspaceNotDefinedException;
+import org.apache.cassandra.db.MutationExceededMaxSizeException;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import 
org.apache.cassandra.service.accord.exceptions.AccordReadExhaustedException;
+import 
org.apache.cassandra.service.accord.exceptions.AccordReadPreemptedException;
+import 
org.apache.cassandra.service.accord.exceptions.AccordWriteExhaustedException;
+import 
org.apache.cassandra.service.accord.exceptions.AccordWritePreemptedException;
+import org.apache.cassandra.triggers.TriggerDisabledException;
+import org.apache.cassandra.utils.ArraySerializers;
 import org.apache.cassandra.utils.Shared;
 
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 @Shared(scope = SIMULATION)
 public abstract class CassandraException extends RuntimeException implements 
TransportException
 {
-    private final ExceptionCode code;
+    public static final Serializer serializer = new Serializer();
+
+    private final ExceptionCode clientExceptionCode;
 
     protected CassandraException(ExceptionCode code, String msg)
     {
         super(msg);
-        this.code = code;
+        this.clientExceptionCode = code;
     }
 
     protected CassandraException(ExceptionCode code, String msg, Throwable 
cause)
     {
         super(msg, cause);
-        this.code = code;
+        this.clientExceptionCode = code;
     }
 
     public ExceptionCode code()
     {
-        return code;
+        return clientExceptionCode;
+    }
+
+    /**
+     * Returns the CassandraExceptionCode for this specific exception class.
+     * Each subclass must implement this to return its unique serialization 
code.
+     */
+    public abstract CassandraExceptionCode getCassandraExceptionCode();
+
+    /**
+     * Serializer for CassandraException that coordinates serialization of 
subclasses.
+     */
+    @Shared(scope = SIMULATION)
+    public static class Serializer implements 
IVersionedSerializer<CassandraException>
+    {
+        @Override
+        public void serialize(CassandraException exception, DataOutputPlus 
out, int version) throws IOException
+        {
+            // Serialize new CassandraExceptionCode for proper type 
identification
+            CassandraExceptionCode cassandraExceptionCode = 
exception.getCassandraExceptionCode();
+            out.writeUnsignedVInt32(cassandraExceptionCode.value);
+
+            String message = exception.getMessage() == null ? "" : 
exception.getMessage();
+            out.writeUTF(message);
+
+            // Serialize cause and suppressed exceptions using 
ExceptionSerializer
+            
ExceptionSerializer.nullableRemoteExceptionSerializer.serialize(exception.getCause(),
 out, version);
+
+            Throwable[] suppressed = exception.getSuppressed();
+            out.writeUnsignedVInt32(suppressed.length);
+            for (Throwable t : suppressed)
+            {
+                ExceptionSerializer.remoteExceptionSerializer.serialize(t, 
out, version);
+            }
+
+            // Serialize stack trace using existing ExceptionSerializer logic
+            StackTraceElement[] stackTrace = exception.getStackTrace();
+            ArraySerializers.serializeArray(stackTrace, out, version, 
ExceptionSerializer.stackTraceElementSerializer);
+
+            // Delegate to subclass serializer for type-specific fields
+            exception.serializeSpecificFields(out, version);
+        }
+
+        @Override
+        public CassandraException deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            // Read new CassandraExceptionCode for proper type identification
+            int cassandraExceptionCode = in.readUnsignedVInt32();
+            CassandraExceptionCode classCode = 
CassandraExceptionCode.fromValue(cassandraExceptionCode);
+
+            String message = in.readUTF();
+
+            // Deserialize cause and suppressed exceptions using 
ExceptionSerializer
+            Throwable cause = 
ExceptionSerializer.nullableRemoteExceptionSerializer.deserialize(in, version);
+
+            int suppressedCount = in.readUnsignedVInt32();
+            Throwable[] suppressed = new Throwable[suppressedCount];

Review Comment:
   Use `ArraySerializers` here, as is already done for the stack trace.



##########
src/java/org/apache/cassandra/exceptions/WriteFailureException.java:
##########
@@ -34,4 +39,56 @@ public WriteFailureException(ConsistencyLevel consistency, 
int received, int blo
         super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, 
ImmutableMap.copyOf(failureReasonByEndpoint));
         this.writeType = writeType;
     }
+
+    @Override
+    protected void serializeSpecificFields(DataOutputPlus out, int version) 
throws IOException
+    {
+        out.writeByte(consistency.code);
+        out.writeUnsignedVInt32(received);
+        out.writeUnsignedVInt32(blockFor);
+
+        // Serialize failure reason map
+        CollectionSerializers.serializeMap(failureReasonByEndpoint, out, 
version,
+                                           
InetAddressAndPort.Serializer.inetAddressAndPortSerializer,
+                                           RequestFailureReason.serializer);
+
+        out.writeUTF(writeType.toString());

Review Comment:
   WriteType is the only place where we ser/de an enum by its string value 
rather than int code. Same as in `WriteTimeoutException`, possibly elsewhere 
too.



##########
src/java/org/apache/cassandra/db/IReadResponse.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.tracked.TrackedDataResponse;
+import org.apache.cassandra.service.reads.tracked.TrackedSummaryResponse;
+
+import static org.apache.cassandra.db.IReadResponse.Kind.UNTRACKED;
+
+public interface IReadResponse
+{
+    enum Kind
+    {
+        UNTRACKED(0),
+        TRACKED_DATA(1),
+        TRACKED_SUMMARY(2),
+        NULL(3);

Review Comment:
   `NULL` is actually never possible, from what I see. Plus the logic in 
serializers is slightly funny in its formatting and structure - can be 
simplified.



##########
src/java/org/apache/cassandra/db/guardrails/Guardrail.java:
##########
@@ -126,21 +141,37 @@ protected void fail(String message, @Nullable ClientState 
state)
 
     protected void fail(String message, String redactedMessage, @Nullable 
ClientState state)
     {
-        message = decorateMessage(message);
+        String decoratedMessage = decorateMessage(message);
+        String decoratedRedactedMessage = decorateMessage(redactedMessage);
 
         if (!skipNotifying(false))
         {
-            logger.error(message);
-            // Note that ClientWarn will simply ignore the message if we're 
not running this as part of a user query
-            // (the internal "state" will be null)
-            ClientWarn.instance.warn(message);
-            // Similarly, tracing will also ignore the message if we're not 
running tracing on the current thread.
-            Tracing.trace(message);
-            GuardrailsDiagnostics.failed(name, 
decorateMessage(redactedMessage));
+            // If we're deferring warnings (during CAS fragment creation), 
defer all failure outputs
+            if (ClientWarn.instance.isDeferring())
+            {
+                // ClientWarn will automatically capture this to the deferred 
list
+                ClientWarn.instance.warn(decoratedMessage);

Review Comment:
   Same as above.



##########
src/java/org/apache/cassandra/exceptions/FunctionExecutionException.java:
##########
@@ -50,4 +55,64 @@ public FunctionExecutionException(FunctionName functionName, 
List<String> argTyp
         this.argTypes = argTypes;
         this.detail = msg;
     }
+
+    @Override
+    protected void serializeSpecificFields(DataOutputPlus out, int version) 
throws IOException
+    {
+        // Serialize FunctionName
+        out.writeBoolean(functionName.keyspace != null);
+        if (functionName.keyspace != null)
+            out.writeUTF(functionName.keyspace);
+        out.writeUTF(functionName.name);
+
+        // Serialize argTypes list
+        out.writeUnsignedVInt32(argTypes.size());
+        for (String argType : argTypes)
+            out.writeUTF(argType);
+
+        // Serialize detail
+        out.writeUTF(detail);
+    }
+
+    @Override
+    protected long serializedSizeSpecificFields(int version)
+    {
+        long size = TypeSizes.BOOL_SIZE; // keyspace present flag
+        if (functionName.keyspace != null)
+            size += TypeSizes.sizeof(functionName.keyspace);
+        size += TypeSizes.sizeof(functionName.name);
+
+        size += TypeSizes.sizeofUnsignedVInt(argTypes.size()); // argTypes 
list size
+        for (String argType : argTypes)
+            size += TypeSizes.sizeof(argType);
+
+        size += TypeSizes.sizeof(detail);
+        return size;
+    }
+
+    static FunctionExecutionException deserializeFields(String message, 
DataInputPlus in, int version) throws IOException
+    {
+        // Deserialize FunctionName
+        boolean hasKeyspace = in.readBoolean();
+        String keyspace = hasKeyspace ? in.readUTF() : null;
+        String name = in.readUTF();
+        FunctionName functionName = new FunctionName(keyspace, name);
+
+        // Deserialize argTypes list
+        int argTypesSize = in.readUnsignedVInt32();
+        List<String> argTypes = new ArrayList<>(argTypesSize);
+        for (int i = 0; i < argTypesSize; i++)
+            argTypes.add(in.readUTF());
+
+        // Deserialize detail
+        String detail = in.readUTF();
+
+        return new FunctionExecutionException(functionName, argTypes, detail);

Review Comment:
   I believe this is incorrect and will skip custom message logic from 
`create()`. Should use `create()` instead.



##########
src/java/org/apache/cassandra/db/EmbeddableSinglePartitionReadCommand.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.tracked.TrackedRead.DataRequest;
+import org.apache.cassandra.service.reads.tracked.TrackedRead.SummaryRequest;
+
+import static 
org.apache.cassandra.db.EmbeddableSinglePartitionReadCommand.Kind.UNTRACKED;
+
+/**
+ * Interface for read command that allows it be serialized and embedded in 
another message. Used in Paxos
+ * to provide a common base class to serialize for tracked and untracked 
reads. Tracked reads contain
+ * additional information needed to execute the read beyond the read command 
itself so an additional interface
+ * is needed.
+ */
+public interface EmbeddableSinglePartitionReadCommand
+{
+    enum Kind
+    {
+        UNTRACKED,
+        TRACKED_DATA_READ,
+        TRACKED_SUMMARY_READ;
+
+        public boolean isTracked()
+        {
+            return this != UNTRACKED;
+        }
+
+        static final IVersionedSerializer<Kind> serializer = new 
IVersionedSerializer<>()
+        {
+            @Override
+            public void serialize(Kind kind, DataOutputPlus out, int version) 
throws IOException
+            {
+                switch (kind)
+                {
+                    case UNTRACKED:
+                        out.writeByte(0);

Review Comment:
   This uses a different pattern from `IReadResponse.Kind`: it hard-codes int 
values instead of using enum codes. Also, since `NULL` there is dead code, the 
two now completely duplicate each other and one could be removed.



##########
src/java/org/apache/cassandra/db/guardrails/GuardrailViolatedException.java:
##########
@@ -18,12 +18,42 @@
 
 package org.apache.cassandra.db.guardrails;
 
+import org.apache.cassandra.exceptions.CassandraExceptionCode;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public class GuardrailViolatedException extends InvalidRequestException
 {
-    GuardrailViolatedException(String message)
+    public GuardrailViolatedException(String message)
     {
         super(message);
     }
+
+    protected GuardrailViolatedException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    @Override
+    public CassandraExceptionCode getCassandraExceptionCode()
+    {
+        return CassandraExceptionCode.GUARDRAIL_VIOLATED;
+    }
+
+    /**
+     * Wraps a guardrail exception for deferred throwing, preserving the 
exception type.
+     * This is used when a guardrail exception is caught during CAS request 
preparation
+     * but needs to be thrown later after conditions are checked.
+     *
+     * @param original the original exception that was deferred
+     * @return a new exception of the same type with the original as the cause
+     */
+    public static GuardrailViolatedException 
wrapForDeferredThrow(GuardrailViolatedException original)

Review Comment:
   I believe this would be cleaner as an instance method, with an override in 
`PasswordGuardrailException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to