iamaleksey commented on code in PR #4447: URL: https://github.com/apache/cassandra/pull/4447#discussion_r2788438042
########## src/java/org/apache/cassandra/service/paxos/PrepareRefreshForwardHandler.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.service.paxos.Commit.Committed; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.CountDownLatch; + +import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REFRESH_REQ; +import static org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExecuteOnSelf; + +/** + * Handler for forwarded PaxosPrepareRefresh requests. + * Generates a mutation ID and sends the refresh to all target nodes. + * + * This handler is invoked when a non-replica coordinator forwards the refresh + * to a full replica that can generate the mutation ID. + */ +public class PrepareRefreshForwardHandler implements IVerbHandler<PrepareRefreshForwardRequest> +{ + public static final PrepareRefreshForwardHandler instance = new PrepareRefreshForwardHandler(); + private static final Logger logger = LoggerFactory.getLogger(PrepareRefreshForwardHandler.class); + + @Override + public void doVerb(Message<PrepareRefreshForwardRequest> message) + { + ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), message.header.epoch); + PrepareRefreshForwardRequest request = message.payload; + + Tracing.trace("Executing forwarded PaxosPrepareRefresh for {}", request.commit.partitionKey()); + + try + { + String ksName = request.commit.metadata().keyspace; + KeyspaceMetadata ksMetadata = Schema.instance.getKeyspaceMetadata(ksName); + if (ksMetadata == null) + { + MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message); + logger.error("Failed to forward paxos prepare refresh for non-existent keyspace {}", ksName); + return; + } + + if (!ksMetadata.params.replicationType.isTracked()) + { + throw new IllegalStateException("Asked to perform forwarded prepare refresh, but keyspace " + ksName + " is not tracked"); + } + + Token token = request.commit.partitionKey().getToken(); + MutationId mutationId = MutationTrackingService.instance.nextMutationId(ksName, token); + + Mutation mutationWithId = request.commit.makeMutation(mutationId); + Committed commitWithId = new Commit.Committed(request.commit.ballot, mutationWithId); + + // Now send the refresh to all targets and collect responses + List<InetAddressAndPort> targets = request.refreshTargets; + List<Ballot> supersededBy = Collections.synchronizedList(new ArrayList<>(Collections.nCopies(targets.size(), null))); + CountDownLatch latch = CountDownLatch.newCountDownLatch(targets.size()); + + Message<PaxosPrepareRefresh.Request> refreshMsg = Message.out( + PAXOS2_PREPARE_REFRESH_REQ, + new PaxosPrepareRefresh.Request(request.promised, commitWithId), + request.isUrgent + ); + + // For tracked keyspaces, we MUST ALWAYS write to the local journal since we generated the mutation ID. + // This is required for retry purposes: if a remote target fails, the ActiveLogReconciler will try + // to look up the mutation in the local journal. The node that generated the mutation ID is the "owner" + // and must have the mutation available for retries. + // + // This is different from checking if self is in targets - even if we're not in targets, + // we're still the ID generator and need the mutation locally. + try + { + PaxosPrepareRefresh.RequestHandler.execute( + new PaxosPrepareRefresh.Request(request.promised, commitWithId), FBUtilities.getBroadcastAddressAndPort()); + // Note: we don't use the response since this node may not be in targets + } + catch (Exception e) + { + // Log but continue - we still need to send to targets + logger.warn("Failed to execute local commit for tracked keyspace mutation {}", mutationId, e); + } + + // Check if self is in targets for response tracking (separate from the local write above) + // We need to decrement the latch for the local target since we already executed above + for (int i = 0; i < targets.size(); i++) + { + if (shouldExecuteOnSelf(targets.get(i))) + { + // Already executed locally above, just decrement latch + latch.decrement(); + break; + } + } Review Comment: No reason for this to be a separate loop? ########## src/java/org/apache/cassandra/service/paxos/Paxos2CommitForwardHandler.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.ConditionAsConsumer; + +import static org.apache.cassandra.utils.concurrent.ConditionAsConsumer.newConditionAsConsumer; + +/** + * Handler for forwarded Paxos V2 commit requests. + * Executes the commit operation on behalf of the original coordinator, + * ensuring that MutationId generation happens on a replica coordinator. + * + * The PaxosCommit constructor handles mutation ID generation, so this handler + * simply delegates to PaxosCommit.commit() with the original commit. + */ +public class Paxos2CommitForwardHandler implements IVerbHandler<Paxos2CommitForwardRequest> +{ + public static final Paxos2CommitForwardHandler instance = new Paxos2CommitForwardHandler(); + private static final Logger logger = LoggerFactory.getLogger(Paxos2CommitForwardHandler.class); + + @Override + public void doVerb(Message<Paxos2CommitForwardRequest> message) + { + // Ensure we have up-to-date cluster metadata before executing the forwarded commit + ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), message.header.epoch); + Paxos2CommitForwardRequest request = message.payload; + + Tracing.trace("Executing forwarded Paxos V2 commit for {}", request.commit.partitionKey()); + + try + { + String ksName = request.commit.metadata().keyspace; + KeyspaceMetadata ksMetadata = Schema.instance.getKeyspaceMetadata(ksName); + if (ksMetadata == null) + { + MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message); + logger.error("Failed to forward paxos commit for non-existent keyspace {}", ksName); + return; + } + + if (!ksMetadata.params.replicationType.isTracked()) + throw new IllegalStateException("Asked to perform forwarded commit, but keyspace " + ksName + " is not tracked"); Review Comment: This is inconsistent with what we do in `CasReadForwardHandler` ########## src/java/org/apache/cassandra/service/paxos/PrepareRefreshForwardHandler.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.service.paxos.Commit.Committed; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.CountDownLatch; + +import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REFRESH_REQ; +import static org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExecuteOnSelf; + +/** + * Handler for forwarded PaxosPrepareRefresh requests. + * Generates a mutation ID and sends the refresh to all target nodes. + * + * This handler is invoked when a non-replica coordinator forwards the refresh + * to a full replica that can generate the mutation ID. + */ +public class PrepareRefreshForwardHandler implements IVerbHandler<PrepareRefreshForwardRequest> +{ + public static final PrepareRefreshForwardHandler instance = new PrepareRefreshForwardHandler(); + private static final Logger logger = LoggerFactory.getLogger(PrepareRefreshForwardHandler.class); + + @Override + public void doVerb(Message<PrepareRefreshForwardRequest> message) + { + ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), message.header.epoch); + PrepareRefreshForwardRequest request = message.payload; + + Tracing.trace("Executing forwarded PaxosPrepareRefresh for {}", request.commit.partitionKey()); + + try + { + String ksName = request.commit.metadata().keyspace; + KeyspaceMetadata ksMetadata = Schema.instance.getKeyspaceMetadata(ksName); + if (ksMetadata == null) + { + MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message); + logger.error("Failed to forward paxos prepare refresh for non-existent keyspace {}", ksName); + return; + } + + if (!ksMetadata.params.replicationType.isTracked()) + { + throw new IllegalStateException("Asked to perform forwarded prepare refresh, but keyspace " + ksName + " is not tracked"); Review Comment: Should be replying with failure? ########## src/java/org/apache/cassandra/service/paxos/PaxosRequestCallback.java: ########## @@ -80,35 +82,59 @@ protected <I> void executeOnSelf(I parameter, BiFunction<I, InetAddressAndPort, onResponse(response, getBroadcastAddressAndPort()); } - protected <I, J> void executeOnSelf(I parameter1, J parameter2, TriFunction<I, J, InetAddressAndPort, T> execute) + protected <I, J> void executeOnSelfAsync(I parameter1, J parameter2, BiFunction<I, J, Future<T>> execute) { - T response; try { - response = execute.apply(parameter1, parameter2, getBroadcastAddressAndPort()); - if (response == null) + Future<T> responseFuture = execute.apply(parameter1, parameter2); + if (responseFuture == null) return; - } - catch (RetryOnDifferentSystemException e) - { - onFailure(getBroadcastAddressAndPort(), RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM); - return; + + if (responseFuture.isDone()) + { + // Fast path: future already complete + T response = getUnchecked(responseFuture); + onResponse(response, getBroadcastAddressAndPort()); + } + else + { + // Async path: add callback for when future completes + responseFuture.addCallback((response, failure) -> { + if (failure != null) + { + RequestFailure reason = UNKNOWN; + if (failure instanceof WriteTimeoutException) reason = TIMEOUT; + else if (failure instanceof RetryOnDifferentSystemException) + { + onFailure(getBroadcastAddressAndPort(), RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM); + return; + } + else logger.error("Failed to apply {} locally", parameter1, failure); + onFailure(getBroadcastAddressAndPort(), reason); + } + else + { + onResponse(response, getBroadcastAddressAndPort()); + } + }); + } } catch (Exception ex) { RequestFailure reason = UNKNOWN; if (ex instanceof WriteTimeoutException) reason = TIMEOUT; - else logger.error("Failed to apply {}, {} locally", parameter1, parameter2, ex); - + else if (ex instanceof RetryOnDifferentSystemException) + { + onFailure(getBroadcastAddressAndPort(), RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM); Review Comment: onFailure() below can handle this ########## src/java/org/apache/cassandra/service/paxos/ConsensusReadForwardResponse.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import java.io.IOException; +import java.util.List; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.exceptions.CassandraException; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.CollectionSerializers; + +import static org.apache.cassandra.db.SerializationHeader.StableHeaderSerializer.STABLE; +import static org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE; + +/** + * Response containing the result of a forwarded consensus read operation. + * Can contain either a successful result or an exception that occurred during execution. + * + * Since consensus reads are single partition reads, we store a single RowIterator + * and wrap it as a PartitionIterator when accessed. + */ +public class ConsensusReadForwardResponse +{ + public static final Serializer serializer = new Serializer(); + + // Store as RowIterator internally (single partition) but expose as PartitionIterator + private final RowIterator rowIterator; + public final CassandraException exception; + public final List<String> warnings; + + public ConsensusReadForwardResponse(PartitionIterator result, List<String> warnings) + { + // Extract the single partition from the iterator (consensus reads are single partition) + if (result != null && result.hasNext()) + { + this.rowIterator = result.next(); + } + else + { + this.rowIterator = null; + } + this.exception = null; + this.warnings = warnings; + } + + public ConsensusReadForwardResponse(CassandraException exception, List<String> warnings) + { + this.rowIterator = null; + this.exception = exception; + this.warnings = warnings; + } + + // Private constructor for deserialization + private ConsensusReadForwardResponse(RowIterator rowIterator, CassandraException exception, List<String> warnings) + { + this.rowIterator = rowIterator; + this.exception = exception; + this.warnings = warnings; + } + + public boolean isSuccess() + { + return exception == null; + } + + /** + * Get the result as a PartitionIterator. + */ + public PartitionIterator getResult() + { + if (rowIterator == null) + return null; + return PartitionIterators.singletonIterator(rowIterator); + } + + public static class Serializer implements IVersionedSerializer<ConsensusReadForwardResponse> + { + @Override + public void serialize(ConsensusReadForwardResponse response, DataOutputPlus out, int version) throws IOException + { + out.writeBoolean(response.exception != null); Review Comment: Actually the entire class is 99% identical to `CasForwardResponse`, so I'm ditching one of them. ########## src/java/org/apache/cassandra/service/paxos/PaxosRequestCallback.java: ########## @@ -80,35 +82,59 @@ protected <I> void executeOnSelf(I parameter, BiFunction<I, InetAddressAndPort, onResponse(response, getBroadcastAddressAndPort()); } - protected <I, J> void executeOnSelf(I parameter1, J parameter2, TriFunction<I, J, InetAddressAndPort, T> execute) + protected <I, J> void executeOnSelfAsync(I parameter1, J parameter2, BiFunction<I, J, Future<T>> execute) { - T response; try { - response = execute.apply(parameter1, parameter2, getBroadcastAddressAndPort()); - if (response == null) + Future<T> responseFuture = execute.apply(parameter1, parameter2); + if (responseFuture == null) return; - } - catch (RetryOnDifferentSystemException e) - { - onFailure(getBroadcastAddressAndPort(), RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM); - return; + + if (responseFuture.isDone()) + { + // Fast path: future already complete + T response = getUnchecked(responseFuture); + onResponse(response, getBroadcastAddressAndPort()); + } + else + { + // Async path: add callback for when future completes + responseFuture.addCallback((response, failure) -> { + if (failure != null) + { + RequestFailure reason = UNKNOWN; + if (failure instanceof WriteTimeoutException) reason = TIMEOUT; + else if (failure instanceof RetryOnDifferentSystemException) + { + onFailure(getBroadcastAddressAndPort(), RequestFailure.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM); Review Comment: onFailure() below can handle this ########## src/java/org/apache/cassandra/service/paxos/PaxosCommitForwardHandler.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.paxos; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.Dispatcher; + +/** + * Handler for forwarded Paxos V1 commit requests. + * Executes the commit operation on behalf of the original coordinator, + * ensuring that MutationId generation happens on a replica coordinator. + */ +public class PaxosCommitForwardHandler implements IVerbHandler<PaxosCommitForwardRequest> +{ + public static final PaxosCommitForwardHandler instance = new PaxosCommitForwardHandler(); + private static final Logger logger = LoggerFactory.getLogger(PaxosCommitForwardHandler.class); + + @Override + public void doVerb(Message<PaxosCommitForwardRequest> message) + { + // PaxosV1 when doing commit picks whatever the current replicas are to send the commits to + // so make sure we at least match what they would have picked + ClusterMetadataService.instance().fetchLogFromPeerOrCMS(message.from(), message.header.epoch); + PaxosCommitForwardRequest request = message.payload; + + Tracing.trace("Executing forwarded Paxos commit for {}", request.proposal.partitionKey()); + + try + { + String ksName = request.proposal.metadata().keyspace; + Keyspace keyspace = Keyspace.openIfExists(ksName); + if (keyspace == null) + { + MessagingService.instance().respondWithFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA, message); + logger.error("Failed to forward paxos commit for non-existent keyspace {}", ksName); + return; + } + + if (!keyspace.getMetadata().params.replicationType.isTracked()) Review Comment: This is also inconsistent with what we do in CasReadForwardHandler -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

