iamaleksey commented on code in PR #4447:
URL: https://github.com/apache/cassandra/pull/4447#discussion_r2754604965


##########
src/java/org/apache/cassandra/service/RemoteClientState.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.MD5Digest;
+
+/**
+ * A serializable version of ClientState designed for forwarding operations to 
remote coordinators.
+ *
+ * This class contains only the essential authorization attributes needed for 
forwarded operations,
+ * avoiding the complex serialization challenges of the full AuthenticatedUser 
object.
+ *
+ * Key design principles:
+ * - Contains only serializable authorization state (isSuper, isInternal, etc.)
+ * - Supports guardrail enforcement through isOrdinaryUser() and 
applyGuardrails()
+ * - Throws exceptions for operations requiring the full user object 
(getUser(), authorize())
+ * - Used specifically for CAS and consensus read forwarding in tracked 
keyspaces
+ */
+public class RemoteClientState extends ClientState
+{
+    public static final Serializer serializer = new Serializer();
+
+    // Essential serializable state for authorization context
+    private final boolean isSuperCached;
+    private final String userName; // For logging/error messages (optional)
+
+    /**
+     * Creates a RemoteClientState from a local ClientState for forwarding.
+     * Extracts only the essential authorization attributes.
+     */
+    public static RemoteClientState from(ClientState localState)
+    {
+        if (!(localState instanceof LocalClientState))
+        {
+            throw new IllegalArgumentException("Cannot create 
RemoteClientState from non-LocalClientState: " + localState.getClass());
+        }
+        LocalClientState local = (LocalClientState) localState;
+
+        return new RemoteClientState(
+            localState.isSuper(),
+            localState.isSystem(),
+            localState.applyGuardrails(),
+            localState.getRawKeyspace(),
+            local.getUser() != null ? local.getUser().getName() : null
+        );
+    }
+
+    /**
+     * Private constructor for deserialization and from() method.
+     */
+    private RemoteClientState(boolean isSuper, boolean isInternal, boolean 
applyGuardrails, String keyspace, String userName)
+    {
+        super(isInternal);
+        this.isSuperCached = isSuper;
+        this.userName = userName;
+
+        // Set the cached guardrail and keyspace state in the base class
+        if (!applyGuardrails)
+            pauseGuardrails();
+        if (keyspace != null)
+            setRawKeyspace(keyspace);
+    }
+
+    // ==========================================
+    // Cached user attributes (implemented with cached values)
+    // ==========================================
+
+    @Override
+    public boolean isSuper()
+    {
+        return isSuperCached;
+    }
+
+    // ==========================================
+    // Methods that throw exceptions for remote operations
+    // ==========================================
+
+    @Override
+    public AuthenticatedUser getUser()
+    {
+        throw new UnsupportedOperationException("Cannot access user object in 
remote client state. " +
+                                              "Full user operations must be 
performed on the original coordinator.");
+    }
+
+    @Override
+    public void login(AuthenticatedUser user)
+    {
+        throw new UnsupportedOperationException("Cannot login on remote client 
state.");
+    }
+
+    @Override
+    public void setKeyspace(String ks)
+    {
+        throw new UnsupportedOperationException("Cannot modify keyspace on 
remote client state.");
+    }
+
+    @Override
+    protected Set<Permission> authorize(IResource resource)
+    {
+        throw new UnsupportedOperationException("Cannot perform authorization 
in remote client state. " +
+                                              "Authorization must be performed 
on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void ensurePermission(Permission perm, IResource resource)
+    {
+        throw new UnsupportedOperationException("Cannot perform authorization 
in remote client state. " +
+                                              "Authorization must be performed 
on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void ensurePermission(Permission permission, Function function)
+    {
+        throw new UnsupportedOperationException("Cannot perform authorization 
in remote client state. " +
+                                              "Authorization must be performed 
on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public boolean hasTablePermission(TableMetadata table, Permission perm)
+    {
+        throw new UnsupportedOperationException("Cannot check table 
permissions in remote client state. " +
+                                              "Permission checks must be 
performed on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void validateLogin()
+    {
+        throw new UnsupportedOperationException("Cannot validate login in 
remote client state. " +
+                                              "Login validation must be 
performed on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public void ensureNotAnonymous()
+    {
+        throw new UnsupportedOperationException("Cannot validate anonymous 
status in remote client state. " +
+                                              "Anonymous validation must be 
performed on the original coordinator before forwarding.");
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress()
+    {
+        throw new UnsupportedOperationException("Remote address not available 
in remote client state.");
+    }
+
+    @Override
+    public InetAddress getClientAddress()
+    {
+        throw new UnsupportedOperationException("Client address not available 
in remote client state.");
+    }
+
+    @Override
+    public Optional<String> getDriverName()
+    {
+        throw new UnsupportedOperationException("Driver information not 
available in remote client state.");
+    }
+
+    @Override
+    public Optional<String> getDriverVersion()
+    {
+        throw new UnsupportedOperationException("Driver information not 
available in remote client state.");
+    }
+
+    @Override
+    public Optional<Map<String, String>> getClientOptions()
+    {
+        throw new UnsupportedOperationException("Client options not available 
in remote client state.");
+    }
+
+    @Override
+    public void setDriverName(String driverName)
+    {
+        throw new UnsupportedOperationException("Cannot modify driver 
information in remote client state.");
+    }
+
+    @Override
+    public void setDriverVersion(String driverVersion)
+    {
+        throw new UnsupportedOperationException("Cannot modify driver 
information in remote client state.");
+    }
+
+    @Override
+    public void setClientOptions(Map<String, String> clientOptions)
+    {
+        throw new UnsupportedOperationException("Cannot modify client options 
in remote client state.");
+    }
+
+    @Override
+    public void warnAboutUseWithPreparedStatements(MD5Digest statementId, 
String preparedKeyspace)
+    {
+        throw new UnsupportedOperationException("Cannot issue warnings in 
remote client state. " +
+                                              "Warnings should be issued on 
the original coordinator.");
+    }
+
+    @Override
+    public void warnAboutUneligiblePreparedStatement(MD5Digest statementId)
+    {
+        throw new UnsupportedOperationException("Cannot issue warnings in 
remote client state. " +
+                                              "Warnings should be issued on 
the original coordinator.");
+    }
+
+    /**
+     * Get the user name for logging purposes (if available).
+     */
+    public String getUserNameForLogging()
+    {
+        return userName != null ? userName : "unknown";
+    }

Review Comment:
   Dead code.



##########
.gitignore:
##########
@@ -71,6 +71,7 @@ target/
 .DS_Store
 Thumbs.db
 .ccm/
+CLAUDE.md

Review Comment:
   Probably best to put it into the local `.git/info/exclude` instead for now.



##########
src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java:
##########
@@ -580,4 +586,234 @@ public ConsensusAttemptResult toCasResult(TxnResult 
txnResult)
         TxnDataKeyValue partition = 
(TxnDataKeyValue)txnData.get(txnDataName(CAS_READ));
         return casResult(partition != null ? partition.rowIterator(false) : 
null);
     }
+
+    /**
+     * IVersionedSerializer for CQL3CasRequest to enable CAS forwarding 
between coordinators.
+     *
+     */
+    public static class Serializer implements 
IVersionedSerializer<CQL3CasRequest>
+    {
+        public static final Serializer instance = new Serializer();
+
+        @Override
+        public void serialize(CQL3CasRequest request, DataOutputPlus out, int 
version) throws IOException
+        {
+            // Serialize table metadata using compact form
+            request.metadata.id.serializeCompact(out);
+
+            // Serialize partition key
+            DecoratedKey.serializer.serialize(request.key, out, version);
+
+            // Serialize condition columns - serialize statics and regulars 
separately
+            Columns.serializer.serialize(request.conditionColumns.statics, 
out);
+            Columns.serializer.serialize(request.conditionColumns.regulars, 
out);
+
+            // Serialize boolean flags as bit field
+            byte flags = (byte) ((request.updatesRegularRows ? 0x01 : 0) |
+                                (request.updatesStaticRow ? 0x02 : 0) |
+                                (request.hasExists ? 0x04 : 0) |
+                                (request.conditionCheckOnly ? 0x08 : 0));
+            out.writeByte(flags);
+
+            // Serialize static conditions (nullable RowCondition)
+            serializeRowCondition(request.staticConditions, out, version);
+
+            // Serialize conditions map
+            out.writeUnsignedVInt32(request.conditions.size());
+            for (Map.Entry<Clustering<?>, RowCondition> entry : 
request.conditions.entrySet())
+            {
+                Clustering.serializer.serialize(entry.getKey(), out, version, 
request.metadata.comparator.subtypes());
+                serializeRowCondition(entry.getValue(), out, version);
+            }
+
+            // Serialize write fragments (replaces updates and range deletions)
+            out.writeUnsignedVInt32(request.writeFragments.size());
+            TableMetadatas tableMetadatas = 
TableMetadatas.of(request.metadata);
+            for (TxnWrite.Fragment fragment : request.writeFragments)
+            {
+                TxnWrite.Fragment.serializer.serialize(fragment, 
tableMetadatas, out, Version.findBestMatchForMessagingVersion(version));
+            }
+        }
+
+        @Override
+        public CQL3CasRequest deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            // Deserialize table metadata
+            TableId tableId = TableId.deserializeCompact(in);
+            TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
+            if (metadata == null)
+                throw new IOException("Unknown table ID in CQL3CasRequest 
deserialization: " + tableId);
+
+            // Deserialize partition key
+            DecoratedKey key = (DecoratedKey) 
DecoratedKey.serializer.deserialize(in, version);
+
+            // Deserialize condition columns
+            Columns statics = Columns.serializer.deserialize(in, metadata);
+            Columns regulars = Columns.serializer.deserialize(in, metadata);
+            RegularAndStaticColumns conditionColumns = new 
RegularAndStaticColumns(statics, regulars);
+
+            // Deserialize flags
+            byte flags = in.readByte();
+            boolean updatesRegularRows = (flags & 0x01) != 0;
+            boolean updatesStaticRow = (flags & 0x02) != 0;
+            boolean hasExists = (flags & 0x04) != 0;
+            boolean conditionCheckOnly = (flags & 0x08) != 0;
+
+            // Create the CQL3CasRequest
+            CQL3CasRequest request = new CQL3CasRequest(metadata, key, 
conditionColumns,
+                                                      updatesRegularRows, 
updatesStaticRow);
+            request.hasExists = hasExists;
+            request.conditionCheckOnly = conditionCheckOnly;
+
+            // Deserialize static conditions
+            request.staticConditions = deserializeRowCondition(in, version, 
metadata, Clustering.STATIC_CLUSTERING);
+
+            // Deserialize conditions map
+            int conditionsCount = in.readUnsignedVInt32();
+            for (int i = 0; i < conditionsCount; i++)
+            {
+                Clustering<?> clustering = 
Clustering.serializer.deserialize(in, version, metadata.comparator.subtypes());
+                RowCondition condition = deserializeRowCondition(in, version, 
metadata, clustering);
+                request.conditions.put(clustering, condition);
+            }
+
+            // Deserialize write fragments
+            int fragmentCount = in.readUnsignedVInt32();
+            TableMetadatas tableMetadatas = TableMetadatas.of(metadata);
+            for (int i = 0; i < fragmentCount; i++)
+            {
+                PartitionKey partitionKey = new PartitionKey(metadata.id, 
request.key);
+                TxnWrite.Fragment fragment = 
TxnWrite.Fragment.serializer.deserialize(partitionKey, tableMetadatas, in, 
Version.findBestMatchForMessagingVersion(version));
+                request.writeFragments.add(fragment);
+            }
+
+            return request;
+        }
+
+        @Override
+        public long serializedSize(CQL3CasRequest request, int version)
+        {
+            long size = 0;
+
+            // Table metadata (compact form)
+            size += request.metadata.id.serializedCompactSize();
+
+            // Partition key
+            size += DecoratedKey.serializer.serializedSize(request.key, 
version);
+
+            // Condition columns - statics and regulars separately
+            size += 
Columns.serializer.serializedSize(request.conditionColumns.statics);
+            size += 
Columns.serializer.serializedSize(request.conditionColumns.regulars);
+
+            // Flags byte
+            size += 1;
+
+            // Static conditions
+            size += rowConditionSize(request.staticConditions, version);
+
+            // Conditions map
+            size += TypeSizes.sizeofUnsignedVInt(request.conditions.size());
+            for (Map.Entry<Clustering<?>, RowCondition> entry : 
request.conditions.entrySet())
+            {
+                size += Clustering.serializer.serializedSize(entry.getKey(), 
version, request.metadata.comparator.subtypes());
+                size += rowConditionSize(entry.getValue(), version);
+            }
+
+            // Write fragments (replaces updates and range deletions)
+            size += 
TypeSizes.sizeofUnsignedVInt(request.writeFragments.size());
+            for (TxnWrite.Fragment fragment : request.writeFragments)
+            {
+                size += TxnWrite.Fragment.serializer.serializedSize(fragment, 
TableMetadatas.of(request.metadata), 
Version.findBestMatchForMessagingVersion(version));
+            }
+
+            return size;
+        }
+
+        // Helper methods for RowCondition serialization
+        private void serializeRowCondition(RowCondition condition, 
DataOutputPlus out, int version) throws IOException
+        {
+            if (condition == null)
+            {
+                out.writeByte(0); // NULL_TYPE
+                return;
+            }
+
+            if (condition instanceof NotExistCondition)
+            {
+                out.writeByte(1); // NOT_EXIST_TYPE
+                // Don't serialize clustering here - it's already serialized 
in the conditions map
+            }
+            else if (condition instanceof ExistCondition)
+            {
+                out.writeByte(2); // EXIST_TYPE
+                // Don't serialize clustering here - it's already serialized 
in the conditions map
+            }
+            else if (condition instanceof ColumnsConditions)
+            {
+                out.writeByte(3); // COLUMNS_CONDITIONS_TYPE
+                ColumnsConditions cc = (ColumnsConditions) condition;
+                out.writeUnsignedVInt32(cc.conditions.size());
+
+                // Serialize each ColumnCondition.Bound using adapted pattern
+                for (ColumnCondition.Bound bound : cc.conditions)
+                {
+                    ColumnCondition.Bound.serializer.serialize(bound, 
TableMetadatas.of(bound.table), out);
+                }
+            }
+            else
+            {
+                throw new IOException("Unknown RowCondition type: " + 
condition.getClass());
+            }
+        }
+
+        private RowCondition deserializeRowCondition(DataInputPlus in, int 
version, TableMetadata metadata, Clustering<?> clustering) throws IOException
+        {
+            byte type = in.readByte();
+            switch (type)
+            {
+                case 0: // NULL_TYPE
+                    return null;
+                case 1: // NOT_EXIST_TYPE
+                    return new NotExistCondition(clustering);
+                case 2: // EXIST_TYPE
+                    return new ExistCondition(clustering);
+                case 3: // COLUMNS_CONDITIONS_TYPE
+                    int conditionsCount = in.readUnsignedVInt32();
+                    ColumnsConditions columnsConditions = new 
ColumnsConditions(clustering);
+
+                    // Deserialize each ColumnCondition.Bound
+                    for (int i = 0; i < conditionsCount; i++)
+                    {
+                        ColumnCondition.Bound bound = 
ColumnCondition.Bound.serializer.deserialize(TableMetadatas.of(metadata), in);
+                        columnsConditions.conditions.add(bound);
+                    }
+
+                    return columnsConditions;
+                default:
+                    throw new IOException("Unknown RowCondition type: " + 
type);
+            }
+        }
+
+        private long rowConditionSize(RowCondition condition, int version)
+        {
+            if (condition == null)
+                return 1; // type byte

Review Comment:
   Can drop this if-return, `instanceof` handles null just fine.



##########
src/java/org/apache/cassandra/service/ClientState.java:
##########
@@ -182,70 +165,50 @@ public static void resetLastTimestamp(long nowMillis)
     }
 
     /**
-     * Construct a new, empty ClientState for internal calls.
+     * Protected constructor for subclasses.
      */
-    private ClientState()
-    {
-        this.isInternal = true;
-        this.remoteAddress = null;
-    }
-
-    protected ClientState(InetSocketAddress remoteAddress)
+    protected ClientState(boolean isInternal)
     {
-        this.isInternal = false;
-        this.remoteAddress = remoteAddress;
-        if (!DatabaseDescriptor.getAuthenticator().requireAuthentication())
-            this.user = AuthenticatedUser.ANONYMOUS_USER;
-    }
-
-    protected ClientState(ClientState source)
-    {
-        this.isInternal = source.isInternal;
-        this.remoteAddress = source.remoteAddress;
-        this.user = source.user;
-        this.keyspace = source.keyspace;
-        this.driverName = source.driverName;
-        this.driverVersion = source.driverVersion;
-        this.clientOptions = source.clientOptions;
+        this.isInternal = isInternal;
     }
 
     /**
-     * @return a ClientState object for internal C* calls (not limited by any 
kind of auth).
+     * @return a LocalClientState object for internal C* calls (not limited by 
any kind of auth).
      */
     public static ClientState forInternalCalls()
     {
-        return new ClientState();
+        return LocalClientState.forInternalCalls();
     }
 
     public static ClientState forInternalCalls(String keyspace)
     {
-        ClientState state = new ClientState();
-        state.setKeyspace(keyspace);
-        return state;
+        return LocalClientState.forInternalCalls(keyspace);
     }
 
     /**
-     * @return a ClientState object for external clients (native protocol 
users).
+     * @return a LocalClientState object for external clients (native protocol 
users).
      */
     public static ClientState forExternalCalls(SocketAddress remoteAddress)
     {
-        return new ClientState((InetSocketAddress)remoteAddress);
+        return LocalClientState.forExternalCalls(remoteAddress);
     }
 
     /**
      * Clone this ClientState object, but use the provided keyspace instead of 
the
      * keyspace in this ClientState object.
      *
-     * @return a new ClientState object if the keyspace argument is non-null. 
Otherwise do not clone
+     * @return a new LocalClientState object if the keyspace argument is 
non-null. Otherwise do not clone
      *   and return this ClientState object.
      */
     public ClientState cloneWithKeyspaceIfSet(String keyspace)
     {
         if (keyspace == null)
             return this;
-        ClientState clientState = new ClientState(this);
-        clientState.setKeyspace(keyspace);
-        return clientState;
+        if (this instanceof LocalClientState)
+        {
+            return ((LocalClientState) this).cloneWithKeyspaceIfSet(keyspace);
+        }
+        throw new UnsupportedOperationException("Clone not supported for " + 
this.getClass().getSimpleName());

Review Comment:
   Should just be made abstract, with the `RemoteClientState` impl throwing
   `UnsupportedOperationException`, like the other methods around here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to