iamaleksey commented on code in PR #4447: URL: https://github.com/apache/cassandra/pull/4447#discussion_r2742332360
########## src/java/org/apache/cassandra/service/paxos/CasForwardResponse.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; +import java.util.List; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.exceptions.CassandraException; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.CollectionSerializers; + +import static org.apache.cassandra.db.SerializationHeader.StableHeaderSerializer.STABLE; +import static org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE; + +/** + * Response containing the result of a forwarded CAS operation. + * Can contain either a successful result or an exception that occurred during execution. + */ +public class CasForwardResponse +{ + public static final Serializer serializer = new Serializer(); + + public final RowIterator result; + public final CassandraException exception; + public final List<String> warnings; + + public CasForwardResponse(RowIterator result, List<String> warnings) + { + this.result = result; + this.exception = null; + this.warnings = warnings; + } + + public CasForwardResponse(CassandraException exception, List<String> warnings) + { + this.result = null; + this.exception = exception; + this.warnings = warnings; + } + + private CasForwardResponse(RowIterator result, CassandraException exception, List<String> warnings) + { + this.result = result; + this.exception = exception; + this.warnings = warnings; + } + + public boolean isSuccess() + { + return exception == null; + } + + public static class Serializer implements IVersionedSerializer<CasForwardResponse> + { + @Override + public void serialize(CasForwardResponse response, DataOutputPlus out, int version) throws IOException + { + out.writeBoolean(response.exception != null); Review Comment: Would be cleaner with three flags, one for each field, instead of this nested complexity and asymmetric warnings ser/de logic I think. Plus having `warning` be a nullable has some messy handling consequences. I'd prefer it be an `emptyList()` instead. ########## src/java/org/apache/cassandra/service/paxos/Paxos2CommitForwardRequest.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.service.paxos.Commit.Agreed; + +import static org.apache.cassandra.dht.AbstractBounds.tokenSerializer; + +/** + * Request to forward a Paxos V2 commit operation to a replica coordinator. + * This is used when the original coordinator is not a replica but needs to + * execute a Paxos commit for a tracked keyspace that requires MutationId generation. + * + * Contains only the essential data needed by PaxosCommit instead of the full Participants object. + */ +public class Paxos2CommitForwardRequest +{ + public static final Serializer serializer = new Serializer(); + + public final Agreed commit; + public final ConsistencyLevel consistencyForConsensus; + public final ConsistencyLevel consistencyForCommit; + public final EndpointsForToken all; + public final EndpointsForToken allLive; + public final EndpointsForToken allDown; + public final int required; + public final boolean isUrgent; + + public Paxos2CommitForwardRequest(Agreed commit, + ConsistencyLevel consistencyForConsensus, + ConsistencyLevel consistencyForCommit, + EndpointsForToken all, + EndpointsForToken allLive, + EndpointsForToken allDown, + int required, + boolean isUrgent) + { + this.commit = commit; + this.consistencyForConsensus = consistencyForConsensus; + this.consistencyForCommit = consistencyForCommit; + this.all = all; + this.allLive = allLive; + this.allDown = allDown; + this.required = required; + this.isUrgent = isUrgent; + } + + public static class Serializer implements IVersionedSerializer<Paxos2CommitForwardRequest> + { + @Override + public void serialize(Paxos2CommitForwardRequest request, DataOutputPlus out, int version) throws IOException + { + Agreed.serializer.serialize(request.commit, out, version); + out.writeByte(request.consistencyForConsensus.code); + out.writeByte(request.consistencyForCommit.code); + + // Serialize EndpointsForToken collections + serializeEndpoints(request.all, out, version); + serializeEndpoints(request.allLive, out, version); + serializeEndpoints(request.allDown, out, version); + + out.writeUnsignedVInt32(request.required); + out.writeBoolean(request.isUrgent); + } + + @Override + public Paxos2CommitForwardRequest deserialize(DataInputPlus in, int version) throws IOException + { + Agreed commit = Agreed.serializer.deserialize(in, version); + ConsistencyLevel consistencyForConsensus = ConsistencyLevel.fromCode(in.readUnsignedByte()); + ConsistencyLevel consistencyForCommit = ConsistencyLevel.fromCode(in.readUnsignedByte()); + + // Deserialize EndpointsForToken collections using partitioner from commit + IPartitioner partitioner = commit.metadata().partitioner; + EndpointsForToken all = deserializeEndpoints(in, version, partitioner); + EndpointsForToken allLive = deserializeEndpoints(in, version, partitioner); + EndpointsForToken allDown = deserializeEndpoints(in, version, partitioner); + + int required = in.readUnsignedVInt32(); + boolean isUrgent = in.readBoolean(); + + return new Paxos2CommitForwardRequest(commit, consistencyForConsensus, consistencyForCommit, + all, allLive, allDown, required, isUrgent); + } + + @Override + public long serializedSize(Paxos2CommitForwardRequest request, int version) + { + long size = Agreed.serializer.serializedSize(request.commit, version) + + 1 // consistencyForConsensus.code + + 1; // consistencyForCommit.code + + size += endpointsSerializedSize(request.all, version); + size += endpointsSerializedSize(request.allLive, version); + size += endpointsSerializedSize(request.allDown, version); + + size += TypeSizes.sizeofUnsignedVInt(request.required); + size += TypeSizes.BOOL_SIZE; // isUrgent + + return size; + } + + private void serializeEndpoints(EndpointsForToken endpoints, DataOutputPlus out, int version) throws IOException + { + out.writeUnsignedVInt32(endpoints.size()); + Token token = endpoints.token(); + Token.compactSerializer.serialize(token, out, version); + + for (Replica replica : endpoints) + { + InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(replica.endpoint(), out, version); Review Comment: Replica.Serializer exists. ########## src/java/org/apache/cassandra/service/paxos/CasForwardRequest.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.cql3.statements.CQL3CasRequest; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.RemoteClientState; + +/** + * Request to forward a CAS operation to a replica coordinator for tracked keyspaces. + * Contains the essential information needed to execute the CAS operation on the remote coordinator. + */ +public class CasForwardRequest +{ + public static final Serializer serializer = new Serializer(); + + public final String keyspaceName; + public final String cfName; + public final DecoratedKey key; + public final ConsistencyLevel consistencyForPaxos; + public final ConsistencyLevel consistencyForCommit; + public final long nowInSeconds; + public final RemoteClientState clientState; + public final CQL3CasRequest casRequest; // The actual CAS request to forward + + /** + * Backward compatibility constructor that accepts ClientState. + */ + public CasForwardRequest(String keyspaceName, + String cfName, + DecoratedKey key, + CQL3CasRequest request, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForCommit, + ClientState clientState, + long nowInSeconds) + { + Preconditions.checkNotNull(keyspaceName, "keyspaceName cannot be null"); + Preconditions.checkNotNull(cfName, "cfName cannot be null"); + Preconditions.checkNotNull(key, "key cannot be null"); + Preconditions.checkNotNull(request, "request cannot be null"); + Preconditions.checkNotNull(consistencyForPaxos, "consistencyForPaxos cannot be null"); + Preconditions.checkNotNull(consistencyForCommit, "consistencyForCommit cannot be null"); + Preconditions.checkNotNull(clientState, "clientState cannot be null"); Review Comment: Not sure why here and in `ConsensusReadForwardRequest` we are checking for nulls, but nowhere else, but you can cut all these lines by assigning the return value of `checkNotNull()` ########## src/java/org/apache/cassandra/db/guardrails/Guardrail.java: ########## @@ -108,15 +108,30 @@ protected void warn(String message, String redactedMessage) if (skipNotifying(true)) return; - message = decorateMessage(message); + String decoratedMessage = decorateMessage(message); + String decoratedRedactedMessage = decorateMessage(redactedMessage); - logger.warn(message); + // If we're deferring warnings (during CAS fragment creation), defer all warning outputs + if (ClientWarn.instance.isDeferring()) + { + // ClientWarn will automatically capture this to the deferred list + ClientWarn.instance.warn(decoratedMessage); Review Comment: Can pull this out of the if statement as it's shared with the other branch, on line 128. ########## src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java: ########## @@ -36,4 +41,38 @@ public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor, msg); this.writeType = writeType; } + + @Override + protected void serializeSpecificFields(DataOutputPlus out, int version) throws IOException + { + out.writeByte(consistency.code); + out.writeUnsignedVInt32(received); + out.writeUnsignedVInt32(blockFor); + out.writeUTF(writeType.toString()); Review Comment: WriteType is the only place where we ser/de an enum by its string value rather than int code. ########## src/java/org/apache/cassandra/db/guardrails/PasswordGuardrail.java: ########## @@ -95,5 +95,11 @@ public static class PasswordGuardrailException extends GuardrailViolatedExceptio super(message); this.redactedMessage = redactedMessage; } + + PasswordGuardrailException(String message, String redactedMessage, Throwable cause) Review Comment: Doesn't override `warn()` and `fail()` implementing deferral, which I think is problematic. ########## src/java/org/apache/cassandra/service/RemoteClientState.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.cassandra.auth.AuthenticatedUser; +import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.MD5Digest; + +/** + * A serializable version of ClientState designed for forwarding operations to remote coordinators. + * + * This class contains only the essential authorization attributes needed for forwarded operations, + * avoiding the complex serialization challenges of the full AuthenticatedUser object. + * + * Key design principles: + * - Contains only serializable authorization state (isSuper, isInternal, etc.) + * - Supports guardrail enforcement through isOrdinaryUser() and applyGuardrails() + * - Throws exceptions for operations requiring the full user object (getUser(), authorize()) + * - Used specifically for CAS and consensus read forwarding in tracked keyspaces + */ +public class RemoteClientState extends ClientState +{ + public static final Serializer serializer = new Serializer(); + + // Essential serializable state for authorization context + private final boolean isSuperCached; + private final String userName; // For logging/error messages (optional) + + /** + * Creates a RemoteClientState from a local ClientState for forwarding. + * Extracts only the essential authorization attributes. + */ + public static RemoteClientState from(ClientState localState) + { + if (!(localState instanceof LocalClientState)) + { + throw new IllegalArgumentException("Cannot create RemoteClientState from non-LocalClientState: " + localState.getClass()); + } + LocalClientState local = (LocalClientState) localState; + + return new RemoteClientState( + localState.isSuper(), + localState.isSystem(), + localState.applyGuardrails(), + localState.getRawKeyspace(), + local.getUser() != null ? local.getUser().getName() : null + ); + } + + /** + * Private constructor for deserialization and from() method. + */ + private RemoteClientState(boolean isSuper, boolean isInternal, boolean applyGuardrails, String keyspace, String userName) + { + super(isInternal); + this.isSuperCached = isSuper; + this.userName = userName; + + // Set the cached guardrail and keyspace state in the base class + if (!applyGuardrails) + pauseGuardrails(); + if (keyspace != null) + setRawKeyspace(keyspace); + } + + // ========================================== + // Cached user attributes (implemented with cached values) + // ========================================== + + @Override + public boolean isSuper() + { + return isSuperCached; + } + + // ========================================== + // Methods that throw exceptions for remote operations + // ========================================== + + @Override + public AuthenticatedUser getUser() + { + throw new UnsupportedOperationException("Cannot access user object in remote client state. " + + "Full user operations must be performed on the original coordinator."); + } + + @Override + public void login(AuthenticatedUser user) + { + throw new UnsupportedOperationException("Cannot login on remote client state."); + } + + @Override + public void setKeyspace(String ks) + { + throw new UnsupportedOperationException("Cannot modify keyspace on remote client state."); + } + + @Override + protected Set<Permission> authorize(IResource resource) + { + throw new UnsupportedOperationException("Cannot perform authorization in remote client state. " + + "Authorization must be performed on the original coordinator before forwarding."); + } + + @Override + public void ensurePermission(Permission perm, IResource resource) + { + throw new UnsupportedOperationException("Cannot perform authorization in remote client state. " + + "Authorization must be performed on the original coordinator before forwarding."); + } + + @Override + public void ensurePermission(Permission permission, Function function) + { + throw new UnsupportedOperationException("Cannot perform authorization in remote client state. " + + "Authorization must be performed on the original coordinator before forwarding."); + } + + @Override + public boolean hasTablePermission(TableMetadata table, Permission perm) + { + throw new UnsupportedOperationException("Cannot check table permissions in remote client state. " + + "Permission checks must be performed on the original coordinator before forwarding."); + } + + @Override + public void validateLogin() + { + throw new UnsupportedOperationException("Cannot validate login in remote client state. " + + "Login validation must be performed on the original coordinator before forwarding."); + } + + @Override + public void ensureNotAnonymous() + { + throw new UnsupportedOperationException("Cannot validate anonymous status in remote client state. " + + "Anonymous validation must be performed on the original coordinator before forwarding."); + } + + @Override + public InetSocketAddress getRemoteAddress() + { + throw new UnsupportedOperationException("Remote address not available in remote client state."); + } + + @Override + public InetAddress getClientAddress() + { + throw new UnsupportedOperationException("Client address not available in remote client state."); + } + + @Override + public Optional<String> getDriverName() + { + throw new UnsupportedOperationException("Driver information not available in remote client state."); + } + + @Override + public Optional<String> getDriverVersion() + { + throw new UnsupportedOperationException("Driver information not available in remote client state."); + } + + @Override + public Optional<Map<String, String>> getClientOptions() + { + throw new UnsupportedOperationException("Client options not available in remote client state."); + } + + @Override + public void setDriverName(String driverName) + { + throw new UnsupportedOperationException("Cannot modify driver information in remote client state."); + } + + @Override + public void setDriverVersion(String driverVersion) + { + throw new UnsupportedOperationException("Cannot modify driver information in remote client state."); + } + + @Override + public void setClientOptions(Map<String, String> clientOptions) + { + throw new UnsupportedOperationException("Cannot modify client options in remote client state."); + } + + @Override + public void warnAboutUseWithPreparedStatements(MD5Digest statementId, String preparedKeyspace) + { + throw new UnsupportedOperationException("Cannot issue warnings in remote client state. " + + "Warnings should be issued on the original coordinator."); + } + + @Override + public void warnAboutUneligiblePreparedStatement(MD5Digest statementId) + { + throw new UnsupportedOperationException("Cannot issue warnings in remote client state. " + + "Warnings should be issued on the original coordinator."); + } + + /** + * Get the user name for logging purposes (if available). + */ + public String getUserNameForLogging() + { + return userName != null ? userName : "unknown"; + } + + /** + * Serializer for RemoteClientState to enable network transmission. + */ + public static class Serializer implements IVersionedSerializer<RemoteClientState> + { + @Override + public void serialize(RemoteClientState state, DataOutputPlus out, int version) throws IOException + { + out.writeBoolean(state.isSuperCached); Review Comment: Five scattered booleans here could all be flags. ########## src/java/org/apache/cassandra/service/ClientWarn.java: ########## @@ -87,23 +88,121 @@ public void resetWarnings() set(null); } + /** + * Start deferring warnings. Any warnings added after this call will be stored + * separately and can later be committed (inserted at the marked position) or discarded. + * The insertion point is recorded as the current size of the warnings list. + * <p> + * This is used for CAS operations where we don't want to emit warnings until + * we know whether the conditions passed. + */ + public void startDeferring() + { + State state = get(); + if (state != null) + state.startDeferring(); + } + + /** + * Check if we're currently deferring warnings. + */ + public boolean isDeferring() + { + State state = get(); + return state != null && state.isDeferring(); + } + + /** + * Commit deferred warnings by inserting them at the recorded insertion point. + * After this call, deferring mode is disabled. + * + * @param replayAction optional consumer to receive deferred actions for replay (e.g., guardrail diagnostic events) + */ + public void commitDeferredWarnings(Consumer<Runnable> replayAction) + { + State state = get(); + if (state != null) + state.commitDeferredWarnings(replayAction); + } + + /** + * Commit deferred warnings by inserting them at the recorded insertion point. + * After this call, deferring mode is disabled. + */ + public void commitDeferredWarnings() + { + commitDeferredWarnings(Runnable::run); + } + + /** + * Discard all deferred warnings without adding them to the main warnings list. + * After this call, deferring mode is disabled. + */ + public void discardDeferredWarnings() + { + State state = get(); + if (state != null) + state.discardDeferredWarnings(); + } + + /** + * Store a deferred action to be executed when warnings are committed. + * This is used to defer guardrail diagnostic events until we know conditions passed. + */ + public void addDeferredAction(Runnable action) + { + State state = get(); + if (state != null) + state.addDeferredAction(action); + } + public static class State { private boolean collecting = true; // This must be a thread-safe list. Even though it's wrapped in a ThreadLocal, it's propagated to each thread // from shared state, so multiple threads can reference the same State. private volatile List<String> warnings; + // Deferred warnings support for CAS operations + // These must also be thread-safe since State can be shared across threads + private volatile List<String> deferredWarnings; + private volatile List<Runnable> deferredActions; + private volatile int insertionPoint = -1; + Review Comment: Looks like Claude copied the existing cursed pattern and then made it even more cursed. The formatting is off, but also this class is way too complicated for what it does. ########## src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardResponse.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; +import java.util.List; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.exceptions.CassandraException; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.CollectionSerializers; + +import static org.apache.cassandra.db.SerializationHeader.StableHeaderSerializer.STABLE; +import static org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE; + +/** + * Response containing the result of a forwarded consensus read operation. + * Can contain either a successful result or an exception that occurred during execution. + * + * Since consensus reads are single partition reads, we store a single RowIterator + * and wrap it as a PartitionIterator when accessed. + */ +public class ConsensusReadForwardResponse +{ + public static final Serializer serializer = new Serializer(); + + // Store as RowIterator internally (single partition) but expose as PartitionIterator + private final RowIterator rowIterator; + public final CassandraException exception; + public final List<String> warnings; + + public ConsensusReadForwardResponse(PartitionIterator result, List<String> warnings) + { + // Extract the single partition from the iterator (consensus reads are single partition) + if (result != null && result.hasNext()) + { + this.rowIterator = result.next(); + } + else + { + this.rowIterator = null; + } + this.exception = null; + this.warnings = warnings; + } + + public ConsensusReadForwardResponse(CassandraException exception, List<String> warnings) + { + this.rowIterator = null; + this.exception = exception; + this.warnings = warnings; + } + + // Private constructor for deserialization + private ConsensusReadForwardResponse(RowIterator rowIterator, CassandraException exception, List<String> warnings) + { + this.rowIterator = rowIterator; + this.exception = exception; + this.warnings = warnings; + } + + public boolean isSuccess() + { + return exception == null; + } + + /** + * Get the result as a PartitionIterator. + */ + public PartitionIterator getResult() + { + if (rowIterator == null) + return null; + return PartitionIterators.singletonIterator(rowIterator); + } + + public static class Serializer implements IVersionedSerializer<ConsensusReadForwardResponse> + { + @Override + public void serialize(ConsensusReadForwardResponse response, DataOutputPlus out, int version) throws IOException + { + out.writeBoolean(response.exception != null); Review Comment: Same note, duplicating: Would be cleaner with three flags, one for each field, instead of this nested complexity and asymmetric warnings ser/de logic I think. Plus having warning be a nullable has some messy handling consequences. I'd prefer it be an emptyList() instead. ########## src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardRequest.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Request to forward a consensus read operation to a replica coordinator. + * This is used when the original coordinator is not a replica but needs to + * execute a consensus read for a tracked keyspace that requires proper coordination. + * + * Consensus reads only ever contain a single read command. + */ +public class ConsensusReadForwardRequest +{ + public static final Serializer serializer = new Serializer(); + + public final SinglePartitionReadCommand command; + public final ConsistencyLevel consistencyLevel; + + public ConsensusReadForwardRequest(SinglePartitionReadCommand command, + ConsistencyLevel consistencyLevel) + { + Preconditions.checkNotNull(command, "command cannot be null"); + Preconditions.checkNotNull(consistencyLevel, "consistencyLevel cannot be null"); Review Comment: Not sure why here and in `CASForwardRequest` we are checking for nulls, but nowhere else, but you can cut all these lines by assigning the return value of `checkNotNull()` ########## src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java: ########## @@ -580,4 +586,234 @@ public ConsensusAttemptResult toCasResult(TxnResult txnResult) TxnDataKeyValue partition = (TxnDataKeyValue)txnData.get(txnDataName(CAS_READ)); return casResult(partition != null ? partition.rowIterator(false) : null); } + + /** + * IVersionedSerializer for CQL3CasRequest to enable CAS forwarding between coordinators. + * + */ + public static class Serializer implements IVersionedSerializer<CQL3CasRequest> + { + public static final Serializer instance = new Serializer(); + + @Override + public void serialize(CQL3CasRequest request, DataOutputPlus out, int version) throws IOException + { + // Serialize table metadata using compact form + request.metadata.id.serializeCompact(out); + + // Serialize partition key + DecoratedKey.serializer.serialize(request.key, out, version); + + // Serialize condition columns - serialize statics and regulars separately + Columns.serializer.serialize(request.conditionColumns.statics, out); + Columns.serializer.serialize(request.conditionColumns.regulars, out); + + // Serialize boolean flags as bit field + byte flags = (byte) ((request.updatesRegularRows ? 0x01 : 0) | + (request.updatesStaticRow ? 0x02 : 0) | + (request.hasExists ? 0x04 : 0) | + (request.conditionCheckOnly ? 0x08 : 0)); Review Comment: Make up some constants for the flags maybe? It's what we usually do. ########## src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java: ########## @@ -62,12 +62,28 @@ public boolean equals(Object o) if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TxnReferenceOperations that = (TxnReferenceOperations) o; - return metadata.equals(that.metadata) && Objects.equals(clustering, that.clustering) && regulars.equals(that.regulars) && statics.equals(that.statics); + + // Special case: EMPTY singleton matches any other empty instance + if ((this.metadata == null && this.isEmpty() && that.isEmpty()) || + (that.metadata == null && that.isEmpty() && this.isEmpty())) + return true; + + // Otherwise, use strict comparison including metadata + return Objects.equals(metadata, that.metadata) && + Objects.equals(clustering, that.clustering) && + regulars.equals(that.regulars) && + statics.equals(that.statics); } @Override public int hashCode() { + // ALL empty instances (regardless of metadata) get the same hash code + // to match the equals() behavior + if (isEmpty()) + return Objects.hash("EMPTY_SINGLETON"); Review Comment: This is not the best way to do it :) ########## src/java/org/apache/cassandra/db/SystemKeyspace.java: ########## @@ -1603,44 +1605,83 @@ public static void savePaxosProposal(Commit proposal) proposal.ballot, PartitionUpdate.toBytes(proposal.update, MessagingService.current_version), MessagingService.current_version, - proposal.update.partitionKey().getKey(), - proposal.update.metadata().id.asUUID()); + proposal.partitionKey().getKey(), + proposal.metadata().id.asUUID()); } } public static void savePaxosCommit(Commit commit) { // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. + ByteBuffer mutationIdBytes = mutationIdToByteBuffer(commit.mutation.id()); + if (commit instanceof Commit.CommittedWithTTL) { long localDeletionTime = ((Commit.CommittedWithTTL) commit).localDeletionTime; - int ttlInSec = legacyPaxosTtlSec(commit.update.metadata()); + int ttlInSec = legacyPaxosTtlSec(commit.metadata()); long nowInSec = localDeletionTime - ttlInSec; - String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; + String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?"; executeInternalWithNowInSec(cql, nowInSec, commit.ballot.unixMicros(), ttlInSec, commit.ballot, PartitionUpdate.toBytes(commit.update, MessagingService.current_version), MessagingService.current_version, - commit.update.partitionKey().getKey(), - commit.update.metadata().id.asUUID()); + mutationIdBytes, + commit.partitionKey().getKey(), + commit.metadata().id.asUUID()); } else { - String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; + String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(cql, commit.ballot.unixMicros(), commit.ballot, PartitionUpdate.toBytes(commit.update, MessagingService.current_version), MessagingService.current_version, - commit.update.partitionKey().getKey(), - commit.update.metadata().id.asUUID()); + mutationIdBytes, + commit.partitionKey().getKey(), + commit.metadata().id.asUUID()); } } + /** + * Converts a MutationId to a ByteBuffer for storage in the paxos table. + * Returns null if the mutation ID is none (which will be stored as null in CQL). + */ + private static ByteBuffer mutationIdToByteBuffer(MutationId id) Review Comment: This would be better as an instance method of `MutationId` I believe. ########## src/java/org/apache/cassandra/service/CASRequest.java: ########## @@ -48,6 +48,17 @@ public interface CASRequest */ boolean appliesTo(FilteredPartition current) throws InvalidRequestException; + /** + * Returns true if this request should only check conditions without applying updates. + * This is used for deferred guardrail exception handling - when a guardrail exception + * occurred during request preparation, we still need to check conditions first before + * throwing the exception. + */ + default boolean isConditionCheckOnly() Review Comment: Doesn't need a default implementation? In fact, I don't think we need `CASRequest` at all? The only existing implementation is `CQL3CasRequest`. ########## src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java: ########## @@ -62,12 +62,28 @@ public boolean equals(Object o) if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TxnReferenceOperations that = (TxnReferenceOperations) o; - return metadata.equals(that.metadata) && Objects.equals(clustering, that.clustering) && regulars.equals(that.regulars) && statics.equals(that.statics); + + // Special case: EMPTY singleton matches any other empty instance + if ((this.metadata == null && this.isEmpty() && that.isEmpty()) || + (that.metadata == null && that.isEmpty() && this.isEmpty())) Review Comment: This condition can definitely be simplified. ########## src/java/org/apache/cassandra/db/SystemKeyspace.java: ########## @@ -1603,44 +1605,83 @@ public static void savePaxosProposal(Commit proposal) proposal.ballot, PartitionUpdate.toBytes(proposal.update, MessagingService.current_version), MessagingService.current_version, - proposal.update.partitionKey().getKey(), - proposal.update.metadata().id.asUUID()); + proposal.partitionKey().getKey(), + proposal.metadata().id.asUUID()); } } public static void savePaxosCommit(Commit commit) { // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. + ByteBuffer mutationIdBytes = mutationIdToByteBuffer(commit.mutation.id()); + if (commit instanceof Commit.CommittedWithTTL) { long localDeletionTime = ((Commit.CommittedWithTTL) commit).localDeletionTime; - int ttlInSec = legacyPaxosTtlSec(commit.update.metadata()); + int ttlInSec = legacyPaxosTtlSec(commit.metadata()); long nowInSec = localDeletionTime - ttlInSec; - String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; + String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?"; executeInternalWithNowInSec(cql, nowInSec, commit.ballot.unixMicros(), ttlInSec, commit.ballot, PartitionUpdate.toBytes(commit.update, MessagingService.current_version), MessagingService.current_version, - commit.update.partitionKey().getKey(), - commit.update.metadata().id.asUUID()); + mutationIdBytes, + commit.partitionKey().getKey(), + commit.metadata().id.asUUID()); } else { - String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; + String cql = "UPDATE system." + PAXOS + " USING TIMESTAMP ? SET proposal_ballot = null, proposal = null, proposal_version = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ?, most_recent_commit_mutation_id = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(cql, commit.ballot.unixMicros(), commit.ballot, PartitionUpdate.toBytes(commit.update, MessagingService.current_version), MessagingService.current_version, - commit.update.partitionKey().getKey(), - commit.update.metadata().id.asUUID()); + mutationIdBytes, + commit.partitionKey().getKey(), + commit.metadata().id.asUUID()); } } + /** + * Converts a MutationId to a ByteBuffer for storage in the paxos table. + * Returns null if the mutation ID is none (which will be stored as null in CQL). + */ + private static ByteBuffer mutationIdToByteBuffer(MutationId id) + { + if (id == null || id.isNone()) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(id.logId()); + buffer.putLong(id.sequenceId()); + buffer.flip(); + return buffer; + } + + /** + * Converts a ByteBuffer from the paxos table back to a MutationId. + * Returns MutationId.none() if the buffer is null or empty. + */ + public static MutationId byteBufferToMutationId(ByteBuffer buffer) Review Comment: This is used elsewhere as well, I think would be cleaner as a method of `MutationId`. ########## src/java/org/apache/cassandra/exceptions/CassandraException.java: ########## @@ -17,29 +17,245 @@ */ package org.apache.cassandra.exceptions; +import java.io.IOException; + +import org.apache.cassandra.cql3.constraints.ConstraintViolationException; +import org.apache.cassandra.cql3.constraints.InvalidConstraintDefinitionException; +import org.apache.cassandra.db.KeyspaceNotDefinedException; +import org.apache.cassandra.db.MutationExceededMaxSizeException; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.guardrails.GuardrailViolatedException; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.accord.exceptions.AccordReadExhaustedException; +import org.apache.cassandra.service.accord.exceptions.AccordReadPreemptedException; +import org.apache.cassandra.service.accord.exceptions.AccordWriteExhaustedException; +import org.apache.cassandra.service.accord.exceptions.AccordWritePreemptedException; +import org.apache.cassandra.triggers.TriggerDisabledException; +import org.apache.cassandra.utils.ArraySerializers; import org.apache.cassandra.utils.Shared; import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) public abstract class CassandraException extends RuntimeException implements TransportException { - private final ExceptionCode code; + public static final Serializer serializer = new Serializer(); + + private final ExceptionCode clientExceptionCode; protected CassandraException(ExceptionCode code, String msg) { super(msg); - this.code = code; + this.clientExceptionCode = code; } protected CassandraException(ExceptionCode code, String msg, Throwable cause) { super(msg, cause); - this.code = code; + this.clientExceptionCode = code; } public ExceptionCode code() { - return code; + return clientExceptionCode; + } + + /** + * Returns the CassandraExceptionCode for this specific exception class. + * Each subclass must implement this to return its unique serialization code. + */ + public abstract CassandraExceptionCode getCassandraExceptionCode(); + + /** + * Serializer for CassandraException that coordinates serialization of subclasses. + */ + @Shared(scope = SIMULATION) + public static class Serializer implements IVersionedSerializer<CassandraException> + { + @Override + public void serialize(CassandraException exception, DataOutputPlus out, int version) throws IOException + { + // Serialize new CassandraExceptionCode for proper type identification + CassandraExceptionCode cassandraExceptionCode = exception.getCassandraExceptionCode(); + out.writeUnsignedVInt32(cassandraExceptionCode.value); + + String message = exception.getMessage() == null ? "" : exception.getMessage(); + out.writeUTF(message); + + // Serialize cause and suppressed exceptions using ExceptionSerializer + ExceptionSerializer.nullableRemoteExceptionSerializer.serialize(exception.getCause(), out, version); + + Throwable[] suppressed = exception.getSuppressed(); + out.writeUnsignedVInt32(suppressed.length); + for (Throwable t : suppressed) + { + ExceptionSerializer.remoteExceptionSerializer.serialize(t, out, version); + } + + // Serialize stack trace using existing ExceptionSerializer logic + StackTraceElement[] stackTrace = exception.getStackTrace(); + ArraySerializers.serializeArray(stackTrace, out, version, ExceptionSerializer.stackTraceElementSerializer); + + // Delegate to subclass serializer for type-specific fields + exception.serializeSpecificFields(out, version); + } + + @Override + public CassandraException deserialize(DataInputPlus in, int version) throws IOException + { + // Read new CassandraExceptionCode for proper type identification + int cassandraExceptionCode = in.readUnsignedVInt32(); + CassandraExceptionCode classCode = CassandraExceptionCode.fromValue(cassandraExceptionCode); + + String message = in.readUTF(); + + // Deserialize cause and suppressed exceptions using ExceptionSerializer + Throwable cause = ExceptionSerializer.nullableRemoteExceptionSerializer.deserialize(in, version); + + int suppressedCount = in.readUnsignedVInt32(); + Throwable[] suppressed = new Throwable[suppressedCount]; Review Comment: ArraySerializers, as already used for the stack trace. ########## src/java/org/apache/cassandra/exceptions/WriteFailureException.java: ########## @@ -34,4 +39,56 @@ public WriteFailureException(ConsistencyLevel consistency, int received, int blo super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint)); this.writeType = writeType; } + + @Override + protected void serializeSpecificFields(DataOutputPlus out, int version) throws IOException + { + out.writeByte(consistency.code); + out.writeUnsignedVInt32(received); + out.writeUnsignedVInt32(blockFor); + + // Serialize failure reason map + CollectionSerializers.serializeMap(failureReasonByEndpoint, out, version, + InetAddressAndPort.Serializer.inetAddressAndPortSerializer, + RequestFailureReason.serializer); + + out.writeUTF(writeType.toString()); Review Comment: WriteType is the only place where we ser/de an enum by its string value rather than int code. Same as in `WriteTimeoutException`, possibly elsewhere too. ########## src/java/org/apache/cassandra/db/IReadResponse.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.reads.tracked.TrackedDataResponse; +import org.apache.cassandra.service.reads.tracked.TrackedSummaryResponse; + +import static org.apache.cassandra.db.IReadResponse.Kind.UNTRACKED; + +public interface IReadResponse +{ + enum Kind + { + UNTRACKED(0), + TRACKED_DATA(1), + TRACKED_SUMMARY(2), + NULL(3); Review Comment: `NULL` is actually never possible, from what I see. Plus the logic in serializers is slightly funny in its formatting and structure - can be simplified. ########## src/java/org/apache/cassandra/db/guardrails/Guardrail.java: ########## @@ -126,21 +141,37 @@ protected void fail(String message, @Nullable ClientState state) protected void fail(String message, String redactedMessage, @Nullable ClientState state) { - message = decorateMessage(message); + String decoratedMessage = decorateMessage(message); + String decoratedRedactedMessage = decorateMessage(redactedMessage); if (!skipNotifying(false)) { - logger.error(message); - // Note that ClientWarn will simply ignore the message if we're not running this as part of a user query - // (the internal "state" will be null) - ClientWarn.instance.warn(message); - // Similarly, tracing will also ignore the message if we're not running tracing on the current thread. - Tracing.trace(message); - GuardrailsDiagnostics.failed(name, decorateMessage(redactedMessage)); + // If we're deferring warnings (during CAS fragment creation), defer all failure outputs + if (ClientWarn.instance.isDeferring()) + { + // ClientWarn will automatically capture this to the deferred list + ClientWarn.instance.warn(decoratedMessage); Review Comment: Same as above. ########## src/java/org/apache/cassandra/exceptions/FunctionExecutionException.java: ########## @@ -50,4 +55,64 @@ public FunctionExecutionException(FunctionName functionName, List<String> argTyp this.argTypes = argTypes; this.detail = msg; } + + @Override + protected void serializeSpecificFields(DataOutputPlus out, int version) throws IOException + { + // Serialize FunctionName + out.writeBoolean(functionName.keyspace != null); + if (functionName.keyspace != null) + out.writeUTF(functionName.keyspace); + out.writeUTF(functionName.name); + + // Serialize argTypes list + out.writeUnsignedVInt32(argTypes.size()); + for (String argType : argTypes) + out.writeUTF(argType); + + // Serialize detail + out.writeUTF(detail); + } + + @Override + protected long serializedSizeSpecificFields(int version) + { + long size = TypeSizes.BOOL_SIZE; // keyspace present flag + if (functionName.keyspace != null) + size += TypeSizes.sizeof(functionName.keyspace); + size += TypeSizes.sizeof(functionName.name); + + size += TypeSizes.sizeofUnsignedVInt(argTypes.size()); // argTypes list size + for (String argType : argTypes) + size += TypeSizes.sizeof(argType); + + size += TypeSizes.sizeof(detail); + return size; + } + + static FunctionExecutionException deserializeFields(String message, DataInputPlus in, int version) throws IOException + { + // Deserialize FunctionName + boolean hasKeyspace = in.readBoolean(); + String keyspace = hasKeyspace ? in.readUTF() : null; + String name = in.readUTF(); + FunctionName functionName = new FunctionName(keyspace, name); + + // Deserialize argTypes list + int argTypesSize = in.readUnsignedVInt32(); + List<String> argTypes = new ArrayList<>(argTypesSize); + for (int i = 0; i < argTypesSize; i++) + argTypes.add(in.readUTF()); + + // Deserialize detail + String detail = in.readUTF(); + + return new FunctionExecutionException(functionName, argTypes, detail); Review Comment: I believe this is incorrect and will skip custom message logic from `create()`. Should use `create()` instead. ########## src/java/org/apache/cassandra/db/EmbeddableSinglePartitionReadCommand.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.tracked.TrackedRead.DataRequest; +import org.apache.cassandra.service.reads.tracked.TrackedRead.SummaryRequest; + +import static org.apache.cassandra.db.EmbeddableSinglePartitionReadCommand.Kind.UNTRACKED; + +/** + * Interface for read command that allows it be serialized and embedded in another message. Used in Paxos + * to provide a common base class to serialize for tracked and untracked reads. Tracked reads contain + * additional information needed to execute the read beyond the read command itself so an additional interface + * is needed. + */ +public interface EmbeddableSinglePartitionReadCommand +{ + enum Kind + { + UNTRACKED, + TRACKED_DATA_READ, + TRACKED_SUMMARY_READ; + + public boolean isTracked() + { + return this != UNTRACKED; + } + + static final IVersionedSerializer<Kind> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(Kind kind, DataOutputPlus out, int version) throws IOException + { + switch (kind) + { + case UNTRACKED: + out.writeByte(0); Review Comment: Using a different pattern here, hard-coding int values instead of using enum codes, from how it's done in `IReadResponse.Kind`. Also, since NULL there is dead code, now these completely duplicate each other and one could be removed. ########## src/java/org/apache/cassandra/db/guardrails/GuardrailViolatedException.java: ########## @@ -18,12 +18,42 @@ package org.apache.cassandra.db.guardrails; +import org.apache.cassandra.exceptions.CassandraExceptionCode; import org.apache.cassandra.exceptions.InvalidRequestException; public class GuardrailViolatedException extends InvalidRequestException { - GuardrailViolatedException(String message) + public GuardrailViolatedException(String message) { super(message); } + + protected GuardrailViolatedException(String message, Throwable cause) + { + super(message, cause); + } + + @Override + public CassandraExceptionCode getCassandraExceptionCode() + { + return CassandraExceptionCode.GUARDRAIL_VIOLATED; + } + + /** + * Wraps a guardrail exception for deferred throwing, preserving the exception type. + * This is used when a guardrail exception is caught during CAS request preparation + * but needs to be thrown later after conditions are checked. + * + * @param original the original exception that was deferred + * @return a new exception of the same type with the original as the cause + */ + public static GuardrailViolatedException wrapForDeferredThrow(GuardrailViolatedException original) Review Comment: I believe this would be cleaner as an instance method, with an override in `PasswordGuardrailException`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

