iamaleksey commented on code in PR #4447:
URL: https://github.com/apache/cassandra/pull/4447#discussion_r2788438042


##########
src/java/org/apache/cassandra/service/paxos/PrepareRefreshForwardHandler.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.replication.MutationId;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.paxos.Commit.Committed;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REFRESH_REQ;
+import static 
org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExecuteOnSelf;
+
+/**
+ * Handler for forwarded PaxosPrepareRefresh requests.
+ * Generates a mutation ID and sends the refresh to all target nodes.
+ *
+ * This handler is invoked when a non-replica coordinator forwards the refresh
+ * to a full replica that can generate the mutation ID.
+ */
+public class PrepareRefreshForwardHandler implements 
IVerbHandler<PrepareRefreshForwardRequest>
+{
+    public static final PrepareRefreshForwardHandler instance = new 
PrepareRefreshForwardHandler();
+    private static final Logger logger = 
LoggerFactory.getLogger(PrepareRefreshForwardHandler.class);
+
+    @Override
+    public void doVerb(Message<PrepareRefreshForwardRequest> message)
+    {
+        
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), 
message.header.epoch);
+        PrepareRefreshForwardRequest request = message.payload;
+
+        Tracing.trace("Executing forwarded PaxosPrepareRefresh for {}", 
request.commit.partitionKey());
+
+        try
+        {
+            String ksName = request.commit.metadata().keyspace;
+            KeyspaceMetadata ksMetadata = 
Schema.instance.getKeyspaceMetadata(ksName);
+            if (ksMetadata == null)
+            {
+                
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA,
 message);
+                logger.error("Failed to forward paxos prepare refresh for 
non-existent keyspace {}", ksName);
+                return;
+            }
+
+            if (!ksMetadata.params.replicationType.isTracked())
+            {
+                throw new IllegalStateException("Asked to perform forwarded 
prepare refresh, but keyspace " + ksName + " is not tracked");
+            }
+
+            Token token = request.commit.partitionKey().getToken();
+            MutationId mutationId = 
MutationTrackingService.instance.nextMutationId(ksName, token);
+
+            Mutation mutationWithId = request.commit.makeMutation(mutationId);
+            Committed commitWithId = new 
Commit.Committed(request.commit.ballot, mutationWithId);
+
+            // Now send the refresh to all targets and collect responses
+            List<InetAddressAndPort> targets = request.refreshTargets;
+            List<Ballot> supersededBy = Collections.synchronizedList(new 
ArrayList<>(Collections.nCopies(targets.size(), null)));
+            CountDownLatch latch = 
CountDownLatch.newCountDownLatch(targets.size());
+
+            Message<PaxosPrepareRefresh.Request> refreshMsg = Message.out(
+                PAXOS2_PREPARE_REFRESH_REQ,
+                new PaxosPrepareRefresh.Request(request.promised, 
commitWithId),
+                request.isUrgent
+            );
+
+            // For tracked keyspaces, we MUST ALWAYS write to the local 
journal since we generated the mutation ID.
+            // This is required for retry purposes: if a remote target fails, 
the ActiveLogReconciler will try
+            // to look up the mutation in the local journal. The node that 
generated the mutation ID is the "owner"
+            // and must have the mutation available for retries.
+            //
+            // This is different from checking if self is in targets - even if 
we're not in targets,
+            // we're still the ID generator and need the mutation locally.
+            try
+            {
+                PaxosPrepareRefresh.RequestHandler.execute(
+                    new PaxosPrepareRefresh.Request(request.promised, 
commitWithId), FBUtilities.getBroadcastAddressAndPort());
+                // Note: we don't use the response since this node may not be 
in targets
+            }
+            catch (Exception e)
+            {
+                // Log but continue - we still need to send to targets
+                logger.warn("Failed to execute local commit for tracked 
keyspace mutation {}", mutationId, e);
+            }
+
+            // Check if self is in targets for response tracking (separate 
from the local write above)
+            // We need to decrement the latch for the local target since we 
already executed above
+            for (int i = 0; i < targets.size(); i++)
+            {
+                if (shouldExecuteOnSelf(targets.get(i)))
+                {
+                    // Already executed locally above, just decrement latch
+                    latch.decrement();
+                    break;
+                }
+            }

Review Comment:
   Is there any reason for this to be a separate loop?



##########
src/java/org/apache/cassandra/service/paxos/Paxos2CommitForwardHandler.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.ConditionAsConsumer;
+
+import static 
org.apache.cassandra.utils.concurrent.ConditionAsConsumer.newConditionAsConsumer;
+
+/**
+ * Handler for forwarded Paxos V2 commit requests.
+ * Executes the commit operation on behalf of the original coordinator,
+ * ensuring that MutationId generation happens on a replica coordinator.
+ *
+ * The PaxosCommit constructor handles mutation ID generation, so this handler
+ * simply delegates to PaxosCommit.commit() with the original commit.
+ */
+public class Paxos2CommitForwardHandler implements 
IVerbHandler<Paxos2CommitForwardRequest>
+{
+    public static final Paxos2CommitForwardHandler instance = new 
Paxos2CommitForwardHandler();
+    private static final Logger logger = 
LoggerFactory.getLogger(Paxos2CommitForwardHandler.class);
+
+    @Override
+    public void doVerb(Message<Paxos2CommitForwardRequest> message)
+    {
+        // Ensure we have up-to-date cluster metadata before executing the 
forwarded commit
+        
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), 
message.header.epoch);
+        Paxos2CommitForwardRequest request = message.payload;
+
+        Tracing.trace("Executing forwarded Paxos V2 commit for {}", 
request.commit.partitionKey());
+
+        try
+        {
+            String ksName = request.commit.metadata().keyspace;
+            KeyspaceMetadata ksMetadata = 
Schema.instance.getKeyspaceMetadata(ksName);
+            if (ksMetadata == null)
+            {
+                
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA,
 message);
+                logger.error("Failed to forward paxos commit for non-existent 
keyspace {}", ksName);
+                return;
+            }
+
+            if (!ksMetadata.params.replicationType.isTracked())
+                throw new IllegalStateException("Asked to perform forwarded 
commit, but keyspace " + ksName + " is not tracked");

Review Comment:
   This is inconsistent with what we do in `CasReadForwardHandler`



##########
src/java/org/apache/cassandra/service/paxos/PrepareRefreshForwardHandler.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.replication.MutationId;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.paxos.Commit.Committed;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REFRESH_REQ;
+import static 
org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExecuteOnSelf;
+
+/**
+ * Handler for forwarded PaxosPrepareRefresh requests.
+ * Generates a mutation ID and sends the refresh to all target nodes.
+ *
+ * This handler is invoked when a non-replica coordinator forwards the refresh
+ * to a full replica that can generate the mutation ID.
+ */
+public class PrepareRefreshForwardHandler implements 
IVerbHandler<PrepareRefreshForwardRequest>
+{
+    public static final PrepareRefreshForwardHandler instance = new 
PrepareRefreshForwardHandler();
+    private static final Logger logger = 
LoggerFactory.getLogger(PrepareRefreshForwardHandler.class);
+
+    @Override
+    public void doVerb(Message<PrepareRefreshForwardRequest> message)
+    {
+        
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), 
message.header.epoch);
+        PrepareRefreshForwardRequest request = message.payload;
+
+        Tracing.trace("Executing forwarded PaxosPrepareRefresh for {}", 
request.commit.partitionKey());
+
+        try
+        {
+            String ksName = request.commit.metadata().keyspace;
+            KeyspaceMetadata ksMetadata = 
Schema.instance.getKeyspaceMetadata(ksName);
+            if (ksMetadata == null)
+            {
+                
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA,
 message);
+                logger.error("Failed to forward paxos prepare refresh for 
non-existent keyspace {}", ksName);
+                return;
+            }
+
+            if (!ksMetadata.params.replicationType.isTracked())
+            {
+                throw new IllegalStateException("Asked to perform forwarded 
prepare refresh, but keyspace " + ksName + " is not tracked");

Review Comment:
   Shouldn't this be replying with a failure, like the missing-keyspace branch above, instead of throwing?



##########
src/java/org/apache/cassandra/service/paxos/PaxosRequestCallback.java:
##########
@@ -80,35 +82,59 @@ protected <I> void executeOnSelf(I parameter, BiFunction<I, 
InetAddressAndPort,
         onResponse(response, getBroadcastAddressAndPort());
     }
 
-    protected <I, J> void executeOnSelf(I parameter1, J parameter2, 
TriFunction<I, J, InetAddressAndPort, T> execute)
+    protected <I, J> void executeOnSelfAsync(I parameter1, J parameter2, 
BiFunction<I, J, Future<T>> execute)
     {
-        T response;
         try
         {
-            response = execute.apply(parameter1, parameter2, 
getBroadcastAddressAndPort());
-            if (response == null)
+            Future<T> responseFuture = execute.apply(parameter1, parameter2);
+            if (responseFuture == null)
                 return;
-        }
-        catch (RetryOnDifferentSystemException e)
-        {
-            onFailure(getBroadcastAddressAndPort(), 
RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);
-            return;
+
+            if (responseFuture.isDone())
+            {
+                // Fast path: future already complete
+                T response = getUnchecked(responseFuture);
+                onResponse(response, getBroadcastAddressAndPort());
+            }
+            else
+            {
+                // Async path: add callback for when future completes
+                responseFuture.addCallback((response, failure) -> {
+                    if (failure != null)
+                    {
+                        RequestFailure reason = UNKNOWN;
+                        if (failure instanceof WriteTimeoutException) reason = 
TIMEOUT;
+                        else if (failure instanceof 
RetryOnDifferentSystemException)
+                        {
+                            onFailure(getBroadcastAddressAndPort(), 
RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);
+                            return;
+                        }
+                        else logger.error("Failed to apply {} locally", 
parameter1, failure);
+                        onFailure(getBroadcastAddressAndPort(), reason);
+                    }
+                    else
+                    {
+                        onResponse(response, getBroadcastAddressAndPort());
+                    }
+                });
+            }
         }
         catch (Exception ex)
         {
             RequestFailure reason = UNKNOWN;
             if (ex instanceof WriteTimeoutException) reason = TIMEOUT;
-            else logger.error("Failed to apply {}, {} locally", parameter1, 
parameter2, ex);
-
+            else if (ex instanceof RetryOnDifferentSystemException)
+            {
+                onFailure(getBroadcastAddressAndPort(), 
RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);

Review Comment:
   onFailure() below can handle this



##########
src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardResponse.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.CassandraException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.CollectionSerializers;
+
+import static 
org.apache.cassandra.db.SerializationHeader.StableHeaderSerializer.STABLE;
+import static 
org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE;
+
+/**
+ * Response containing the result of a forwarded consensus read operation.
+ * Can contain either a successful result or an exception that occurred during 
execution.
+ *
+ * Since consensus reads are single partition reads, we store a single 
RowIterator
+ * and wrap it as a PartitionIterator when accessed.
+ */
+public class ConsensusReadForwardResponse
+{
+    public static final Serializer serializer = new Serializer();
+
+    // Store as RowIterator internally (single partition) but expose as 
PartitionIterator
+    private final RowIterator rowIterator;
+    public final CassandraException exception;
+    public final List<String> warnings;
+
+    public ConsensusReadForwardResponse(PartitionIterator result, List<String> 
warnings)
+    {
+        // Extract the single partition from the iterator (consensus reads are 
single partition)
+        if (result != null && result.hasNext())
+        {
+            this.rowIterator = result.next();
+        }
+        else
+        {
+            this.rowIterator = null;
+        }
+        this.exception = null;
+        this.warnings = warnings;
+    }
+
+    public ConsensusReadForwardResponse(CassandraException exception, 
List<String> warnings)
+    {
+        this.rowIterator = null;
+        this.exception = exception;
+        this.warnings = warnings;
+    }
+
+    // Private constructor for deserialization
+    private ConsensusReadForwardResponse(RowIterator rowIterator, 
CassandraException exception, List<String> warnings)
+    {
+        this.rowIterator = rowIterator;
+        this.exception = exception;
+        this.warnings = warnings;
+    }
+
+    public boolean isSuccess()
+    {
+        return exception == null;
+    }
+
+    /**
+     * Get the result as a PartitionIterator.
+     */
+    public PartitionIterator getResult()
+    {
+        if (rowIterator == null)
+            return null;
+        return PartitionIterators.singletonIterator(rowIterator);
+    }
+
+    public static class Serializer implements 
IVersionedSerializer<ConsensusReadForwardResponse>
+    {
+        @Override
+        public void serialize(ConsensusReadForwardResponse response, 
DataOutputPlus out, int version) throws IOException
+        {
+            out.writeBoolean(response.exception != null);

Review Comment:
   Actually the entire class is 99% identical to `CasForwardResponse`, so I'm 
ditching one of them.



##########
src/java/org/apache/cassandra/service/paxos/PaxosRequestCallback.java:
##########
@@ -80,35 +82,59 @@ protected <I> void executeOnSelf(I parameter, BiFunction<I, 
InetAddressAndPort,
         onResponse(response, getBroadcastAddressAndPort());
     }
 
-    protected <I, J> void executeOnSelf(I parameter1, J parameter2, 
TriFunction<I, J, InetAddressAndPort, T> execute)
+    protected <I, J> void executeOnSelfAsync(I parameter1, J parameter2, 
BiFunction<I, J, Future<T>> execute)
     {
-        T response;
         try
         {
-            response = execute.apply(parameter1, parameter2, 
getBroadcastAddressAndPort());
-            if (response == null)
+            Future<T> responseFuture = execute.apply(parameter1, parameter2);
+            if (responseFuture == null)
                 return;
-        }
-        catch (RetryOnDifferentSystemException e)
-        {
-            onFailure(getBroadcastAddressAndPort(), 
RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);
-            return;
+
+            if (responseFuture.isDone())
+            {
+                // Fast path: future already complete
+                T response = getUnchecked(responseFuture);
+                onResponse(response, getBroadcastAddressAndPort());
+            }
+            else
+            {
+                // Async path: add callback for when future completes
+                responseFuture.addCallback((response, failure) -> {
+                    if (failure != null)
+                    {
+                        RequestFailure reason = UNKNOWN;
+                        if (failure instanceof WriteTimeoutException) reason = 
TIMEOUT;
+                        else if (failure instanceof 
RetryOnDifferentSystemException)
+                        {
+                            onFailure(getBroadcastAddressAndPort(), 
RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);

Review Comment:
   onFailure() below can handle this



##########
src/java/org/apache/cassandra/service/paxos/PaxosCommitForwardHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Dispatcher;
+
+/**
+ * Handler for forwarded Paxos V1 commit requests.
+ * Executes the commit operation on behalf of the original coordinator,
+ * ensuring that MutationId generation happens on a replica coordinator.
+ */
+public class PaxosCommitForwardHandler implements 
IVerbHandler<PaxosCommitForwardRequest>
+{
+    public static final PaxosCommitForwardHandler instance = new 
PaxosCommitForwardHandler();
+    private static final Logger logger = 
LoggerFactory.getLogger(PaxosCommitForwardHandler.class);
+
+    @Override
+    public void doVerb(Message<PaxosCommitForwardRequest> message)
+    {
+        // PaxosV1 when doing commit picks whatever the current replicas are 
to send the commits to
+        // so make sure we at least match what they would have picked
+        
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), 
message.header.epoch);
+        PaxosCommitForwardRequest request = message.payload;
+
+        Tracing.trace("Executing forwarded Paxos commit for {}", 
request.proposal.partitionKey());
+
+        try
+        {
+            String ksName = request.proposal.metadata().keyspace;
+            Keyspace keyspace = Keyspace.openIfExists(ksName);
+            if (keyspace == null)
+            {
+                
MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA,
 message);
+                logger.error("Failed to forward paxos commit for non-existent 
keyspace {}", ksName);
+                return;
+            }
+
+            if (!keyspace.getMetadata().params.replicationType.isTracked())

Review Comment:
   This is also inconsistent with what we do in `CasReadForwardHandler`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to