This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new ca0b77d743 Repair fuzz tests fail with paxos_variant: v2
ca0b77d743 is described below

commit ca0b77d7434aa75528e0cb625889825d29c5f122
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Mar 25 14:40:09 2024 -0700

    Repair fuzz tests fail with paxos_variant: v2

    patch by David Capwell; reviewed by Blake Eggleston, Ekaterina Dimitrova for CASSANDRA-19042
---
 src/java/org/apache/cassandra/gms/Gossiper.java | 2 +
 src/java/org/apache/cassandra/gms/IGossiper.java | 3 +
 .../org/apache/cassandra/net/MessageDelivery.java | 5 +
 .../org/apache/cassandra/net/MessagingService.java | 5 -
 .../org/apache/cassandra/repair/RepairJob.java | 2 +-
 .../org/apache/cassandra/repair/SharedContext.java | 54 ++++++++
 .../cassandra/service/ActiveRepairService.java | 2 +-
 .../apache/cassandra/service/StorageService.java | 7 +-
 .../org/apache/cassandra/service/paxos/Paxos.java | 12 +-
 .../cassandra/service/paxos/PaxosRepair.java | 43 ++----
 .../service/paxos/cleanup/PaxosCleanup.java | 35 +++--
 .../paxos/cleanup/PaxosCleanupComplete.java | 28 ++--
 .../cleanup/PaxosCleanupLocalCoordinator.java | 22 +--
 .../service/paxos/cleanup/PaxosCleanupRequest.java | 63 +++++----
 .../paxos/cleanup/PaxosCleanupResponse.java | 8 +-
 .../service/paxos/cleanup/PaxosCleanupSession.java | 54 +++-----
 .../paxos/cleanup/PaxosFinishPrepareCleanup.java | 112 ++-------------
 ...shPrepareCleanup.java => PaxosRepairState.java} | 138 ++++++++++++-------
 .../paxos/cleanup/PaxosStartPrepareCleanup.java | 41 +++---
 .../service/paxos/cleanup/PaxosTableRepairs.java | 24 ----
 .../paxos/uncommitted/PaxosUncommittedTracker.java | 4 +-
 .../distributed/test/PaxosRepairTest.java | 7 +-
 .../org/apache/cassandra/repair/FuzzTestBase.java | 152 +++++++++++++++++++--
 .../org/apache/cassandra/repair/RepairJobTest.java | 4 +-
 24 files changed, 468 insertions(+), 359 deletions(-)

diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java index adf465de21..d907f76686 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -1328,6 +1328,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration(); } + @Override public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap) { for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet()) @@ -1624,6 +1625,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, } @VisibleForTesting + @Override public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap) { checkProperThreadForStateMutation(); diff --git a/src/java/org/apache/cassandra/gms/IGossiper.java b/src/java/org/apache/cassandra/gms/IGossiper.java index 0e33526d22..aa9d95a97d 100644 --- a/src/java/org/apache/cassandra/gms/IGossiper.java +++ b/src/java/org/apache/cassandra/gms/IGossiper.java @@ -18,6 +18,7 @@ package org.apache.cassandra.gms; +import java.util.Map; import javax.annotation.Nullable; import org.apache.cassandra.locator.InetAddressAndPort; @@ -30,6 +31,8 @@ public interface IGossiper @Nullable EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep); + void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap); + void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap); @Nullable default CassandraVersion getReleaseVersion(InetAddressAndPort ep) { diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java b/src/java/org/apache/cassandra/net/MessageDelivery.java index dd8c6ceeda..36001c4988 100644 --- a/src/java/org/apache/cassandra/net/MessageDelivery.java +++ b/src/java/org/apache/cassandra/net/MessageDelivery.java @@ -18,6 +18,7 @@ package org.apache.cassandra.net; +import 
org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.concurrent.Future; @@ -28,4 +29,8 @@ public interface MessageDelivery public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection); public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to); public <V> void respond(V response, Message<?> message); + public default void respondWithFailure(RequestFailureReason reason, Message<?> message) + { + send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo()); + } } diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 903ef977b2..94586b41c8 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -449,11 +449,6 @@ public class MessagingService extends MessagingServiceMBeanImpl implements Messa send(message.responseWith(response), message.respondTo()); } - public <V> void respondWithFailure(RequestFailureReason reason, Message<?> message) - { - send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo()); - } - public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection) { if (logger.isTraceEnabled()) diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index c9966c5a05..c54336a6b3 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -129,7 +129,7 @@ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable { logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); 
TableMetadata metadata = Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily); - paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor); + paxosRepair = PaxosCleanup.cleanup(ctx, allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor); } else { diff --git a/src/java/org/apache/cassandra/repair/SharedContext.java b/src/java/org/apache/cassandra/repair/SharedContext.java index 8ccc88f584..440da2cf45 100644 --- a/src/java/org/apache/cassandra/repair/SharedContext.java +++ b/src/java/org/apache/cassandra/repair/SharedContext.java @@ -37,6 +37,8 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; @@ -57,6 +59,8 @@ public interface SharedContext ExecutorFactory executorFactory(); MBeanWrapper mbean(); ScheduledExecutorPlus optionalTasks(); + ScheduledExecutorPlus nonPeriodicTasks(); + ScheduledExecutorPlus scheduledTasks(); MessageDelivery messaging(); default SharedContext withMessaging(MessageDelivery messaging) @@ -77,6 +81,8 @@ public interface SharedContext IValidationManager validationManager(); TableRepairManager repairManager(ColumnFamilyStore store); StreamExecutor streamExecutor(); + PendingRangeCalculatorService pendingRangeCalculator(); + PaxosRepairState paxosRepairState(); class Global implements SharedContext { @@ -118,6 +124,18 @@ public interface SharedContext return ScheduledExecutors.optionalTasks; } + @Override + public ScheduledExecutorPlus nonPeriodicTasks() + { + return 
ScheduledExecutors.nonPeriodicTasks; + } + + @Override + public ScheduledExecutorPlus scheduledTasks() + { + return ScheduledExecutors.scheduledTasks; + } + @Override public MessageDelivery messaging() { @@ -171,6 +189,18 @@ public interface SharedContext { return StreamPlan::execute; } + + @Override + public PendingRangeCalculatorService pendingRangeCalculator() + { + return PendingRangeCalculatorService.instance; + } + + @Override + public PaxosRepairState paxosRepairState() + { + return PaxosRepairState.instance(); + } } class ForwardingSharedContext implements SharedContext @@ -223,6 +253,18 @@ public interface SharedContext return delegate().optionalTasks(); } + @Override + public ScheduledExecutorPlus nonPeriodicTasks() + { + return delegate().nonPeriodicTasks(); + } + + @Override + public ScheduledExecutorPlus scheduledTasks() + { + return delegate().scheduledTasks(); + } + @Override public MessageDelivery messaging() { @@ -276,5 +318,17 @@ public interface SharedContext { return delegate().streamExecutor(); } + + @Override + public PendingRangeCalculatorService pendingRangeCalculator() + { + return delegate().pendingRangeCalculator(); + } + + @Override + public PaxosRepairState paxosRepairState() + { + return delegate().paxosRepairState(); + } } } diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 7324b968a4..e120122c08 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -1136,7 +1136,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai range, table.keyspace, table.name, pending, PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey())); } - Future<Void> future = PaxosCleanup.cleanup(endpoints, table, Collections.singleton(range), false, repairCommandExecutor()); + Future<Void> future = PaxosCleanup.cleanup(ctx, endpoints, table, 
Collections.singleton(range), false, repairCommandExecutor()); futures.add(future); } } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 78be739301..cbdadee1c3 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -180,6 +180,7 @@ import org.apache.cassandra.net.AsyncOneResponse; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairCoordinator; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -199,7 +200,7 @@ import org.apache.cassandra.service.paxos.PaxosCommit; import org.apache.cassandra.service.paxos.PaxosRepair; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator; -import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.service.snapshot.SnapshotManager; import org.apache.cassandra.service.snapshot.TableSnapshot; import org.apache.cassandra.streaming.StreamManager; @@ -4863,7 +4864,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return ImmediateFuture.success(null); List<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace); - PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(tableId, ranges); + PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(SharedContext.Global.instance, tableId, ranges); ScheduledExecutors.optionalTasks.submit(coordinator::start); return coordinator; } @@ -7349,7 +7350,7 @@ public class 
StorageService extends NotificationBroadcasterSupport implements IE public void clearPaxosRepairs() { logger.info("StorageService#clearPaxosRepairs called via jmx"); - PaxosTableRepairs.clearRepairs(); + PaxosRepairState.instance().clearRepairs(); } public void setSkipPaxosRepairCompatibilityCheck(boolean v) diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java index 3b87ffb545..473b5741ad 100644 --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -85,9 +86,9 @@ import org.apache.cassandra.service.CASRequest; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.FailureRecordingCallback.AsMap; import org.apache.cassandra.service.paxos.Commit.Proposal; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.service.reads.DataResolver; import org.apache.cassandra.service.reads.repair.NoopReadRepair; -import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.CassandraVersion; @@ -385,13 +386,18 @@ public class Paxos } static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) + { + return get(table, token, consistencyForConsensus, FailureDetector.isReplicaAlive); + } + + static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive) { Keyspace keyspace = Keyspace.open(table.keyspace); ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token); 
ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal() ? all.filter(InOurDc.replicas()) : all; - EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive); + EndpointsForToken live = all.all().filter(isReplicaAlive); return new Participants(keyspace, consistencyForConsensus, all, electorate, live); } @@ -1255,6 +1261,6 @@ public class Paxos public static void evictHungRepairs() { - PaxosTableRepairs.evictHungRepairs(); + PaxosRepairState.instance().evictHungRepairs(); } } diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java index 45a3664731..ae5bc557c7 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java @@ -42,10 +42,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.EndpointState; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.gms.IGossiper; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -55,6 +52,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -659,45 +657,24 @@ public class PaxosRepair extends AbstractPaxosRepair return (version.major == 4 && version.minor > 0) || version.major > 4; } - static 
String getPeerVersion(InetAddressAndPort peer) + static boolean validatePeerCompatibility(IGossiper gossiper, Replica peer) { - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(peer); - if (epState == null) - return null; - - VersionedValue value = epState.getApplicationState(ApplicationState.RELEASE_VERSION); - if (value == null) - return null; - - try - { - return value.value; - } - catch (IllegalArgumentException e) - { - return null; - } - } - - static boolean validatePeerCompatibility(Replica peer) - { - String versionString = getPeerVersion(peer.endpoint()); - CassandraVersion version = versionString != null ? new CassandraVersion(versionString) : null; + CassandraVersion version = gossiper.getReleaseVersion(peer.endpoint()); boolean result = validateVersionCompatibility(version); if (!result) - logger.info("PaxosRepair isn't supported by {} on version {}", peer, versionString); + logger.info("PaxosRepair isn't supported by {} on version {}", peer, version); return result; } - static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range) + static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Range<Token> range) { - Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL); - return Iterables.all(participants.all, PaxosRepair::validatePeerCompatibility); + Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint())); + return Iterables.all(participants.all, r -> validatePeerCompatibility(ctx.gossiper(), r)); } - public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges) + public static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Collection<Range<Token>> ranges) { - return Iterables.all(ranges, range -> validatePeerCompatibility(table, range)); + return Iterables.all(ranges, range -> 
validatePeerCompatibility(ctx, table, range)); } public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java index 6eb1ebd574..7b4163e2a2 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java @@ -29,32 +29,30 @@ import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.EndpointState; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Ballot; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.AsyncFuture; import org.apache.cassandra.utils.concurrent.Future; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.config.DatabaseDescriptor.getCasContentionTimeout; import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout; -import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; public class PaxosCleanup extends AsyncFuture<Void> implements Runnable { private static final Logger logger = LoggerFactory.getLogger(PaxosCleanup.class); + private final SharedContext ctx; private final Collection<InetAddressAndPort> 
endpoints; private final TableMetadata table; private final Collection<Range<Token>> ranges; @@ -67,8 +65,9 @@ public class PaxosCleanup extends AsyncFuture<Void> implements Runnable private PaxosCleanupSession session; private PaxosCleanupComplete complete; - public PaxosCleanup(Collection<InetAddressAndPort> endpoints, TableMetadata table, Collection<Range<Token>> ranges, boolean skippedReplicas, Executor executor) + public PaxosCleanup(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableMetadata table, Collection<Range<Token>> ranges, boolean skippedReplicas, Executor executor) { + this.ctx = ctx; this.endpoints = endpoints; this.table = table; this.ranges = ranges; @@ -81,51 +80,51 @@ public class PaxosCleanup extends AsyncFuture<Void> implements Runnable future.addCallback(onComplete, this::tryFailure); } - public static PaxosCleanup cleanup(Collection<InetAddressAndPort> endpoints, TableMetadata table, Collection<Range<Token>> ranges, boolean skippedReplicas, Executor executor) + public static PaxosCleanup cleanup(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableMetadata table, Collection<Range<Token>> ranges, boolean skippedReplicas, Executor executor) { - PaxosCleanup cleanup = new PaxosCleanup(endpoints, table, ranges, skippedReplicas, executor); + PaxosCleanup cleanup = new PaxosCleanup(ctx, endpoints, table, ranges, skippedReplicas, executor); executor.execute(cleanup); return cleanup; } public void run() { - EndpointState localEpState = Gossiper.instance.getEndpointStateForEndpoint(getBroadcastAddressAndPort()); - startPrepare = PaxosStartPrepareCleanup.prepare(table.id, endpoints, localEpState, ranges); + EndpointState localEpState = ctx.gossiper().getEndpointStateForEndpoint(ctx.broadcastAddressAndPort()); + startPrepare = PaxosStartPrepareCleanup.prepare(ctx, table.id, endpoints, localEpState, ranges); addCallback(startPrepare, this::finishPrepare); } private void finishPrepare(PaxosCleanupHistory result) { - 
ScheduledExecutors.nonPeriodicTasks.schedule(() -> { - finishPrepare = PaxosFinishPrepareCleanup.finish(endpoints, result); + ctx.nonPeriodicTasks().schedule(() -> { + finishPrepare = PaxosFinishPrepareCleanup.finish(ctx, endpoints, result); addCallback(finishPrepare, (v) -> startSession(result.highBound)); }, Math.min(getCasContentionTimeout(MILLISECONDS), getWriteRpcTimeout(MILLISECONDS)), MILLISECONDS); } private void startSession(Ballot lowBound) { - session = new PaxosCleanupSession(endpoints, table.id, ranges); + session = new PaxosCleanupSession(ctx, endpoints, table.id, ranges); addCallback(session, (v) -> finish(lowBound)); executor.execute(session); } private void finish(Ballot lowBound) { - complete = new PaxosCleanupComplete(endpoints, table.id, ranges, lowBound, skippedReplicas); + complete = new PaxosCleanupComplete(ctx, endpoints, table.id, ranges, lowBound, skippedReplicas); addCallback(complete, this::trySuccess); executor.execute(complete); } - private static boolean isOutOfRange(String ksName, Collection<Range<Token>> repairRanges) + private static boolean isOutOfRange(SharedContext ctx, String ksName, Collection<Range<Token>> repairRanges) { Keyspace keyspace = Keyspace.open(ksName); List<Range<Token>> localRanges = Range.normalize(keyspace.getReplicationStrategy() .getAddressReplicas() - .get(FBUtilities.getBroadcastAddressAndPort()) + .get(ctx.broadcastAddressAndPort()) .ranges()); - RangesAtEndpoint pendingRanges = StorageService.instance.getTokenMetadata().getPendingRanges(ksName, FBUtilities.getBroadcastAddressAndPort()); + RangesAtEndpoint pendingRanges = StorageService.instance.getTokenMetadata().getPendingRanges(ksName, ctx.broadcastAddressAndPort()); if (!pendingRanges.isEmpty()) { localRanges.addAll(pendingRanges.ranges()); @@ -140,14 +139,14 @@ public class PaxosCleanup extends AsyncFuture<Void> implements Runnable return false; } - static boolean isInRangeAndShouldProcess(Collection<Range<Token>> ranges, TableId tableId) + static 
boolean isInRangeAndShouldProcess(SharedContext ctx, Collection<Range<Token>> ranges, TableId tableId) { TableMetadata metadata = Schema.instance.getTableMetadata(tableId); Keyspace keyspace = Keyspace.open(metadata.keyspace); Preconditions.checkNotNull(keyspace); - if (!isOutOfRange(metadata.keyspace, ranges)) + if (!isOutOfRange(ctx, metadata.keyspace, ranges)) return true; logger.warn("Out of range PaxosCleanup request for {}: {}", metadata, ranges); diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java index 0196e9cce0..8742af84e0 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java @@ -31,7 +31,10 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.*; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.paxos.Ballot; @@ -48,9 +51,11 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa final Collection<Range<Token>> ranges; final Ballot lowBound; final boolean skippedReplicas; + private final SharedContext ctx; - PaxosCleanupComplete(Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, boolean skippedReplicas) + PaxosCleanupComplete(SharedContext ctx, Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, boolean skippedReplicas) 
{ + this.ctx = ctx; this.waitingResponse = new HashSet<>(endpoints); this.tableId = tableId; this.ranges = ranges; @@ -64,7 +69,7 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa : new Request(tableId, Ballot.none(), Collections.emptyList()); Message<Request> message = Message.out(PAXOS2_CLEANUP_COMPLETE_REQ, request); for (InetAddressAndPort endpoint : waitingResponse) - MessagingService.instance().sendWithCallback(message, endpoint, this); + ctx.messaging().sendWithCallback(message, endpoint, this); } @Override @@ -86,7 +91,7 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa trySuccess(null); } - static class Request + public static class Request { final TableId tableId; final Ballot lowBound; @@ -136,9 +141,14 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa } }; - public static final IVerbHandler<Request> verbHandler = (in) -> { - ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId); - cfs.onPaxosRepairComplete(in.payload.ranges, in.payload.lowBound); - MessagingService.instance().respond(noPayload, in); - }; + public static IVerbHandler<Request> createVerbHandler(SharedContext ctx) + { + return (in) -> { + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId); + cfs.onPaxosRepairComplete(in.payload.ranges, in.payload.lowBound); + ctx.messaging().respond(noPayload, in); + }; + } + + public static final IVerbHandler<Request> verbHandler = createVerbHandler(SharedContext.Global.instance); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java index 3378714a0c..14c970b7db 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java 
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -43,7 +44,6 @@ import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.concurrent.AsyncFuture; import static org.apache.cassandra.service.paxos.cleanup.PaxosCleanupSession.TIMEOUT_NANOS; -import static org.apache.cassandra.utils.Clock.Global.nanoTime; public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupResponse> { @@ -55,21 +55,23 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon private final TableMetadata table; private final Collection<Range<Token>> ranges; private final CloseableIterator<UncommittedPaxosKey> uncommittedIter; + private final SharedContext ctx; private int count = 0; private final long deadline; private final Map<DecoratedKey, AbstractPaxosRepair> inflight = new ConcurrentHashMap<>(); private final PaxosTableRepairs tableRepairs; - private PaxosCleanupLocalCoordinator(UUID session, TableId tableId, Collection<Range<Token>> ranges, CloseableIterator<UncommittedPaxosKey> uncommittedIter) + private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, TableId tableId, Collection<Range<Token>> ranges, CloseableIterator<UncommittedPaxosKey> uncommittedIter) { + this.ctx = ctx; this.session = session; this.tableId = tableId; this.table = Schema.instance.getTableMetadata(tableId); this.ranges = ranges; this.uncommittedIter = uncommittedIter; - this.tableRepairs = PaxosTableRepairs.getForTable(tableId); - this.deadline = TIMEOUT_NANOS + nanoTime(); + this.tableRepairs = ctx.paxosRepairState().getForTable(tableId); + this.deadline = TIMEOUT_NANOS + ctx.clock().nanoTime(); } public 
synchronized void start() @@ -80,7 +82,7 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon return; } - if (!PaxosRepair.validatePeerCompatibility(table, ranges)) + if (!PaxosRepair.validatePeerCompatibility(ctx, table, ranges)) { fail("Unsupported peer versions for " + tableId + ' ' + ranges.toString()); return; @@ -91,16 +93,16 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon scheduleKeyRepairsOrFinish(); } - public static PaxosCleanupLocalCoordinator create(PaxosCleanupRequest request) + public static PaxosCleanupLocalCoordinator create(SharedContext ctx, PaxosCleanupRequest request) { CloseableIterator<UncommittedPaxosKey> iterator = PaxosState.uncommittedTracker().uncommittedKeyIterator(request.tableId, request.ranges); - return new PaxosCleanupLocalCoordinator(request.session, request.tableId, request.ranges, iterator); + return new PaxosCleanupLocalCoordinator(ctx, request.session, request.tableId, request.ranges, iterator); } - public static PaxosCleanupLocalCoordinator createForAutoRepair(TableId tableId, Collection<Range<Token>> ranges) + public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx, TableId tableId, Collection<Range<Token>> ranges) { CloseableIterator<UncommittedPaxosKey> iterator = PaxosState.uncommittedTracker().uncommittedKeyIterator(tableId, ranges); - return new PaxosCleanupLocalCoordinator(INTERNAL_SESSION, tableId, ranges, iterator); + return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, tableId, ranges, iterator); } /** @@ -113,7 +115,7 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon Preconditions.checkArgument(parallelism > 0); if (inflight.size() < parallelism) { - if (nanoTime() - deadline >= 0) + if (ctx.clock().nanoTime() - deadline >= 0) { fail("timeout"); return; diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java index 4db457f4af..2dbbc58d69 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java @@ -38,10 +38,10 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.UUIDSerializer; -import static org.apache.cassandra.net.MessagingService.instance; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_RSP2; @@ -68,40 +68,45 @@ public class PaxosCleanupRequest this.ranges = rangesOrMin(ranges); } - public static final IVerbHandler<PaxosCleanupRequest> verbHandler = in -> { - PaxosCleanupRequest request = in.payload; - - if (!PaxosCleanup.isInRangeAndShouldProcess(request.ranges, request.tableId)) - { - String msg = String.format("Rejecting cleanup request %s from %s. 
Some ranges are not replicated (%s)", - request.session, in.from(), request.ranges); - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg)); - instance().send(response, in.respondTo()); - return; - } - - PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(request); + public static IVerbHandler<PaxosCleanupRequest> createVerbHandler(SharedContext ctx) + { + return in -> { + PaxosCleanupRequest request = in.payload; - coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>() - { - public void onSuccess(@Nullable PaxosCleanupResponse finished) + if (!PaxosCleanup.isInRangeAndShouldProcess(ctx, request.ranges, request.tableId)) { - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow()); - instance().send(response, in.respondTo()); + String msg = String.format("Rejecting cleanup request %s from %s. Some ranges are not replicated (%s)", + request.session, in.from(), request.ranges); + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg)); + ctx.messaging().send(response, in.respondTo()); + return; } - public void onFailure(Throwable throwable) - { - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage())); - instance().send(response, in.respondTo()); - } - }); + PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(ctx, request); - // ack the request so the coordinator knows we've started - instance().respond(noPayload, in); + coordinator.addCallback(new FutureCallback<>() + { + public void onSuccess(@Nullable PaxosCleanupResponse finished) + { + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow()); + ctx.messaging().send(response, in.respondTo()); + } + + public void onFailure(Throwable throwable) + { 
+ Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage())); + ctx.messaging().send(response, in.respondTo()); + } + }); + + // ack the request so the coordinator knows we've started + ctx.messaging().respond(noPayload, in); + + coordinator.start(); + }; + } - coordinator.start(); - }; + public static final IVerbHandler<PaxosCleanupRequest> verbHandler = createVerbHandler(SharedContext.Global.instance); public static final IVersionedSerializer<PaxosCleanupRequest> serializer = new IVersionedSerializer<PaxosCleanupRequest>() { diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java index 1c90162001..2315d687bc 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java @@ -28,6 +28,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.utils.UUIDSerializer; public class PaxosCleanupResponse @@ -53,7 +54,12 @@ public class PaxosCleanupResponse return new PaxosCleanupResponse(session, false, message); } - public static final IVerbHandler<PaxosCleanupResponse> verbHandler = (message) -> PaxosCleanupSession.finishSession(message.from(), message.payload); + public static IVerbHandler<PaxosCleanupResponse> createVerbHandler(SharedContext ctx) + { + return message -> ctx.paxosRepairState().finishSession(message.from(), message.payload); + } + + public static final IVerbHandler<PaxosCleanupResponse> verbHandler = createVerbHandler(SharedContext.Global.instance); public static final IVersionedSerializer<PaxosCleanupResponse> serializer = 
new IVersionedSerializer<PaxosCleanupResponse>() { diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java index 5f1eea6b68..681f67a56c 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java @@ -20,14 +20,12 @@ package org.apache.cassandra.service.paxos.cleanup; import java.lang.ref.WeakReference; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; -import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailureReason; @@ -38,23 +36,21 @@ import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.concurrent.AsyncFuture; import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_CLEANUP_SESSION_TIMEOUT_SECONDS; import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_REQ; -import static org.apache.cassandra.utils.Clock.Global.nanoTime; public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, IEndpointStateChangeSubscriber, IFailureDetectionEventListener, RequestCallbackWithFailure<Void> { - private static final Map<UUID, PaxosCleanupSession> sessions = new ConcurrentHashMap<>(); - static final long 
TIMEOUT_NANOS; + static { long timeoutSeconds = PAXOS_CLEANUP_SESSION_TIMEOUT_SECONDS.getLong(); @@ -63,10 +59,12 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, private static class TimeoutTask implements Runnable { + private final SharedContext ctx; private final WeakReference<PaxosCleanupSession> ref; TimeoutTask(PaxosCleanupSession session) { + this.ctx = session.ctx; this.ref = new WeakReference<>(session); } @@ -77,7 +75,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, if (session == null || session.isDone()) return; - long remaining = session.lastMessageSentNanos + TIMEOUT_NANOS - nanoTime(); + long remaining = session.lastMessageSentNanos + TIMEOUT_NANOS - ctx.clock().nanoTime(); if (remaining > 0) schedule(remaining); else @@ -86,7 +84,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, ScheduledFuture<?> schedule(long delayNanos) { - return ScheduledExecutors.scheduledTasks.scheduleTimeoutWithDelay(this, delayNanos, TimeUnit.NANOSECONDS); + return ctx.scheduledTasks().scheduleTimeoutWithDelay(this, delayNanos, TimeUnit.NANOSECONDS); } private static ScheduledFuture<?> schedule(PaxosCleanupSession session) @@ -95,38 +93,29 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, } } - private final UUID session = UUID.randomUUID(); + private final SharedContext ctx; + public final UUID session = UUID.randomUUID(); private final TableId tableId; private final Collection<Range<Token>> ranges; private final Queue<InetAddressAndPort> pendingCleanups = new ConcurrentLinkedQueue<>(); private InetAddressAndPort inProgress = null; - private volatile long lastMessageSentNanos = nanoTime(); + private volatile long lastMessageSentNanos; private ScheduledFuture<?> timeout; - PaxosCleanupSession(Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges) + PaxosCleanupSession(SharedContext ctx, 
Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges) { + this.ctx = ctx; this.tableId = tableId; this.ranges = ranges; pendingCleanups.addAll(endpoints); - } - - private static void setSession(PaxosCleanupSession session) - { - Preconditions.checkState(!sessions.containsKey(session.session)); - sessions.put(session.session, session); - } - - private static void removeSession(PaxosCleanupSession session) - { - Preconditions.checkState(sessions.containsKey(session.session)); - sessions.remove(session.session); + lastMessageSentNanos = ctx.clock().nanoTime(); } @Override public void run() { - setSession(this); + ctx.paxosRepairState().setSession(this); startNextOrFinish(); if (!isDone()) timeout = TimeoutTask.schedule(this); @@ -134,10 +123,10 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, private void startCleanup(InetAddressAndPort endpoint) { - lastMessageSentNanos = nanoTime(); + lastMessageSentNanos = ctx.clock().nanoTime(); PaxosCleanupRequest completer = new PaxosCleanupRequest(session, tableId, ranges); Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, completer); - MessagingService.instance().sendWithCallback(msg, endpoint, this); + ctx.messaging().sendWithCallback(msg, endpoint, this); } private synchronized void startNextOrFinish() @@ -157,7 +146,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, } else { - removeSession(this); + ctx.paxosRepairState().removeSession(this); trySuccess(null); if (timeout != null) timeout.cancel(true); @@ -168,13 +157,13 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, { if (isDone()) return; - removeSession(this); + ctx.paxosRepairState().removeSession(this); tryFailure(new PaxosCleanupException(message)); if (timeout != null) timeout.cancel(true); } - private synchronized void finish(InetAddressAndPort from, PaxosCleanupResponse finished) + synchronized void 
finish(InetAddressAndPort from, PaxosCleanupResponse finished) { Preconditions.checkArgument(from.equals(inProgress), "Received unexpected cleanup complete response from %s for session %s. Expected %s", from, session, inProgress); inProgress = null; @@ -189,13 +178,6 @@ public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable, } } - public static void finishSession(InetAddressAndPort from, PaxosCleanupResponse response) - { - PaxosCleanupSession session = sessions.get(response.session); - if (session != null) - session.finish(from, response); - } - private synchronized void maybeKillSession(InetAddressAndPort unavailable, String reason) { // don't fail if we've already completed the cleanup for the unavailable endpoint, diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java index 92d8d35028..b3104788d2 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java @@ -18,25 +18,16 @@ package org.apache.cassandra.service.paxos.cleanup; -import java.io.IOException; import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.RequestFailureReason; -import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.*; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.service.paxos.Ballot; -import org.apache.cassandra.service.paxos.PaxosState; -import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallbackWithFailure; +import 
org.apache.cassandra.net.Verb; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.utils.concurrent.AsyncFuture; -import org.apache.cassandra.utils.concurrent.IntrusiveStack; - -import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN; -import static org.apache.cassandra.net.NoPayload.noPayload; public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements RequestCallbackWithFailure<Void> { @@ -47,14 +38,14 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ this.waitingResponse = new HashSet<>(endpoints); } - public static PaxosFinishPrepareCleanup finish(Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result) + public static PaxosFinishPrepareCleanup finish(SharedContext ctx, Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result) { PaxosFinishPrepareCleanup callback = new PaxosFinishPrepareCleanup(endpoints); synchronized (callback) { Message<PaxosCleanupHistory> message = Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result); for (InetAddressAndPort endpoint : endpoints) - MessagingService.instance().sendWithCallback(message, endpoint, callback); + ctx.messaging().sendWithCallback(message, endpoint, callback); } return callback; } @@ -77,93 +68,10 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ trySuccess(null); } - static class PendingCleanup extends IntrusiveStack<PendingCleanup> + public static IVerbHandler<PaxosCleanupHistory> createVerbHandler(SharedContext ctx) { - private static final AtomicReference<PendingCleanup> pendingCleanup = new AtomicReference(); - private static final Runnable CLEANUP = () -> { - PendingCleanup list = pendingCleanup.getAndSet(null); - if (list == null) - return; - - Ballot highBound = Ballot.none(); - for (PendingCleanup pending : IntrusiveStack.iterable(list)) - { - PaxosCleanupHistory cleanupHistory = pending.message.payload; - if 
(cleanupHistory.highBound.compareTo(highBound) > 0) - highBound = cleanupHistory.highBound; - } - try - { - try - { - PaxosState.ballotTracker().updateLowBound(highBound); - } - catch (IOException e) - { - throw new FSWriteError(e); - } - } - catch (Throwable t) - { - for (PendingCleanup pending : IntrusiveStack.iterable(list)) - MessagingService.instance().respondWithFailure(UNKNOWN, pending.message); - throw t; - } - - Set<PendingCleanup> failed = null; - Throwable fail = null; - for (PendingCleanup pending : IntrusiveStack.iterable(list)) - { - try - { - Schema.instance.getColumnFamilyStoreInstance(pending.message.payload.tableId) - .syncPaxosRepairHistory(pending.message.payload.history, false); - } - catch (Throwable t) - { - fail = Throwables.merge(fail, t); - if (failed == null) - failed = Collections.newSetFromMap(new IdentityHashMap<>()); - failed.add(pending); - MessagingService.instance().respondWithFailure(UNKNOWN, pending.message); - } - } - - try - { - SystemKeyspace.flushPaxosRepairHistory(); - for (PendingCleanup pending : IntrusiveStack.iterable(list)) - { - if (failed == null || !failed.contains(pending)) - MessagingService.instance().respond(noPayload, pending.message); - } - } - catch (Throwable t) - { - fail = Throwables.merge(fail, t); - for (PendingCleanup pending : IntrusiveStack.iterable(list)) - { - if (failed == null || !failed.contains(pending)) - MessagingService.instance().respondWithFailure(UNKNOWN, pending.message); - } - } - Throwables.maybeFail(fail); - }; - - final Message<PaxosCleanupHistory> message; - PendingCleanup(Message<PaxosCleanupHistory> message) - { - this.message = message; - } - - public static void add(Message<PaxosCleanupHistory> message) - { - PendingCleanup next = new PendingCleanup(message); - PendingCleanup prev = IntrusiveStack.push(AtomicReference::get, AtomicReference::compareAndSet, pendingCleanup, next); - if (prev == null) - Stage.MISC.execute(CLEANUP); - } + return 
ctx.paxosRepairState()::addCleanupHistory; } - public static final IVerbHandler<PaxosCleanupHistory> verbHandler = PendingCleanup::add; + public static final IVerbHandler<PaxosCleanupHistory> verbHandler = createVerbHandler(SharedContext.Global.instance); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosRepairState.java similarity index 56% copy from src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java copy to src/java/org/apache/cassandra/service/paxos/cleanup/PaxosRepairState.java index 92d8d35028..5636d5cb73 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosRepairState.java @@ -19,68 +19,124 @@ package org.apache.cassandra.service.paxos.cleanup; import java.io.IOException; -import java.util.*; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.base.Preconditions; + import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.*; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.utils.Throwables; -import 
org.apache.cassandra.utils.concurrent.AsyncFuture; import org.apache.cassandra.utils.concurrent.IntrusiveStack; import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN; import static org.apache.cassandra.net.NoPayload.noPayload; -public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements RequestCallbackWithFailure<Void> +/** + * Tracks the state of paxos repair cleanup work + */ +public class PaxosRepairState { - private final Set<InetAddressAndPort> waitingResponse; + private final SharedContext ctx; + private final AtomicReference<PendingCleanup> pendingCleanup = new AtomicReference<>(); + private final Map<UUID, PaxosCleanupSession> sessions = new ConcurrentHashMap<>(); + private final ConcurrentMap<TableId, PaxosTableRepairs> tableRepairsMap = new ConcurrentHashMap<>(); - PaxosFinishPrepareCleanup(Collection<InetAddressAndPort> endpoints) + public PaxosRepairState(SharedContext ctx) { - this.waitingResponse = new HashSet<>(endpoints); + this.ctx = ctx; } - public static PaxosFinishPrepareCleanup finish(Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result) + public static PaxosRepairState instance() { - PaxosFinishPrepareCleanup callback = new PaxosFinishPrepareCleanup(endpoints); - synchronized (callback) - { - Message<PaxosCleanupHistory> message = Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result); - for (InetAddressAndPort endpoint : endpoints) - MessagingService.instance().sendWithCallback(message, endpoint, callback); - } - return callback; + return Holder.instance; + } + + PaxosTableRepairs getForTable(TableId tableId) + { + return tableRepairsMap.computeIfAbsent(tableId, k -> new PaxosTableRepairs()); + } + + public void evictHungRepairs() + { + long deadline = ctx.clock().nanoTime() - TimeUnit.MINUTES.toNanos(5); + for (PaxosTableRepairs repairs : tableRepairsMap.values()) + repairs.evictHungRepairs(deadline); + } + + public void clearRepairs() + { + for (PaxosTableRepairs repairs : 
tableRepairsMap.values()) + repairs.clear(); } - @Override - public void onFailure(InetAddressAndPort from, RequestFailureReason reason) + + public void setSession(PaxosCleanupSession session) { - tryFailure(new PaxosCleanupException(reason + " failure response from " + from)); + Preconditions.checkState(!sessions.containsKey(session.session)); + sessions.put(session.session, session); } - public synchronized void onResponse(Message<Void> msg) + public void removeSession(PaxosCleanupSession session) { - if (isDone()) - return; + Preconditions.checkState(sessions.containsKey(session.session)); + sessions.remove(session.session); + } - if (!waitingResponse.remove(msg.from())) - throw new IllegalArgumentException("Received unexpected response from " + msg.from()); + public void finishSession(InetAddressAndPort from, PaxosCleanupResponse response) + { + PaxosCleanupSession session = sessions.get(response.session); + if (session != null) + session.finish(from, response); + } + + public void addCleanupHistory(Message<PaxosCleanupHistory> message) + { + PendingCleanup.add(ctx, pendingCleanup, message); + } - if (waitingResponse.isEmpty()) - trySuccess(null); + /** + * This is not required, but it helps to see what escapes simulation... 
can put a break point on instance() while running the tests + */ + private static class Holder + { + private static final PaxosRepairState instance = new PaxosRepairState(SharedContext.Global.instance); } static class PendingCleanup extends IntrusiveStack<PendingCleanup> { - private static final AtomicReference<PendingCleanup> pendingCleanup = new AtomicReference(); - private static final Runnable CLEANUP = () -> { + private final Message<PaxosCleanupHistory> message; + + PendingCleanup(Message<PaxosCleanupHistory> message) + { + this.message = message; + } + + private static void add(SharedContext ctx, AtomicReference<PendingCleanup> pendingCleanup, Message<PaxosCleanupHistory> message) + { + PendingCleanup next = new PendingCleanup(message); + PendingCleanup prev = IntrusiveStack.push(AtomicReference::get, AtomicReference::compareAndSet, pendingCleanup, next); + if (prev == null) + Stage.MISC.execute(() -> cleanup(ctx, pendingCleanup)); + } + + private static void cleanup(SharedContext ctx, AtomicReference<PendingCleanup> pendingCleanup) + { PendingCleanup list = pendingCleanup.getAndSet(null); if (list == null) return; @@ -106,7 +162,7 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ catch (Throwable t) { for (PendingCleanup pending : IntrusiveStack.iterable(list)) - MessagingService.instance().respondWithFailure(UNKNOWN, pending.message); + ctx.messaging().respondWithFailure(UNKNOWN, pending.message); throw t; } @@ -125,7 +181,7 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ if (failed == null) failed = Collections.newSetFromMap(new IdentityHashMap<>()); failed.add(pending); - MessagingService.instance().respondWithFailure(UNKNOWN, pending.message); + ctx.messaging().respondWithFailure(UNKNOWN, pending.message); } } @@ -135,7 +191,7 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ for (PendingCleanup pending : IntrusiveStack.iterable(list)) { if (failed == 
null || !failed.contains(pending)) - MessagingService.instance().respond(noPayload, pending.message); + ctx.messaging().respond(noPayload, pending.message); } } catch (Throwable t) @@ -144,26 +200,10 @@ public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements Requ for (PendingCleanup pending : IntrusiveStack.iterable(list)) { if (failed == null || !failed.contains(pending)) - MessagingService.instance().respondWithFailure(UNKNOWN, pending.message); + ctx.messaging().respondWithFailure(UNKNOWN, pending.message); } } Throwables.maybeFail(fail); - }; - - final Message<PaxosCleanupHistory> message; - PendingCleanup(Message<PaxosCleanupHistory> message) - { - this.message = message; - } - - public static void add(Message<PaxosCleanupHistory> message) - { - PendingCleanup next = new PendingCleanup(message); - PendingCleanup prev = IntrusiveStack.push(AtomicReference::get, AtomicReference::compareAndSet, pendingCleanup, next); - if (prev == null) - Stage.MISC.execute(CLEANUP); } } - - public static final IVerbHandler<PaxosCleanupHistory> verbHandler = PendingCleanup::add; } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java index 9f30692ad4..12a319bcf4 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java @@ -36,10 +36,12 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.*; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.repair.SharedContext; import 
org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosRepairHistory; @@ -75,14 +77,14 @@ public class PaxosStartPrepareCleanup extends AsyncFuture<PaxosCleanupHistory> i * prepare message to prevent racing with gossip dissemination and guarantee that every repair participant is aware * of the pending ring change during repair. */ - public static PaxosStartPrepareCleanup prepare(TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges) + public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges) { PaxosStartPrepareCleanup callback = new PaxosStartPrepareCleanup(tableId, endpoints); synchronized (callback) { Message<Request> message = Message.out(PAXOS2_CLEANUP_START_PREPARE_REQ, new Request(tableId, localEpState, ranges)); for (InetAddressAndPort endpoint : endpoints) - MessagingService.instance().sendWithCallback(message, endpoint, callback); + ctx.messaging().sendWithCallback(message, endpoint, callback); } return callback; } @@ -110,24 +112,24 @@ public class PaxosStartPrepareCleanup extends AsyncFuture<PaxosCleanupHistory> i trySuccess(new PaxosCleanupHistory(table, maxBallot, history)); } - private static void maybeUpdateTopology(InetAddressAndPort endpoint, EndpointState remote) + private static void maybeUpdateTopology(SharedContext ctx, InetAddressAndPort endpoint, EndpointState remote) { - EndpointState local = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + EndpointState local = ctx.gossiper().getEndpointStateForEndpoint(endpoint); if (local == null || local.isSupersededBy(remote)) { logger.trace("updating endpoint info for 
{} with {}", endpoint, remote); Map<InetAddressAndPort, EndpointState> states = Collections.singletonMap(endpoint, remote); Gossiper.runInGossipStageBlocking(() -> { - Gossiper.instance.notifyFailureDetector(states); - Gossiper.instance.applyStateLocally(states); + ctx.gossiper().notifyFailureDetector(states); + ctx.gossiper().applyStateLocally(states); }); // TODO: We should also wait for schema pulls/pushes, however this would be quite an involved change to MigrationManager // (which currently drops some migration tasks on the floor). // Note it would be fine for us to fail to complete the migration task and simply treat this response as a failure/timeout. } // even if we have the latest gossip info, wait until pending range calculations are complete - PendingRangeCalculatorService.instance.blockUntilFinished(); + ctx.pendingRangeCalculator().blockUntilFinished(); } public static class Request @@ -181,12 +183,17 @@ public class PaxosStartPrepareCleanup extends AsyncFuture<PaxosCleanupHistory> i } } - public static final IVerbHandler<Request> verbHandler = in -> { - ColumnFamilyStore table = Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId); - maybeUpdateTopology(in.from(), in.payload.epState); - Ballot highBound = newBallot(ballotTracker().getHighBound(), ConsistencyLevel.SERIAL); - PaxosRepairHistory history = table.getPaxosRepairHistoryForRanges(in.payload.ranges); - Message<PaxosCleanupHistory> out = in.responseWith(new PaxosCleanupHistory(table.metadata.id, highBound, history)); - MessagingService.instance().send(out, in.respondTo()); - }; + public static IVerbHandler<Request> createVerbHandler(SharedContext ctx) + { + return in -> { + ColumnFamilyStore table = Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId); + maybeUpdateTopology(ctx, in.from(), in.payload.epState); + Ballot highBound = newBallot(ballotTracker().getHighBound(), ConsistencyLevel.SERIAL); + PaxosRepairHistory history = 
table.getPaxosRepairHistoryForRanges(in.payload.ranges); + Message<PaxosCleanupHistory> out = in.responseWith(new PaxosCleanupHistory(table.metadata.id, highBound, history)); + ctx.messaging().send(out, in.respondTo()); + }; + } + + public static final IVerbHandler<Request> verbHandler = createVerbHandler(SharedContext.Global.instance); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java index 6da4e0bce1..cf7f547cb4 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java @@ -21,7 +21,6 @@ package org.apache.cassandra.service.paxos.cleanup; import java.util.ArrayDeque; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -34,7 +33,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.AbstractPaxosRepair; import org.apache.cassandra.service.paxos.Ballot; @@ -42,7 +40,6 @@ import org.apache.cassandra.service.paxos.PaxosRepair; import org.apache.cassandra.utils.NoSpamLogger; import static org.apache.cassandra.service.paxos.Commit.isAfter; -import static org.apache.cassandra.utils.Clock.Global.nanoTime; /** * Coordinates repairs on a given key to prevent multiple repairs being scheduled for a single key @@ -211,25 +208,4 @@ public class PaxosTableRepairs implements AbstractPaxosRepair.Listener { return PaxosRepair.create(consistency, key, incompleteBallot, table); } - - private static final ConcurrentMap<TableId, PaxosTableRepairs> tableRepairsMap = new 
ConcurrentHashMap<>(); - - static PaxosTableRepairs getForTable(TableId tableId) - { - return tableRepairsMap.computeIfAbsent(tableId, k -> new PaxosTableRepairs()); - } - - public static void evictHungRepairs() - { - long deadline = nanoTime() - TimeUnit.MINUTES.toNanos(5); - for (PaxosTableRepairs repairs : tableRepairsMap.values()) - repairs.evictHungRepairs(deadline); - } - - public static void clearRepairs() - { - for (PaxosTableRepairs repairs : tableRepairsMap.values()) - repairs.clear(); - } - } diff --git a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java index adbd537930..062de3cf59 100644 --- a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java +++ b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java @@ -46,7 +46,7 @@ import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.utils.CloseableIterator; import static org.apache.cassandra.config.CassandraRelevantProperties.AUTO_REPAIR_FREQUENCY_SECONDS; @@ -322,7 +322,7 @@ public class PaxosUncommittedTracker { runAndLogException("file consolidation", this::consolidateFiles); runAndLogException("schedule auto repairs", this::schedulePaxosAutoRepairs); - runAndLogException("evict hung repairs", PaxosTableRepairs::evictHungRepairs); + runAndLogException("evict hung repairs", PaxosRepairState.instance()::evictHungRepairs); } public synchronized void startAutoRepairs() diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java index dd1aa07e20..86b8a5199d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java @@ -62,6 +62,7 @@ import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; @@ -340,7 +341,7 @@ public class PaxosRepairTest extends TestBaseImpl List<InetAddressAndPort> endpoints = cluster.stream().map(IInstance::broadcastAddress).map(InetAddressAndPort::getByAddress).collect(Collectors.toList()); Future<?> cleanup = cluster.get(1).appliesOnInstance((List<? extends InetSocketAddress> es, ExecutorService exec)-> { TableMetadata metadata = Keyspace.open(KEYSPACE).getMetadata().getTableOrViewNullable(TABLE); - return PaxosCleanup.cleanup(es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec); + return PaxosCleanup.cleanup(SharedContext.Global.instance, es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec); }).apply(endpoints, executor); Uninterruptibles.awaitUninterruptibly(haveFetchedLowBound); @@ -404,7 +405,7 @@ public class PaxosRepairTest extends TestBaseImpl List<InetAddressAndPort> endpoints = cluster.stream().map(i -> InetAddressAndPort.getByAddress(i.broadcastAddress())).collect(Collectors.toList()); Future<?> cleanup = cluster.get(1).appliesOnInstance((List<? 
extends InetSocketAddress> es, ExecutorService exec)-> { TableMetadata metadata = Keyspace.open(KEYSPACE).getMetadata().getTableOrViewNullable(TABLE); - return PaxosCleanup.cleanup(es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec); + return PaxosCleanup.cleanup(SharedContext.Global.instance, es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec); }).apply(endpoints, executor); IMessageFilters.Filter dropAllTo1 = cluster.verbs(PAXOS2_PREPARE_REQ, PAXOS2_PROPOSE_REQ, PAXOS_COMMIT_REQ).from(2).to(1).outbound().drop(); @@ -483,7 +484,7 @@ public class PaxosRepairTest extends TestBaseImpl List<InetAddressAndPort> endpoints = cluster.stream().map(i -> InetAddressAndPort.getByAddress(i.broadcastAddress())).collect(Collectors.toList()); Future<?> cleanup = cluster.get(1).appliesOnInstance((List<? extends InetSocketAddress> es, ExecutorService exec)-> { TableMetadata metadata = Keyspace.open(KEYSPACE).getMetadata().getTableOrViewNullable(TABLE); - return PaxosCleanup.cleanup(es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec); + return PaxosCleanup.cleanup(SharedContext.Global.instance, es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec); }).apply(endpoints, executor); cleanup.get(); diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java index c819cc465f..81b341bed9 100644 --- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java +++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java @@ -50,7 +50,6 @@ import javax.annotation.Nullable; import com.google.common.collect.Iterables; import 
com.google.common.collect.Maps; -import org.apache.cassandra.config.UnitConfigOverride; import org.junit.Before; import org.junit.BeforeClass; @@ -58,7 +57,6 @@ import accord.utils.DefaultRandom; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.RandomSource; -import org.agrona.collections.Long2ObjectHashMap; import org.agrona.collections.LongHashSet; import org.apache.cassandra.concurrent.ExecutorBuilder; import org.apache.cassandra.concurrent.ExecutorBuilderFactory; @@ -71,6 +69,7 @@ import org.apache.cassandra.concurrent.SequentialExecutorPlus; import org.apache.cassandra.concurrent.SimulatedExecutorFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.UnitConfigOverride; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Digest; @@ -119,7 +118,15 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupHistory; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; +import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup; +import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamReceiveException; import org.apache.cassandra.streaming.StreamSession; @@ -339,6 +346,12 @@ public abstract class 
FuzzTestBase extends CQLTester.InMemory // these messages are not resilient to ephemeral issues case STATUS_REQ: case STATUS_RSP: + // paxos repair does not support faults and will cause a TIMEOUT error, failing the repair + case PAXOS2_CLEANUP_COMPLETE_REQ: + case PAXOS2_CLEANUP_REQ: + case PAXOS2_CLEANUP_RSP2: + case PAXOS2_CLEANUP_START_PREPARE_REQ: + case PAXOS2_CLEANUP_FINISH_PREPARE_REQ: noFaults.add(message.id()); return Faults.NONE; default: @@ -654,6 +667,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory // We run tests in an isolated JVM per class, so not cleaning up is safe... but if that assumption ever changes, will need to cleanup Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor); + Stage.MISC.unsafeSetExecutor(orderedExecutor); Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled); Mockito.when(failureDetector.isAlive(Mockito.any())).thenReturn(true); Thread expectedThread = Thread.currentThread(); @@ -776,10 +790,46 @@ public abstract class FuzzTestBase extends CQLTester.InMemory } } + private static class CallbackKey + { + private final long id; + private final InetAddressAndPort peer; + + private CallbackKey(long id, InetAddressAndPort peer) + { + this.id = id; + this.peer = peer; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CallbackKey that = (CallbackKey) o; + return id == that.id && peer.equals(that.peer); + } + + @Override + public int hashCode() + { + return Objects.hash(id, peer); + } + + @Override + public String toString() + { + return "CallbackKey{" + + "id=" + id + + ", peer=" + peer + + '}'; + } + } + private class Messaging implements MessageDelivery { final InetAddressAndPort broadcastAddressAndPort; - final Long2ObjectHashMap<CallbackContext> callbacks = new Long2ObjectHashMap<>(); + final Map<CallbackKey, CallbackContext> callbacks = new HashMap<>(); private Messaging(InetAddressAndPort 
broadcastAddressAndPort) { @@ -812,10 +862,11 @@ public abstract class FuzzTestBase extends CQLTester.InMemory CallbackContext cb; if (callback != null) { - if (callbacks.containsKey(message.id())) - throw new AssertionError("Message id " + message.id() + " already has a callback"); + CallbackKey key = new CallbackKey(message.id(), to); + if (callbacks.containsKey(key)) + throw new AssertionError("Message id " + message.id() + " to " + to + " already has a callback"); cb = new CallbackContext(callback); - callbacks.put(message.id(), cb); + callbacks.put(key, cb); } else { @@ -861,7 +912,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory if (cb != null) { unorderedScheduled.schedule(() -> { - CallbackContext ctx = callbacks.remove(message.id()); + CallbackContext ctx = callbacks.remove(new CallbackKey(message.id(), to)); if (ctx != null) { assert ctx == cb; @@ -952,6 +1003,21 @@ public abstract class FuzzTestBase extends CQLTester.InMemory { return endpoints.get(ep); } + + @Override + public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap) + { + + } + + @Override + public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap) + { + // If we were testing paxos this would be wrong... + // CASSANDRA-18917 added support for simulating Gossip, but gossip issues were found so couldn't merge that patch... 
+ // For the paxos repair, since we don't care about paxos messages, this is ok to no-op for now, but if paxos cleanup + // ever was to be tested this logic would need to be implemented + } } class Node implements SharedContext @@ -965,6 +1031,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory final Messaging messaging; final IValidationManager validationManager; private FailingBiConsumer<ColumnFamilyStore, Validator> doValidation = DEFAULT_VALIDATION; + final PaxosRepairState paxosRepairState; private final StreamExecutor defaultStreamExecutor = plan -> { long delayNanos = rs.nextLong(TimeUnit.SECONDS.toNanos(5), TimeUnit.MINUTES.toNanos(10)); unorderedScheduled.schedule(() -> { @@ -983,6 +1050,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory this.tokens = tokens; this.messaging = messaging; this.activeRepairService = new ActiveRepairService(this); + this.paxosRepairState = new PaxosRepairState(this); this.validationManager = (cfs, validator) -> unorderedScheduled.submit(() -> { try { @@ -993,7 +1061,39 @@ public abstract class FuzzTestBase extends CQLTester.InMemory validator.fail(e); } }); - this.verbHandler = new RepairMessageVerbHandler(this); + this.verbHandler = new IVerbHandler<>() + { + private final RepairMessageVerbHandler repairVerbHandler = new RepairMessageVerbHandler(Node.this); + private final IVerbHandler<PaxosStartPrepareCleanup.Request> paxosStartPrepareCleanup = PaxosStartPrepareCleanup.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupRequest> paxosCleanupRequestIVerbHandler = PaxosCleanupRequest.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupHistory> paxosFinishPrepareCleanup = PaxosFinishPrepareCleanup.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupResponse> paxosCleanupResponse = PaxosCleanupResponse.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupComplete.Request> paxosCleanupComplete = 
PaxosCleanupComplete.createVerbHandler(Node.this); + @Override + public void doVerb(Message message) throws IOException + { + switch (message.verb()) + { + case PAXOS2_CLEANUP_START_PREPARE_REQ: + paxosStartPrepareCleanup.doVerb(message); + break; + case PAXOS2_CLEANUP_REQ: + paxosCleanupRequestIVerbHandler.doVerb(message); + break; + case PAXOS2_CLEANUP_FINISH_PREPARE_REQ: + paxosFinishPrepareCleanup.doVerb(message); + break; + case PAXOS2_CLEANUP_RSP2: + paxosCleanupResponse.doVerb(message); + break; + case PAXOS2_CLEANUP_COMPLETE_REQ: + paxosCleanupComplete.doVerb(message); + break; + default: + repairVerbHandler.doVerb(message); + } + } + }; activeRepairService.start(); } @@ -1036,10 +1136,12 @@ public abstract class FuzzTestBase extends CQLTester.InMemory if (msg.verb().isResponse()) { // handle callbacks - if (messaging.callbacks.containsKey(msg.id())) + CallbackKey key = new CallbackKey(msg.id(), msg.from()); + if (messaging.callbacks.containsKey(key)) { - CallbackContext callback = messaging.callbacks.remove(msg.id()); - if (callback == null) return; + CallbackContext callback = messaging.callbacks.remove(key); + if (callback == null) + return; try { if (msg.isFailureResponse()) @@ -1114,6 +1216,18 @@ public abstract class FuzzTestBase extends CQLTester.InMemory return unorderedScheduled; } + @Override + public ScheduledExecutorPlus nonPeriodicTasks() + { + return unorderedScheduled; + } + + @Override + public ScheduledExecutorPlus scheduledTasks() + { + return unorderedScheduled; + } + @Override public Supplier<Random> random() { @@ -1198,6 +1312,18 @@ public abstract class FuzzTestBase extends CQLTester.InMemory { return streamExecutor; } + + @Override + public PendingRangeCalculatorService pendingRangeCalculator() + { + return PendingRangeCalculatorService.instance; + } + + @Override + public PaxosRepairState paxosRepairState() + { + return paxosRepairState; + } } private Message serde(Message msg) @@ -1285,6 +1411,10 @@ public abstract class 
FuzzTestBase extends CQLTester.InMemory next = it.next(); } if (FuzzTestBase.class.getName().equals(next.getClassName())) return Access.MAIN_THREAD_ONLY; + // this is non-deterministic... but since the scope of the work is testing repair and not paxos... this is unblocked for now... + if (("org.apache.cassandra.service.paxos.Paxos".equals(next.getClassName()) && "newBallot".equals(next.getMethodName())) + || ("org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker".equals(next.getClassName()) && "updateLowBound".equals(next.getMethodName()))) + return Access.MAIN_THREAD_ONLY; if (next.getClassName().startsWith("org.apache.cassandra.db.") || next.getClassName().startsWith("org.apache.cassandra.gms.") || next.getClassName().startsWith("org.apache.cassandra.cql3.") || next.getClassName().startsWith("org.apache.cassandra.metrics.") || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.") || next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") // this would be good to solve || next.getClassName().startsWith(PendingAntiCompaction.class.getName())) diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index 165df1bbc9..872ee99abb 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -40,6 +40,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.cassandra.repair.messages.SyncResponse; import org.apache.cassandra.repair.messages.ValidationResponse; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; @@ -67,7 +68,6 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.paxos.Paxos; import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest; import 
org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse; -import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupSession; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -875,7 +875,7 @@ public class RepairJobTest if (message.verb() == PAXOS2_CLEANUP_REQ) { PaxosCleanupRequest request = (PaxosCleanupRequest) message.payload; - PaxosCleanupSession.finishSession(to, new PaxosCleanupResponse(request.session, true, null)); + PaxosRepairState.instance().finishSession(to, new PaxosCleanupResponse(request.session, true, null)); return false; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org