This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c9625e0102dab66f41d3ef2338c54d499e73a8c5 Merge: c2a78639de ca0b77d743 Author: David Capwell <dcapw...@apache.org> AuthorDate: Tue Mar 26 13:04:03 2024 -0700 Merge branch 'cassandra-5.0' into trunk src/java/org/apache/cassandra/gms/Gossiper.java | 2 + src/java/org/apache/cassandra/gms/IGossiper.java | 3 + .../org/apache/cassandra/net/MessageDelivery.java | 4 + .../org/apache/cassandra/net/MessagingService.java | 5 - .../org/apache/cassandra/repair/RepairJob.java | 2 +- .../org/apache/cassandra/repair/SharedContext.java | 40 +++++ .../cassandra/service/ActiveRepairService.java | 4 +- .../apache/cassandra/service/StorageService.java | 7 +- .../org/apache/cassandra/service/paxos/Paxos.java | 12 +- .../cassandra/service/paxos/PaxosRepair.java | 9 +- .../service/paxos/cleanup/PaxosCleanup.java | 34 ++--- .../paxos/cleanup/PaxosCleanupComplete.java | 28 ++-- .../cleanup/PaxosCleanupLocalCoordinator.java | 22 +-- .../service/paxos/cleanup/PaxosCleanupRequest.java | 70 ++++----- .../paxos/cleanup/PaxosCleanupResponse.java | 8 +- .../service/paxos/cleanup/PaxosCleanupSession.java | 54 +++---- .../paxos/cleanup/PaxosFinishPrepareCleanup.java | 112 ++------------ ...shPrepareCleanup.java => PaxosRepairState.java} | 138 +++++++++++------- .../paxos/cleanup/PaxosStartPrepareCleanup.java | 38 +++-- .../service/paxos/cleanup/PaxosTableRepairs.java | 24 --- .../paxos/uncommitted/PaxosUncommittedTracker.java | 4 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 7 +- .../distributed/test/PaxosRepairTest.java | 7 +- .../org/apache/cassandra/repair/FuzzTestBase.java | 162 ++++++++++++++++++--- .../org/apache/cassandra/repair/RepairJobTest.java | 4 +- 25 files changed, 459 insertions(+), 341 deletions(-) diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 21d4ab5b8f,d907f76686..35cf57d3e1 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -1058,6 -1317,18 +1058,7 @@@ public class Gossiper implements IFailu return reqdEndpointState; } - /** - * determine which endpoint started up earlier - */ - public int compareEndpointStartup(InetAddressAndPort addr1, InetAddressAndPort addr2) - { - EndpointState ep1 = getEndpointStateForEndpoint(addr1); - EndpointState ep2 = getEndpointStateForEndpoint(addr2); - assert ep1 != null && ep2 != null; - return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration(); - } - + @Override public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap) { for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet()) diff --cc src/java/org/apache/cassandra/net/MessagingService.java index ceae703097,94586b41c8..d1e2f7b260 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -472,41 -449,8 +472,36 @@@ public class MessagingService extends M send(message.responseWith(response), message.respondTo()); } + public <RSP> Future<RSP> sendWithResponse(InetAddressAndPort to, Message<?> msg) + { + Promise<RSP> future = AsyncPromise.uncancellable(); + MessagingService.instance().sendWithCallback(msg, to, + new RequestCallback<RSP>() + { + @Override + public void onResponse(Message<RSP> msg) + { + future.setSuccess(msg.payload); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + future.setFailure(new RuntimeException(failureReason.toString())); + } + }); + + return future; + } + - public <V> void 
respondWithFailure(RequestFailureReason reason, Message<?> message) - { - send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo()); - } - public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection) { + if (isShuttingDown) + { + logger.error("Cannot send the message {} to {}, as messaging service is shutting down", message, to); + return; + } + if (logger.isTraceEnabled()) { logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb(), message.id(), to); diff --cc src/java/org/apache/cassandra/repair/SharedContext.java index 8ccc88f584,440da2cf45..6c13ae99c9 --- a/src/java/org/apache/cassandra/repair/SharedContext.java +++ b/src/java/org/apache/cassandra/repair/SharedContext.java @@@ -37,6 -37,8 +37,7 @@@ import org.apache.cassandra.locator.Ine import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.service.PendingRangeCalculatorService; + import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; @@@ -77,6 -81,8 +80,7 @@@ public interface SharedContex IValidationManager validationManager(); TableRepairManager repairManager(ColumnFamilyStore store); StreamExecutor streamExecutor(); - PendingRangeCalculatorService pendingRangeCalculator(); + PaxosRepairState paxosRepairState(); class Global implements SharedContext { @@@ -171,6 -189,18 +187,12 @@@ { return StreamPlan::execute; } + - @Override - public PendingRangeCalculatorService pendingRangeCalculator() - { - return PendingRangeCalculatorService.instance; - } - + @Override + public PaxosRepairState paxosRepairState() + { + return PaxosRepairState.instance(); + } } class ForwardingSharedContext implements SharedContext @@@ -276,5 -318,17 +310,11 @@@ { return delegate().streamExecutor(); } + - @Override - public PendingRangeCalculatorService pendingRangeCalculator() - { - return delegate().pendingRangeCalculator(); - } - + @Override + public PaxosRepairState paxosRepairState() + { + return delegate().paxosRepairState(); + } } } diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 6b2e814185,e120122c08..a7252eca2b --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -1157,15 -1128,16 +1157,15 @@@ public class ActiveRepairService implem "Skipping this check can lead to paxos correctness issues", range, ksName, reason, downEndpoints, SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE.getKey(), SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES.getKey())); } - EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(range.right, ksName); - if (pending.size() > 1 && !PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getBoolean()) + // todo: can probably be removed with TrM + if (ClusterMetadata.current().hasPendingRangesFor(keyspace.getMetadata(), range.right) && PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getBoolean()) { - throw new RuntimeException(String.format("Cannot begin paxos auto repair for %s in %s.%s, multiple pending endpoints exist for range (%s). " + + throw new RuntimeException(String.format("Cannot begin paxos auto repair for %s in %s.%s, multiple pending endpoints exist for range (metadata = %s). 
" + "Set -D%s=true to skip this check", - range, table.keyspace, table.name, pending, PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey())); + range, table.keyspace, table.name, ClusterMetadata.current(), PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey())); } - futures.add(() -> PaxosCleanup.cleanup(endpoints, table, Collections.singleton(range), false, repairCommandExecutor())); - Future<Void> future = PaxosCleanup.cleanup(ctx, endpoints, table, Collections.singleton(range), false, repairCommandExecutor()); - futures.add(future); ++ futures.add(() -> PaxosCleanup.cleanup(ctx, endpoints, table, Collections.singleton(range), false, repairCommandExecutor())); } } diff --cc src/java/org/apache/cassandra/service/StorageService.java index 36dabcd807,cbdadee1c3..928be162a0 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -154,8 -175,12 +154,9 @@@ import org.apache.cassandra.locator.Sys import org.apache.cassandra.metrics.Sampler; import org.apache.cassandra.metrics.SamplingManager; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.net.AsyncOneResponse; -import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairCoordinator; + import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; @@@ -3332,8 -4863,8 +3333,8 @@@ public class StorageService extends Not if (table == null) return ImmediateFuture.success(null); - List<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace); + Collection<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace); - PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(tableId, ranges); + PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(SharedContext.Global.instance, tableId, ranges); ScheduledExecutors.optionalTasks.submit(coordinator::start); return coordinator; } diff --cc src/java/org/apache/cassandra/service/paxos/Paxos.java index 99570ba917,473b5741ad..f3938f1ccb --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@@ -87,12 -86,9 +88,12 @@@ import org.apache.cassandra.service.CAS import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.FailureRecordingCallback.AsMap; import org.apache.cassandra.service.paxos.Commit.Proposal; + import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.service.reads.DataResolver; import org.apache.cassandra.service.reads.repair.NoopReadRepair; - import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.CassandraVersion; @@@ -394,43 -385,21 +395,48 @@@ public class Paxo return electorateNatural; } - static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) + @Override + public boolean stillAppliesTo(ClusterMetadata newMetadata) { - return get(table, token, consistencyForConsensus, FailureDetector.isReplicaAlive); + if 
(newMetadata.epoch.equals(epoch)) + return true; + + Participants newParticipants = recompute.apply(newMetadata); + return newParticipants.electorate.equals(electorate); + } + + @Override + public void collectSuccess(InetAddressAndPort inetAddressAndPort) + { + + } + + @Override + public void collectFailure(InetAddressAndPort inetAddressAndPort, RequestFailureReason t) + { + } - static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive) + static Participants get(ClusterMetadata metadata, TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) + { - Keyspace keyspace = Keyspace.open(table.keyspace); - ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token); ++ return get(metadata, table, token, consistencyForConsensus, FailureDetector.isReplicaAlive); ++ } ++ ++ static Participants get(ClusterMetadata metadata, TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive) + { + KeyspaceMetadata keyspaceMetadata = metadata.schema.getKeyspaceMetadata(table.keyspace); + ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspaceMetadata, token); ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal() ? all.filter(InOurDc.replicas()) : all; - EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive); + EndpointsForToken live = all.all().filter(isReplicaAlive); + return new Participants(metadata.epoch, Keyspace.open(table.keyspace), consistencyForConsensus, all, electorate, live, + (cm) -> get(cm, table, token, consistencyForConsensus)); + } - return new Participants(keyspace, consistencyForConsensus, all, electorate, live); + static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) + { + return get(ClusterMetadata.current(), table, token, consistencyForConsensus); } static Participants get(TableMetadata cfm, DecoratedKey key, ConsistencyLevel consistency) diff --cc src/java/org/apache/cassandra/service/paxos/PaxosRepair.java index 368c16211c,ae5bc557c7..fd220f83b7 --- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java @@@ -667,16 -666,15 +668,16 @@@ public class PaxosRepair extends Abstra return result; } - static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range) + static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Range<Token> range) { - Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint())); - return Iterables.all(participants.all, r -> validatePeerCompatibility(ctx.gossiper(), r)); + ClusterMetadata metadata = ClusterMetadata.current(); - Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL); ++ Participants participants = Participants.get(metadata, table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint())); + return Iterables.all(participants.all, (participant) -> validatePeerCompatibility(metadata, participant)); } - public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges) + public static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Collection<Range<Token>> ranges) { - return Iterables.all(ranges, range -> validatePeerCompatibility(table, range)); + return 
Iterables.all(ranges, range -> validatePeerCompatibility(ctx, table, range)); } public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException diff --cc src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java index c71577d389,7b4163e2a2..feaa64bd1e --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java @@@ -24,7 -25,7 +24,6 @@@ import java.util.function.Consumer import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -- import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -33,8 -33,9 +31,8 @@@ import org.apache.cassandra.db.Keyspace import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.EndpointState; - import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.RangesAtEndpoint; + import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@@ -114,10 -116,21 +113,11 @@@ public class PaxosCleanup extends Async executor.execute(complete); } - private static boolean isOutOfRange(String ksName, Collection<Range<Token>> repairRanges) + private static boolean isOutOfRange(SharedContext ctx, String ksName, Collection<Range<Token>> repairRanges) { Keyspace keyspace = Keyspace.open(ksName); - Collection<Range<Token>> localRanges = Range.normalize(ClusterMetadata.current().localWriteRanges(keyspace.getMetadata())); - List<Range<Token>> localRanges = Range.normalize(keyspace.getReplicationStrategy() - .getAddressReplicas() - .get(ctx.broadcastAddressAndPort()) - .ranges()); - - RangesAtEndpoint pendingRanges = StorageService.instance.getTokenMetadata().getPendingRanges(ksName, ctx.broadcastAddressAndPort()); - if (!pendingRanges.isEmpty()) - { - localRanges.addAll(pendingRanges.ranges()); - localRanges = Range.normalize(localRanges); - } ++ Collection<Range<Token>> localRanges = Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(), ctx.broadcastAddressAndPort())); + for (Range<Token> repairRange : Range.normalize(repairRanges)) { if (!Iterables.any(localRanges, localRange -> localRange.contains(repairRange))) diff --cc src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java index 33d6fbd173,2dbbc58d69..2eaeaf2537 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java @@@ -38,12 -38,10 +38,12 @@@ import org.apache.cassandra.io.util.Dat import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; + import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.utils.UUIDSerializer; - import static org.apache.cassandra.net.MessagingService.instance; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_RSP2; @@@ -70,43 -68,45 +70,47 @@@ public class PaxosCleanupReques this.ranges = rangesOrMin(ranges); } - public static final IVerbHandler<PaxosCleanupRequest> verbHandler = in -> { - 
PaxosCleanupRequest request = in.payload; - - if (!PaxosCleanup.isInRangeAndShouldProcess(request.ranges, request.tableId)) - { - // Try catching up, in case it's us - ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(ClusterMetadata.current(), in.from(),in.epoch()); - - String msg = String.format("Rejecting cleanup request %s from %s. Some ranges are not replicated (%s)", - request.session, in.from(), request.ranges); - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg)); - instance().send(response, in.respondTo()); - return; - } - - PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(request); - - coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>() - { - public void onSuccess(@Nullable PaxosCleanupResponse finished) - { - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow()); - instance().send(response, in.respondTo()); - } + public static IVerbHandler<PaxosCleanupRequest> createVerbHandler(SharedContext ctx) + { + return in -> { + PaxosCleanupRequest request = in.payload; - public void onFailure(Throwable throwable) + if (!PaxosCleanup.isInRangeAndShouldProcess(ctx, request.ranges, request.tableId)) { - Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage())); - instance().send(response, in.respondTo()); ++ // Try catching up, in case it's us ++ ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(ClusterMetadata.current(), in.from(),in.epoch()); ++ + String msg = String.format("Rejecting cleanup request %s from %s. Some ranges are not replicated (%s)", + request.session, in.from(), request.ranges); + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg)); + ctx.messaging().send(response, in.respondTo()); + return; } - }); - // ack the request so the coordinator knows we've started - instance().respond(noPayload, in); + PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(ctx, request); - coordinator.start(); - }; - coordinator.addCallback(new FutureCallback<>() ++ coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>() + { + public void onSuccess(@Nullable PaxosCleanupResponse finished) + { + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow()); + ctx.messaging().send(response, in.respondTo()); + } + + public void onFailure(Throwable throwable) + { + Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage())); + ctx.messaging().send(response, in.respondTo()); + } + }); + + // ack the request so the coordinator knows we've started + ctx.messaging().respond(noPayload, in); + + coordinator.start(); + }; + } - + public static final IVerbHandler<PaxosCleanupRequest> verbHandler = createVerbHandler(SharedContext.Global.instance); public static final IVersionedSerializer<PaxosCleanupRequest> serializer = new IVersionedSerializer<PaxosCleanupRequest>() { diff --cc src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 3224b29466,0000000000..cc1636bf14 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@@ -1,1007 -1,0 +1,1012 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.CMSIdentifierMismatchException; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.extensions.ExtensionKey; +import org.apache.cassandra.tcm.extensions.ExtensionValue; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator; +import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.TokenMap; +import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.LockedRanges; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.vint.VIntCoding; + +import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR; +import static org.apache.cassandra.db.TypeSizes.sizeof; + +public class ClusterMetadata +{ + public static final int EMPTY_METADATA_IDENTIFIER = 0; + public static final Serializer serializer = new Serializer(); + + public 
final int metadataIdentifier; + + public final Epoch epoch; + public final long period; + public final boolean lastInPeriod; + public final IPartitioner partitioner; // Set during (initial) construction and not modifiable via Transformer + + public final DistributedSchema schema; + public final Directory directory; + public final TokenMap tokenMap; + public final DataPlacements placements; + public final LockedRanges lockedRanges; + public final InProgressSequences inProgressSequences; + public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions; + + // These two fields are lazy but only for the test purposes, since their computation requires initialization of the log ks + private Set<Replica> fullCMSReplicas; + private Set<InetAddressAndPort> fullCMSEndpoints; + + public ClusterMetadata(IPartitioner partitioner) + { + this(partitioner, Directory.EMPTY); + } + + @VisibleForTesting + public ClusterMetadata(IPartitioner partitioner, Directory directory) + { + this(partitioner, directory, DistributedSchema.first()); + } + + @VisibleForTesting + public ClusterMetadata(IPartitioner partitioner, Directory directory, DistributedSchema schema) + { + this(EMPTY_METADATA_IDENTIFIER, + Epoch.EMPTY, + Period.EMPTY, + true, + partitioner, + schema, + directory, + new TokenMap(partitioner), + DataPlacements.EMPTY, + LockedRanges.EMPTY, + InProgressSequences.EMPTY, + ImmutableMap.of()); + } + + public ClusterMetadata(Epoch epoch, + long period, + boolean lastInPeriod, + IPartitioner partitioner, + DistributedSchema schema, + Directory directory, + TokenMap tokenMap, + DataPlacements placements, + LockedRanges lockedRanges, + InProgressSequences inProgressSequences, + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) + { + this(EMPTY_METADATA_IDENTIFIER, + epoch, + period, + lastInPeriod, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions); + } + + private ClusterMetadata(int metadataIdentifier, + Epoch epoch, + long period, + boolean lastInPeriod, + IPartitioner partitioner, + DistributedSchema schema, + Directory directory, + TokenMap tokenMap, + DataPlacements placements, + LockedRanges lockedRanges, + InProgressSequences inProgressSequences, + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) + { + // TODO: token map is a feature of the specific placement strategy, and so may not be a relevant component of + // ClusterMetadata in the long term. We need to consider how the actual components of metadata can be evolved + // over time. 
+ assert tokenMap == null || tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner for TokenMap doesn't match base partitioner"; + this.metadataIdentifier = metadataIdentifier; + this.epoch = epoch; + this.period = period; + this.lastInPeriod = lastInPeriod; + this.partitioner = partitioner; + this.schema = schema; + this.directory = directory; + this.tokenMap = tokenMap; + this.placements = placements; + this.lockedRanges = lockedRanges; + this.inProgressSequences = inProgressSequences; + this.extensions = ImmutableMap.copyOf(extensions); + } + + public Set<InetAddressAndPort> fullCMSMembers() + { + if (fullCMSEndpoints == null) + this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet()); + return fullCMSEndpoints; + } + + public Set<Replica> fullCMSMembersAsReplicas() + { + if (fullCMSReplicas == null) + this.fullCMSReplicas = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().flattenValues()); + return fullCMSReplicas; + } + + public boolean isCMSMember(InetAddressAndPort endpoint) + { + return fullCMSMembers().contains(endpoint); + } + + public Transformer transformer() + { + return new Transformer(this, this.nextEpoch(), false); + } + + public Transformer transformer(boolean sealPeriod) + { + return new Transformer(this, this.nextEpoch(), sealPeriod); + } + + public ClusterMetadata forceEpoch(Epoch epoch) + { + // In certain circumstances, the last modified epoch of the individual + // components may have been updated beyond the epoch we're specifying here. + // An example is the execution of an UnsafeJoin transformation, where the + // sub-steps (Start/Mid/Finish) are executed in series, each updating a + // single ClusterMetadata and its individual components. At the end of that + // sequence, the CM epoch is then set forcibly to ensure the UnsafeJoin only + // increments the published epoch by one. As each component has its own last + // modified epoch, we may also need to coerce those, but only if they are + // greater than the epoch we're forcing here. 
+ return new ClusterMetadata(metadataIdentifier, + epoch, + period, + lastInPeriod, + partitioner, + capLastModified(schema, epoch), + capLastModified(directory, epoch), + capLastModified(tokenMap, epoch), + capLastModified(placements, epoch), + capLastModified(lockedRanges, epoch), + capLastModified(inProgressSequences, epoch), + capLastModified(extensions, epoch)); + } + + public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier) + { + if (this.metadataIdentifier != EMPTY_METADATA_IDENTIFIER) + throw new IllegalStateException(String.format("Can only initialize cluster identifier once, but it was already set to %d", this.metadataIdentifier)); + + if (clusterIdentifier == EMPTY_METADATA_IDENTIFIER) + throw new IllegalArgumentException("Can not initialize cluster with empty cluster identifier"); + + return new ClusterMetadata(clusterIdentifier, + epoch, + period, + lastInPeriod, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions); + } + + public ClusterMetadata forcePeriod(long period) + { + return new ClusterMetadata(metadataIdentifier, + epoch, + period, + false, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions); + } + + private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch maxEpoch) + { + Map<ExtensionKey<?, ?>, ExtensionValue<?>> updated = new HashMap<>(); + original.forEach((key, value) -> { + ExtensionValue<?> newValue = value == null || value.lastModified().isEqualOrBefore(maxEpoch) + ? value + : (ExtensionValue<?>)value.withLastModified(maxEpoch); + updated.put(key, newValue); + }); + return updated; + } + + @SuppressWarnings("unchecked") + private static <V> V capLastModified(MetadataValue<V> value, Epoch maxEpoch) + { + return value == null || value.lastModified().isEqualOrBefore(maxEpoch) + ? (V)value + : value.withLastModified(maxEpoch); + } + + public Epoch nextEpoch() + { + return epoch.nextEpoch(); + } + + public long nextPeriod() + { + return lastInPeriod ? 
period + 1 : period; + } + + public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm) + { + ClusterMetadata metadata = this; + Iterator<MultiStepOperation<?>> iter = metadata.inProgressSequences.iterator(); + while (iter.hasNext()) + { + Transformation.Result result = iter.next().applyTo(metadata); + assert result.isSuccess(); + metadata = result.success().metadata; + } + return metadata.placements.get(ksm.params.replication); + } + + // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges + public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token) + { + PlacementForRange writes = placements.get(ksm.params.replication).writes; + PlacementForRange reads = placements.get(ksm.params.replication).reads; + return !reads.forToken(token).equals(writes.forToken(token)); + } + + // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges + public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint) + { + PlacementForRange writes = placements.get(ksm.params.replication).writes; + PlacementForRange reads = placements.get(ksm.params.replication).reads; + return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint)); + } + + public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata) + { - return placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges(); ++ return writeRanges(metadata, FBUtilities.getBroadcastAddressAndPort()); ++ } ++ ++ public Collection<Range<Token>> writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer) ++ { ++ return placements.get(metadata.params.replication).writes.byEndpoint().get(peer).ranges(); + } + + // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges + public Map<Range<Token>, VersionedEndpoints.ForRange> pendingRanges(KeyspaceMetadata metadata) + { + Map<Range<Token>, VersionedEndpoints.ForRange> map = new HashMap<>(); + PlacementForRange writes = placements.get(metadata.params.replication).writes; + PlacementForRange reads = placements.get(metadata.params.replication).reads; + + // first, pending ranges as the result of range splitting or merging + // i.e. new ranges being created through join/leave + List<Range<Token>> pending = new ArrayList<>(writes.ranges()); + pending.removeAll(reads.ranges()); + for (Range<Token> p : pending) + map.put(p, placements.get(metadata.params.replication).writes.forRange(p)); + + // next, ranges where the ranges themselves are not changing, but the replicas are + // i.e. 
replacement or RF increase + writes.replicaGroups().forEach((range, endpoints) -> { + VersionedEndpoints.ForRange readGroup = reads.forRange(range); + if (!readGroup.equals(endpoints)) + map.put(range, VersionedEndpoints.forRange(endpoints.lastModified(), + endpoints.get().filter(r -> !readGroup.get().contains(r)))); + }); + + return map; + } + + // TODO Remove this as it isn't really an equivalent to the previous concept of pending endpoints + public VersionedEndpoints.ForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t) + { + VersionedEndpoints.ForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t); + VersionedEndpoints.ForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t); + EndpointsForToken.Builder endpointsForToken = writeEndpoints.get().newBuilder(writeEndpoints.size() - readEndpoints.size()); + + for (Replica writeReplica : writeEndpoints.get()) + { + if (!readEndpoints.get().contains(writeReplica)) + endpointsForToken.add(writeReplica); + } + return VersionedEndpoints.forToken(writeEndpoints.lastModified(), endpointsForToken.build()); + } + + public static class Transformer + { + private final ClusterMetadata base; + private final Epoch epoch; + private final long period; + private final boolean lastInPeriod; + private final IPartitioner partitioner; + private DistributedSchema schema; + private Directory directory; + private TokenMap tokenMap; + private DataPlacements placements; + private LockedRanges lockedRanges; + private InProgressSequences inProgressSequences; + private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions; + private final Set<MetadataKey> modifiedKeys; + + private Transformer(ClusterMetadata metadata, Epoch epoch, boolean lastInPeriod) + { + this.base = metadata; + this.epoch = epoch; + this.period = metadata.nextPeriod(); + this.lastInPeriod = lastInPeriod; + this.partitioner = metadata.partitioner; + this.schema = metadata.schema; + this.directory = metadata.directory; + this.tokenMap = metadata.tokenMap; + this.placements = metadata.placements; + this.lockedRanges = metadata.lockedRanges; + this.inProgressSequences = metadata.inProgressSequences; + extensions = new HashMap<>(metadata.extensions); + modifiedKeys = new HashSet<>(); + } + + public Transformer with(DistributedSchema schema) + { + this.schema = schema; + return this; + } + + public Transformer with(Directory directory) + { + this.directory = directory; + return this; + } + + public Transformer register(NodeAddresses addresses, Location location, NodeVersion version) + { + directory = directory.with(addresses, location, version); + return this; + } + + public Transformer unregister(NodeId nodeId) + { + directory = directory.without(nodeId); + return this; + } + + public Transformer withNewAddresses(NodeId nodeId, NodeAddresses addresses) + { + directory = directory.withNodeAddresses(nodeId, addresses); + return this; + } + + public Transformer withVersion(NodeId nodeId, NodeVersion version) + { + directory = directory.withNodeVersion(nodeId, version); + return this; + } + + public Transformer withNodeState(NodeId id, NodeState state) + { + directory = directory.withNodeState(id, state); + return this; + } + + public Transformer proposeToken(NodeId nodeId, Collection<Token> tokens) + { + tokenMap = tokenMap.assignTokens(nodeId, tokens); + return this; + } + + public Transformer addToRackAndDC(NodeId nodeId) + { + directory = directory.withRackAndDC(nodeId); + return this; + } + + public Transformer 
unproposeTokens(NodeId nodeId) + { + tokenMap = tokenMap.unassignTokens(nodeId); + directory = directory.withoutRackAndDC(nodeId); + return this; + } + + public Transformer moveTokens(NodeId nodeId, Collection<Token> tokens) + { + tokenMap = tokenMap.unassignTokens(nodeId) + .assignTokens(nodeId, tokens); + return this; + } + + public Transformer join(NodeId nodeId) + { + directory = directory.withNodeState(nodeId, NodeState.JOINED); + return this; + } + + public Transformer replaced(NodeId replaced, NodeId replacement) + { + Collection<Token> transferringTokens = tokenMap.tokens(replaced); + tokenMap = tokenMap.unassignTokens(replaced) + .assignTokens(replacement, transferringTokens); + directory = directory.without(replaced) + .withRackAndDC(replacement) + .withNodeState(replacement, NodeState.JOINED); + return this; + } + + public Transformer proposeRemoveNode(NodeId id) + { + tokenMap = tokenMap.unassignTokens(id); + return this; + } + + public Transformer left(NodeId id) + { + tokenMap = tokenMap.unassignTokens(id); + directory = directory.withNodeState(id, NodeState.LEFT) + .withoutRackAndDC(id); + return this; + } + + public Transformer with(DataPlacements placements) + { + this.placements = placements; + return this; + } + + public Transformer with(LockedRanges lockedRanges) + { + this.lockedRanges = lockedRanges; + return this; + } + + public Transformer with(InProgressSequences sequences) + { + this.inProgressSequences = sequences; + return this; + } + + public Transformer with(ExtensionKey<?, ?> key, ExtensionValue<?> obj) + { + if (MetadataKeys.CORE_METADATA.contains(key)) + throw new IllegalArgumentException("Core cluster metadata objects should be addressed directly, " + + "not using the associated MetadataKey"); + + if (!key.valueType.isInstance(obj)) + throw new IllegalArgumentException("Value of type " + obj.getClass() + + " is incompatible with type for key " + key + + " (" + key.valueType + ")"); + + extensions.put(key, obj); + modifiedKeys.add(key); + return this; + } + + public Transformer withIfAbsent(ExtensionKey<?, ?> key, ExtensionValue<?> obj) + { + if (extensions.containsKey(key)) + return this; + return with(key, obj); + } + + public Transformer without(ExtensionKey<?, ?> key) + { + if (MetadataKeys.CORE_METADATA.contains(key)) + throw new IllegalArgumentException("Core cluster metadata objects should be addressed directly, " + + "not using the associated MetadataKey"); + if (extensions.remove(key) != null) + modifiedKeys.add(key); + return this; + } + + public Transformed build() + { + // Process extension first as a) these are actually mutable and b) they are added to the set of + // modified keys when added/updated/removed + for (MetadataKey key : modifiedKeys) + { + ExtensionValue<?> mutable = extensions.get(key); + if (null != mutable) + mutable.withLastModified(epoch); + } + + if (schema != base.schema) + { + modifiedKeys.add(MetadataKeys.SCHEMA); + schema = schema.withLastModified(epoch); + } + + if (directory != base.directory) + { + modifiedKeys.add(MetadataKeys.NODE_DIRECTORY); + directory = directory.withLastModified(epoch); + } + + if (tokenMap != base.tokenMap) + { + modifiedKeys.add(MetadataKeys.TOKEN_MAP); + tokenMap = tokenMap.withLastModified(epoch); + } + + if (placements != base.placements) + { + modifiedKeys.add(MetadataKeys.DATA_PLACEMENTS); + // sort all endpoint lists to preserve primary replica + if (CassandraRelevantProperties.TCM_SORT_REPLICA_GROUPS.getBoolean()) + { + PrimaryRangeComparator comparator = new 
PrimaryRangeComparator(tokenMap, directory); + placements = DataPlacements.sortReplicaGroups(placements, comparator); + } + placements = placements.withLastModified(epoch); + } + + if (lockedRanges != base.lockedRanges) + { + modifiedKeys.add(MetadataKeys.LOCKED_RANGES); + lockedRanges = lockedRanges.withLastModified(epoch); + } + + if (inProgressSequences != base.inProgressSequences) + { + modifiedKeys.add(MetadataKeys.IN_PROGRESS_SEQUENCES); + inProgressSequences = inProgressSequences.withLastModified(epoch); + } + + return new Transformed(new ClusterMetadata(base.metadataIdentifier, + epoch, + period, + lastInPeriod, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions), + ImmutableSet.copyOf(modifiedKeys)); + } + + public ClusterMetadata buildForGossipMode() + { + return new ClusterMetadata(base.metadataIdentifier, + Epoch.UPGRADE_GOSSIP, + Period.EMPTY, + true, + partitioner, + schema, + directory, + tokenMap, + placements, + lockedRanges, + inProgressSequences, + extensions); + } + + @Override + public String toString() + { + return "Transformer{" + + "baseEpoch=" + base.epoch + + ", epoch=" + epoch + + ", lastInPeriod=" + lastInPeriod + + ", partitioner=" + partitioner + + ", schema=" + schema + + ", directory=" + schema + + ", tokenMap=" + tokenMap + + ", placement=" + placements + + ", lockedRanges=" + lockedRanges + + ", inProgressSequences=" + inProgressSequences + + ", extensions=" + extensions + + ", modifiedKeys=" + modifiedKeys + + '}'; + } + + public static class Transformed + { + public final ClusterMetadata metadata; + public final ImmutableSet<MetadataKey> modifiedKeys; + + public Transformed(ClusterMetadata metadata, ImmutableSet<MetadataKey> modifiedKeys) + { + this.metadata = metadata; + this.modifiedKeys = modifiedKeys; + } + } + } + + public String legacyToString() + { + StringBuilder sb = new StringBuilder(); + Set<Pair<Token, InetAddressAndPort>> normal = new HashSet<>(); + Set<Pair<Token, InetAddressAndPort>> bootstrapping = new HashSet<>(); + Set<InetAddressAndPort> leaving = new HashSet<>(); + + for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet()) + { + InetAddressAndPort endpoint = directory.endpoint(entry.getKey()); + switch (entry.getValue()) + { + case BOOTSTRAPPING: + for (Token t : tokenMap.tokens(entry.getKey())) + bootstrapping.add(Pair.create(t, endpoint)); + break; + case LEAVING: + leaving.add(endpoint); + break; + case JOINED: + for (Token t : tokenMap.tokens(entry.getKey())) + normal.add(Pair.create(t, endpoint)); + break; + case MOVING: + // todo when adding MOVE + break; + } + } + + if (!normal.isEmpty()) + { + sb.append("Normal Tokens:"); + sb.append(LINE_SEPARATOR.getString()); + for (Pair<Token, InetAddressAndPort> ep : normal) + { + sb.append(ep.right); + sb.append(':'); + sb.append(ep.left); + sb.append(LINE_SEPARATOR.getString()); + } + } + + if (!bootstrapping.isEmpty()) + { + sb.append("Bootstrapping Tokens:" ); + sb.append(LINE_SEPARATOR.getString()); + for (Pair<Token, InetAddressAndPort> entry : bootstrapping) + { + sb.append(entry.right).append(':').append(entry.left); + sb.append(LINE_SEPARATOR.getString()); + } + } + + if (!leaving.isEmpty()) + { + sb.append("Leaving Endpoints:"); + sb.append(LINE_SEPARATOR.getString()); + for (InetAddressAndPort ep : leaving) + { + sb.append(ep); + sb.append(LINE_SEPARATOR.getString()); + } + } + return sb.toString(); + } + + @Override + public String toString() + { + return "ClusterMetadata{" + + "epoch=" + epoch + + 
", schema=" + schema + + ", directory=" + directory + + ", tokenMap=" + tokenMap + + ", placements=" + placements + + ", lockedRanges=" + lockedRanges + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof ClusterMetadata)) return false; + ClusterMetadata that = (ClusterMetadata) o; + return epoch.equals(that.epoch) && + lastInPeriod == that.lastInPeriod && + schema.equals(that.schema) && + directory.equals(that.directory) && + tokenMap.equals(that.tokenMap) && + placements.equals(that.placements) && + lockedRanges.equals(that.lockedRanges) && + inProgressSequences.equals(that.inProgressSequences) && + extensions.equals(that.extensions); + } + + private static final Logger logger = LoggerFactory.getLogger(ClusterMetadata.class); + + public void dumpDiff(ClusterMetadata other) + { + if (!epoch.equals(other.epoch)) + { + logger.warn("Epoch {} != {}", epoch, other.epoch); + } + if (lastInPeriod != other.lastInPeriod) + { + logger.warn("lastInPeriod {} != {}", lastInPeriod, other.lastInPeriod); + } + if (!schema.equals(other.schema)) + { + Keyspaces.KeyspacesDiff diff = Keyspaces.diff(schema.getKeyspaces(), other.schema.getKeyspaces()); + logger.warn("Schemas differ {}", diff); + } + if (!directory.equals(other.directory)) + { + logger.warn("Directories differ:"); + directory.dumpDiff(other.directory); + } + if (!tokenMap.equals(other.tokenMap)) + { + logger.warn("Token maps differ:"); + tokenMap.dumpDiff(other.tokenMap); + } + if (!placements.equals(other.placements)) + { + logger.warn("Placements differ:"); + placements.dumpDiff(other.placements); + } + if (!lockedRanges.equals(other.lockedRanges)) + { + logger.warn("Locked ranges differ: {} != {}", lockedRanges, other.lockedRanges); + } + if (!inProgressSequences.equals(other.inProgressSequences)) + { + logger.warn("In progress sequences differ: {} != {}", inProgressSequences, other.inProgressSequences); + } + if (!extensions.equals(other.extensions)) + { + logger.warn("Extensions differ: {} != {}", extensions, other.extensions); + } + } + + @Override + public int hashCode() + { + return Objects.hash(epoch, lastInPeriod, schema, directory, tokenMap, placements, lockedRanges, inProgressSequences, extensions); + } + + public static ClusterMetadata current() + { + return ClusterMetadataService.instance().metadata(); + } + + public static void checkIdentifier(int remoteIdentifier) + { + ClusterMetadata metadata = currentNullable(); + if (metadata != null) + { + int currentIdentifier = metadata.metadataIdentifier; + // We haven't yet joined CMS fully + if (currentIdentifier == EMPTY_METADATA_IDENTIFIER) + return; + + // Peer hasn't yet joined CMS fully + if (remoteIdentifier == EMPTY_METADATA_IDENTIFIER) + return; + + if (currentIdentifier != remoteIdentifier) + throw new CMSIdentifierMismatchException(String.format("Cluster Metadata Identifier mismatch. Node is attempting to communicate with a node from a different cluster. Current identifier %d. Remote identifier: %d", currentIdentifier, remoteIdentifier)); + } + } + + /** + * Startup of some services may race with cluster metadata initialization. We allow those services to + * gracefully handle scenarios when it is not yet initialized. 
+ */ + public static ClusterMetadata currentNullable() + { + ClusterMetadataService service = ClusterMetadataService.instance(); + if (service == null) + return null; + return service.metadata(); + } + + public NodeId myNodeId() + { + return directory.peerId(FBUtilities.getBroadcastAddressAndPort()); + } + + public NodeState myNodeState() + { + NodeId nodeId = myNodeId(); + if (myNodeId() != null) + return directory.peerState(nodeId); + return null; + } + + public boolean metadataSerializationUpgradeInProgress() + { + return !directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion()); + } + + public static class Serializer implements MetadataSerializer<ClusterMetadata> + { + @Override + public void serialize(ClusterMetadata metadata, DataOutputPlus out, Version version) throws IOException + { + if (version.isAtLeast(Version.V1)) + out.writeUTF(metadata.partitioner.getClass().getCanonicalName()); + + if (version.isAtLeast(Version.V2)) + out.writeUnsignedVInt32(metadata.metadataIdentifier); + + Epoch.serializer.serialize(metadata.epoch, out); + out.writeUnsignedVInt(metadata.period); + out.writeBoolean(metadata.lastInPeriod); + + if (version.isBefore(Version.V1)) + out.writeUTF(metadata.partitioner.getClass().getCanonicalName()); + + DistributedSchema.serializer.serialize(metadata.schema, out, version); + Directory.serializer.serialize(metadata.directory, out, version); + TokenMap.serializer.serialize(metadata.tokenMap, out, version); + DataPlacements.serializer.serialize(metadata.placements, out, version); + LockedRanges.serializer.serialize(metadata.lockedRanges, out, version); + InProgressSequences.serializer.serialize(metadata.inProgressSequences, out, version); + out.writeInt(metadata.extensions.size()); + for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet()) + { + ExtensionKey<?, ?> key = entry.getKey(); + ExtensionValue<?> value = entry.getValue(); + ExtensionKey.serializer.serialize(key, out, version); + assert key.valueType.isInstance(value); + value.serialize(out, version); + } + } + + @Override + public ClusterMetadata deserialize(DataInputPlus in, Version version) throws IOException + { + IPartitioner partitioner = null; + if (version.isAtLeast(Version.V1)) + partitioner = FBUtilities.newPartitioner(in.readUTF()); + + int clusterIdentifier = EMPTY_METADATA_IDENTIFIER; + if (version.isAtLeast(Version.V2)) + { + clusterIdentifier = in.readUnsignedVInt32(); + checkIdentifier(clusterIdentifier); + } + + Epoch epoch = Epoch.serializer.deserialize(in); + long period = in.readUnsignedVInt(); + boolean lastInPeriod = in.readBoolean(); + + if (version.isBefore(Version.V1)) + partitioner = FBUtilities.newPartitioner(in.readUTF()); + + DistributedSchema schema = DistributedSchema.serializer.deserialize(in, version); + Directory dir = Directory.serializer.deserialize(in, version); + TokenMap tokenMap = TokenMap.serializer.deserialize(in, version); + DataPlacements placements = DataPlacements.serializer.deserialize(in, version); + LockedRanges lockedRanges = LockedRanges.serializer.deserialize(in, version); + InProgressSequences ips = InProgressSequences.serializer.deserialize(in, version); + int items = in.readInt(); + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(items); + for (int i = 0; i < items; i++) + { + ExtensionKey<?, ?> key = ExtensionKey.serializer.deserialize(in, version); + ExtensionValue<?> value = key.newValue(); + value.deserialize(in, version); + 
extensions.put(key, value); + } + return new ClusterMetadata(clusterIdentifier, + epoch, + period, + lastInPeriod, + partitioner, + schema, + dir, + tokenMap, + placements, + lockedRanges, + ips, + extensions); + } + + @Override + public long serializedSize(ClusterMetadata metadata, Version version) + { + long size = TypeSizes.INT_SIZE; + for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet()) + size += ExtensionKey.serializer.serializedSize(entry.getKey(), version) + + entry.getValue().serializedSize(version); + + if (version.isAtLeast(Version.V2)) + size += TypeSizes.sizeofUnsignedVInt(metadata.metadataIdentifier); + + size += Epoch.serializer.serializedSize(metadata.epoch) + + VIntCoding.computeUnsignedVIntSize(metadata.period) + + TypeSizes.BOOL_SIZE + + sizeof(metadata.partitioner.getClass().getCanonicalName()) + + DistributedSchema.serializer.serializedSize(metadata.schema, version) + + Directory.serializer.serializedSize(metadata.directory, version) + + TokenMap.serializer.serializedSize(metadata.tokenMap, version) + + DataPlacements.serializer.serializedSize(metadata.placements, version) + + LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) + + InProgressSequences.serializer.serializedSize(metadata.inProgressSequences, version); + + return size; + } + + public static IPartitioner getPartitioner(DataInputPlus in, Version version) throws IOException + { + if (version.isAtLeast(Version.V1)) + return FBUtilities.newPartitioner(in.readUTF()); + + Epoch.serializer.deserialize(in); + in.readUnsignedVInt(); + in.readBoolean(); + return FBUtilities.newPartitioner(in.readUTF()); + } + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java index 6ec42d40b5,86b8a5199d..8e3db2039f --- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java @@@ -57,8 -56,13 +57,9 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IMessageFilters; import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.EndpointState; -import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.repair.RepairParallelism; + import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; diff --cc test/unit/org/apache/cassandra/repair/FuzzTestBase.java index 0c26b3f279,81b341bed9..ff205554f8 --- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java +++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java @@@ -56,9 -57,7 +56,8 @@@ import accord.utils.DefaultRandom import accord.utils.Gen; import accord.utils.Gens; import accord.utils.RandomSource; - import org.agrona.collections.Long2ObjectHashMap; import org.agrona.collections.LongHashSet; +import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.concurrent.ExecutorBuilder; import org.apache.cassandra.concurrent.ExecutorBuilderFactory; import org.apache.cassandra.concurrent.ExecutorFactory; @@@ -119,7 -118,15 +118,14 @@@ import org.apache.cassandra.schema.Tabl import org.apache.cassandra.schema.TableMetadata; import 
org.apache.cassandra.schema.Tables; import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete; + import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupHistory; + import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest; + import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse; + import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; + import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup; + import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamReceiveException; import org.apache.cassandra.streaming.StreamSession; @@@ -1195,6 -1312,18 +1308,12 @@@ public abstract class FuzzTestBase exte { return streamExecutor; } + - @Override - public PendingRangeCalculatorService pendingRangeCalculator() - { - return PendingRangeCalculatorService.instance; - } - + @Override + public PaxosRepairState paxosRepairState() + { + return paxosRepairState; + } } private Message serde(Message msg) @@@ -1282,15 -1411,13 +1401,20 @@@ next = it.next(); } if (FuzzTestBase.class.getName().equals(next.getClassName())) return Access.MAIN_THREAD_ONLY; - if (next.getClassName().startsWith("org.apache.cassandra.db.") || - next.getClassName().startsWith("org.apache.cassandra.gms.") || - next.getClassName().startsWith("org.apache.cassandra.cql3.") || - next.getClassName().startsWith("org.apache.cassandra.metrics.") || - next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.") || - next.getClassName().startsWith("org.apache.cassandra.tcm") || - next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") || - next.getClassName().startsWith("org.apache.cassandra.schema") || - next.getClassName().startsWith(PendingAntiCompaction.class.getName())) ++ + // this is non-deterministic... but since the scope of the work is testing repair and not paxos... this is unblocked for now... 
+ if (("org.apache.cassandra.service.paxos.Paxos".equals(next.getClassName()) && "newBallot".equals(next.getMethodName())) + || ("org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker".equals(next.getClassName()) && "updateLowBound".equals(next.getMethodName()))) + return Access.MAIN_THREAD_ONLY; - if (next.getClassName().startsWith("org.apache.cassandra.db.") || next.getClassName().startsWith("org.apache.cassandra.gms.") || next.getClassName().startsWith("org.apache.cassandra.cql3.") || next.getClassName().startsWith("org.apache.cassandra.metrics.") || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.") ++ if (next.getClassName().startsWith("org.apache.cassandra.db.") ++ || next.getClassName().startsWith("org.apache.cassandra.gms.") ++ || next.getClassName().startsWith("org.apache.cassandra.cql3.") ++ || next.getClassName().startsWith("org.apache.cassandra.metrics.") ++ || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.") ++ || next.getClassName().startsWith("org.apache.cassandra.tcm") + || next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") // this would be good to solve ++ || next.getClassName().startsWith("org.apache.cassandra.schema") + || next.getClassName().startsWith(PendingAntiCompaction.class.getName())) return Access.IGNORE; if (next.getClassName().startsWith("org.apache.cassandra.repair") || ActiveRepairService.class.getName().startsWith(next.getClassName())) return Access.REJECT; diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java index 20bd0c14a5,872ee99abb..36c17855e3 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@@ -40,7 -40,7 +40,8 @@@ import com.google.common.util.concurren import org.apache.cassandra.repair.messages.SyncResponse; import org.apache.cassandra.repair.messages.ValidationResponse; +import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org