This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d4d35a7d790df44fc151c52f405fc10484f84930 Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Thu Mar 2 14:42:04 2023 +0000 [CEP-21] Replace fake initial implementation of CMS membership WIP commit (i.e. does not compile) replacing the initial toy implementation of CMS membership with a proper implementation. Membership of the CMS is determined by ownership of keyspaces with the META replication strategy (more precisely, a node is considered a member of the CMS if it is a member of the _read_ placements for META strategy keyspaces). Also implements more of the "real" [pre]initialization of the CMS, in preparation for supporting the upgrade of a running cluster from a gossip-based system. Co-authored-by: Marcus Eriksson <marc...@apache.org> Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com> Co-authored-by: Sam Tunnicliffe <s...@apache.org> --- .../org/apache/cassandra/auth/AuthKeyspace.java | 55 +++--- src/java/org/apache/cassandra/config/Config.java | 2 +- src/java/org/apache/cassandra/gms/Gossiper.java | 6 +- .../org/apache/cassandra/locator/MetaStrategy.java | 4 +- .../schema/SystemDistributedKeyspace.java | 114 +++++------ .../org/apache/cassandra/tcm/ClusterMetadata.java | 63 ++---- src/java/org/apache/cassandra/tcm/Discovery.java | 2 +- .../apache/cassandra/tcm/PaxosBackedProcessor.java | 7 +- .../org/apache/cassandra/tcm/RemoteProcessor.java | 2 +- .../org/apache/cassandra/tcm/Transformation.java | 7 + .../cassandra/tcm/compatibility/GossipHelper.java | 3 - .../org/apache/cassandra/tcm/log/LocalLog.java | 4 +- .../apache/cassandra/tcm/sequences/AddToCMS.java | 217 +++++++++++++++++++++ .../tcm/sequences/InProgressSequences.java | 1 + ...lize.java => BaseMembershipTransformation.java} | 57 ++---- .../tcm/transformations/cms/FinishAddMember.java | 80 ++++++++ .../tcm/transformations/cms/Initialize.java | 58 +++++- .../tcm/transformations/cms/PreInitialize.java | 29 ++- .../tcm/transformations/cms/RemoveMember.java | 59 ++++++ 
.../tcm/transformations/cms/StartAddMember.java | 84 ++++++++ .../apache/cassandra/tracing/TraceKeyspace.java | 51 +++-- 21 files changed, 690 insertions(+), 215 deletions(-) diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java b/src/java/org/apache/cassandra/auth/AuthKeyspace.java index 67fc9c1a06..12760706fc 100644 --- a/src/java/org/apache/cassandra/auth/AuthKeyspace.java +++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java @@ -37,7 +37,7 @@ public final class AuthKeyspace { } - private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_AUTH_DEFAULT_RF.getInt(); + public static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_AUTH_DEFAULT_RF.getInt(); /** * Generation is used as a timestamp for automatic table creation on startup. @@ -57,49 +57,54 @@ public final class AuthKeyspace public static final long SUPERUSER_SETUP_DELAY = Long.getLong("cassandra.superuser_setup_delay_ms", 10000); + public static String ROLES_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "role text," + + "is_superuser boolean," + + "can_login boolean," + + "salted_hash text," + + "member_of set<text>," + + "PRIMARY KEY(role))"; private static final TableMetadata Roles = parse(ROLES, "role definitions", - "CREATE TABLE %s (" - + "role text," - + "is_superuser boolean," - + "can_login boolean," - + "salted_hash text," - + "member_of set<text>," - + "PRIMARY KEY(role))"); + ROLES_CQL); + public static String ROLE_MEMBERS_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "role text," + + "member text," + + "PRIMARY KEY(role, member))"; private static final TableMetadata RoleMembers = parse(ROLE_MEMBERS, "role memberships lookup table", - "CREATE TABLE %s (" - + "role text," - + "member text," - + "PRIMARY KEY(role, member))"); + ROLE_MEMBERS_CQL); + public static String ROLE_PERMISSIONS_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "role text," + + "resource text," + + "permissions set<text>," + + "PRIMARY KEY(role, resource))"; private static final 
TableMetadata RolePermissions = parse(ROLE_PERMISSIONS, "permissions granted to db roles", - "CREATE TABLE %s (" - + "role text," - + "resource text," - + "permissions set<text>," - + "PRIMARY KEY(role, resource))"); + ROLE_PERMISSIONS_CQL); + public static String RESOURCE_ROLE_INDEX_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "resource text," + + "role text," + + "PRIMARY KEY(resource, role))"; private static final TableMetadata ResourceRoleIndex = parse(RESOURCE_ROLE_INDEX, "index of db roles with permissions granted on a resource", - "CREATE TABLE %s (" - + "resource text," - + "role text," - + "PRIMARY KEY(resource, role))"); + RESOURCE_ROLE_INDEX_CQL); + public static String NETWORK_PERMISSIONS_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "role text, " + + "dcs frozen<set<text>>, " + + "PRIMARY KEY(role))"; private static final TableMetadata NetworkPermissions = parse(NETWORK_PERMISSIONS, "user network permissions", - "CREATE TABLE %s (" - + "role text, " - + "dcs frozen<set<text>>, " - + "PRIMARY KEY(role))"); + NETWORK_PERMISSIONS_CQL); private static TableMetadata parse(String name, String description, String cql) { diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 5a37eb1600..15eb49a2e6 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -987,7 +987,7 @@ public class Config /** * See {@link PaxosVariant}. Defaults to v1, recommend upgrading to v2 at earliest opportunity. 
*/ - public volatile PaxosVariant paxos_variant = PaxosVariant.v1; + public volatile PaxosVariant paxos_variant = PaxosVariant.v2; // TODO: only use v2 for CMS operations /** * If true, paxos topology change repair will not run on a topology change - this option should only be used in diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 86ccee939e..afd7c751c9 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -800,7 +800,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (seeds.contains(to)) gossipedWith.add(SEED); - if (ClusterMetadata.current().cmsMembers().contains(to)) + if (ClusterMetadata.current().fullCMSMembers().contains(to)) gossipedWith.add(CMS); GossiperDiagnostics.sendGossipDigestSyn(this, to); return gossipedWith; @@ -869,7 +869,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private void maybeGossipToCMS(Message<GossipDigestSyn> message) { - Set<InetAddressAndPort> cms = ClusterMetadata.current().cmsMembers(); + Set<InetAddressAndPort> cms = ClusterMetadata.current().fullCMSMembers(); if (cms.contains(getBroadcastAddressAndPort())) return; @@ -2212,7 +2212,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void triggerRoundWithCMS() { ClusterMetadata metadata = ClusterMetadata.current(); - Set<InetAddressAndPort> cms = metadata.cmsMembers(); + Set<InetAddressAndPort> cms = metadata.fullCMSMembers(); if (!cms.contains(getBroadcastAddressAndPort())) { logger.debug("Triggering gossip round with CMS {}", metadata.epoch); diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java b/src/java/org/apache/cassandra/locator/MetaStrategy.java index 295e06f1a9..28272088da 100644 --- a/src/java/org/apache/cassandra/locator/MetaStrategy.java +++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java @@ -51,7 
+51,7 @@ public class MetaStrategy extends SystemStrategy private static EndpointsForRange replicas() { - Set<InetAddressAndPort> members = ClusterMetadata.current().cmsMembers(); + Set<InetAddressAndPort> members = ClusterMetadata.current().fullCMSMembers(); return EndpointsForRange.builder(EntireRange.entireRange, members.size()) .addAll(members.stream().map(EntireRange::replica).collect(Collectors.toList())) .build(); @@ -59,7 +59,7 @@ public class MetaStrategy extends SystemStrategy @Override public ReplicationFactor getReplicationFactor() { - int rf = ClusterMetadata.current().cmsMembers.size(); + int rf = ClusterMetadata.current().fullCMSMembers().size(); return ReplicationFactor.fullOnly(rf); } diff --git a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java index 563dedf935..4c978f0af5 100644 --- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java @@ -48,7 +48,6 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.CommonRange; import org.apache.cassandra.repair.messages.RepairOption; @@ -67,7 +66,7 @@ public final class SystemDistributedKeyspace public static final String NAME = "system_distributed"; - private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt(); + public static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt(); private static final Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class); /** @@ -82,6 +81,8 @@ public final class SystemDistributedKeyspace * gen 4: compression chunk length reduced to 16KiB, 
memtable_flush_period_in_ms now unset on all tables in 4.0 * gen 5: add ttl and TWCS to repair_history tables * gen 6: add denylist table + * + * // TODO: TCM - how do we evolve these tables? */ public static final long GENERATION = 6; @@ -93,65 +94,65 @@ public final class SystemDistributedKeyspace public static final String PARTITION_DENYLIST_TABLE = "partition_denylist"; + public static final String REPAIR_HISTORY_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "id timeuuid," + + "parent_id timeuuid," + + "range_begin text," + + "range_end text," + + "coordinator inet," + + "coordinator_port int," + + "participants set<inet>," + + "participants_v2 set<text>," + + "exception_message text," + + "exception_stacktrace text," + + "status text," + + "started_at timestamp," + + "finished_at timestamp," + + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))"; + private static final TableMetadata RepairHistory = - parse(REPAIR_HISTORY, - "Repair history", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "id timeuuid," - + "parent_id timeuuid," - + "range_begin text," - + "range_end text," - + "coordinator inet," - + "coordinator_port int," - + "participants set<inet>," - + "participants_v2 set<text>," - + "exception_message text," - + "exception_stacktrace text," - + "status text," - + "started_at timestamp," - + "finished_at timestamp," - + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))") + parse(REPAIR_HISTORY, "Repair history", REPAIR_HISTORY_CQL) .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30)) .compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS", "compaction_window_size","1"))) .build(); + public static final String PARENT_REPAIR_HISTORY_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "parent_id timeuuid," + + "keyspace_name text," + + "columnfamily_names set<text>," + + "started_at timestamp," + + "finished_at timestamp," + + 
"exception_message text," + + "exception_stacktrace text," + + "requested_ranges set<text>," + + "successful_ranges set<text>," + + "options map<text, text>," + + "PRIMARY KEY (parent_id))"; private static final TableMetadata ParentRepairHistory = - parse(PARENT_REPAIR_HISTORY, - "Repair history", - "CREATE TABLE %s (" - + "parent_id timeuuid," - + "keyspace_name text," - + "columnfamily_names set<text>," - + "started_at timestamp," - + "finished_at timestamp," - + "exception_message text," - + "exception_stacktrace text," - + "requested_ranges set<text>," - + "successful_ranges set<text>," - + "options map<text, text>," - + "PRIMARY KEY (parent_id))") + parse(PARENT_REPAIR_HISTORY, "Repair history", PARENT_REPAIR_HISTORY_CQL) .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30)) .compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS", "compaction_window_size","1"))) .build(); + public static final String VIEW_BUILD_STATUS_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "keyspace_name text," + + "view_name text," + + "host_id uuid," + + "status text," + + "PRIMARY KEY ((keyspace_name, view_name), host_id))"; private static final TableMetadata ViewBuildStatus = parse(VIEW_BUILD_STATUS, "Materialized View build status", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "host_id uuid," - + "status text," - + "PRIMARY KEY ((keyspace_name, view_name), host_id))").build(); + VIEW_BUILD_STATUS_CQL).build(); public static final TableMetadata PartitionDenylistTable = parse(PARTITION_DENYLIST_TABLE, "Partition keys which have been denied access", - "CREATE TABLE %s (" + "CREATE TABLE IF NOT EXISTS %s (" + "ks_name text," + "table_name text," + "key blob," @@ -167,7 +168,9 @@ public final class SystemDistributedKeyspace public static KeyspaceMetadata metadata() { - return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())), 
Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable)); + return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, + KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())), + Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable)); } public static void startParentRepair(TimeUUID parent_id, String keyspaceName, String[] cfnames, RepairOption options) @@ -246,18 +249,19 @@ public final class SystemDistributedKeyspace { for (Range<Token> range : commonRange.ranges) { - String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, - keyspaceName, - cfname, - id.toString(), - parent_id.toString(), - range.left.toString(), - range.right.toString(), - coordinator.getHostAddress(false), - coordinator.getPort(), - Joiner.on("', '").join(participants), - Joiner.on("', '").join(participants_v2), - RepairState.STARTED.toString()); + String fmtQry; + fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, + keyspaceName, + cfname, + id.toString(), + parent_id.toString(), + range.left.toString(), + range.right.toString(), + coordinator.getHostAddress(false), + coordinator.getPort(), + Joiner.on("', '").join(participants), + Joiner.on("', '").join(participants_v2), + RepairState.STARTED.toString()); processSilent(fmtQry); } } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index a89f3c6dd9..3708cadfd1 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -37,9 +36,10 @@ import org.apache.cassandra.dht.IPartitioner; 
import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.extensions.ExtensionKey; import org.apache.cassandra.tcm.extensions.ExtensionValue; import org.apache.cassandra.tcm.membership.Directory; @@ -55,12 +55,10 @@ import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; -import org.apache.cassandra.tcm.transformations.cms.EntireRange; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.vint.VIntCoding; import static org.apache.cassandra.db.TypeSizes.sizeof; -import static org.apache.cassandra.tcm.transformations.cms.EntireRange.entireRange; public class ClusterMetadata { @@ -70,7 +68,6 @@ public class ClusterMetadata public final long period; public final boolean lastInPeriod; public final IPartitioner partitioner; // Set during (initial) construction and not modifiable via Transformer - public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions; public final DistributedSchema schema; public final Directory directory; @@ -78,8 +75,9 @@ public class ClusterMetadata public final DataPlacements placements; public final LockedRanges lockedRanges; public final InProgressSequences inProgressSequences; - public final EndpointsForRange cmsReplicas; - public final ImmutableSet<InetAddressAndPort> cmsMembers; + public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions; + private final Set<Replica> fullCMSReplicas; + private final Set<InetAddressAndPort> 
fullCMSEndpoints; public ClusterMetadata(IPartitioner partitioner) { @@ -104,7 +102,6 @@ public class ClusterMetadata DataPlacements.EMPTY, LockedRanges.EMPTY, InProgressSequences.EMPTY, - ImmutableSet.of(), ImmutableMap.of()); } @@ -118,7 +115,6 @@ public class ClusterMetadata DataPlacements placements, LockedRanges lockedRanges, InProgressSequences inProgressSequences, - Set<InetAddressAndPort> cmsMembers, Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) { // TODO: token map is a feature of the specific placement strategy, and so may not be a relevant component of @@ -135,24 +131,25 @@ public class ClusterMetadata this.placements = placements; this.lockedRanges = lockedRanges; this.inProgressSequences = inProgressSequences; - this.cmsMembers = ImmutableSet.copyOf(cmsMembers); this.extensions = ImmutableMap.copyOf(extensions); - this.cmsReplicas = EndpointsForRange.builder(entireRange) - .addAll(cmsMembers.stream() - .map(EntireRange::replica) - .collect(Collectors.toList())) - .build(); + this.fullCMSReplicas = ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().flattenValues()); + this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet()); } - public boolean isCMSMember(InetAddressAndPort endpoint) + public Set<InetAddressAndPort> fullCMSMembers() + { + return fullCMSEndpoints; + } + + public Set<Replica> fullCMSMembersAsReplicas() { - return cmsMembers.contains(endpoint); + return fullCMSReplicas; } - public Set<InetAddressAndPort> cmsMembers() + public boolean isCMSMember(InetAddressAndPort endpoint) { - return cmsMembers; + return fullCMSMembers().contains(endpoint); } public Transformer transformer() @@ -177,7 +174,6 @@ public class ClusterMetadata placements, lockedRanges, inProgressSequences, - cmsMembers, extensions); } @@ -204,7 +200,6 @@ public class ClusterMetadata private DataPlacements placements; private LockedRanges lockedRanges; private InProgressSequences 
inProgressSequences; - private final Set<InetAddressAndPort> cmsMembers; private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions; private final Set<MetadataKey> modifiedKeys; @@ -221,7 +216,6 @@ public class ClusterMetadata this.placements = metadata.placements; this.lockedRanges = metadata.lockedRanges; this.inProgressSequences = metadata.inProgressSequences; - this.cmsMembers = new HashSet<>(metadata.cmsMembers); extensions = new HashMap<>(metadata.extensions); modifiedKeys = new HashSet<>(); } @@ -311,17 +305,6 @@ public class ClusterMetadata this.inProgressSequences = sequences; return this; } - public Transformer withCMSMember(InetAddressAndPort member) - { - cmsMembers.add(member); - return this; - } - - public Transformer withoutCMSMember(InetAddressAndPort member) - { - cmsMembers.remove(member); - return this; - } public Transformer with(ExtensionKey<?, ?> key, ExtensionValue<?> obj) { @@ -419,7 +402,6 @@ public class ClusterMetadata placements, lockedRanges, inProgressSequences, - cmsMembers, extensions), ImmutableSet.copyOf(modifiedKeys)); } @@ -439,7 +421,6 @@ public class ClusterMetadata ", lockedRanges=" + lockedRanges + ", inProgressSequences=" + inProgressSequences + ", extensions=" + extensions + - ", cmsMembers=" + cmsMembers + ", modifiedKeys=" + modifiedKeys + '}'; } @@ -542,9 +523,6 @@ public class ClusterMetadata assert key.valueType.isInstance(value); value.serialize(out, version); } - out.writeInt(metadata.cmsMembers.size()); - for (InetAddressAndPort member : metadata.cmsMembers) - InetAddressAndPort.MetadataSerializer.serializer.serialize(member, out, version); } @Override @@ -569,10 +547,6 @@ public class ClusterMetadata value.deserialize(in, version); extensions.put(key, value); } - int memberCount = in.readInt(); - Set<InetAddressAndPort> members = new HashSet<>(memberCount); - for (int i = 0; i < memberCount; i++) - members.add(InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version)); return new 
ClusterMetadata(epoch, period, lastInPeriod, @@ -583,7 +557,6 @@ public class ClusterMetadata placements, lockedRanges, ips, - members, extensions); } @@ -606,10 +579,6 @@ public class ClusterMetadata LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) + InProgressSequences.serializer.serializedSize(metadata.inProgressSequences, version); - size += TypeSizes.INT_SIZE; - for (InetAddressAndPort member : metadata.cmsMembers) - size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(member, version); - return size; } } diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java b/src/java/org/apache/cassandra/tcm/Discovery.java index a846a7b74a..58efbbccc6 100644 --- a/src/java/org/apache/cassandra/tcm/Discovery.java +++ b/src/java/org/apache/cassandra/tcm/Discovery.java @@ -122,7 +122,7 @@ public class Discovery @Override public void doVerb(Message<NoPayload> message) { - Set<InetAddressAndPort> cms = ClusterMetadata.current().cmsMembers; + Set<InetAddressAndPort> cms = ClusterMetadata.current().fullCMSMembers(); DiscoveredNodes discoveredNodes; if (!cms.isEmpty()) diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index 5eee7bc3dd..d4ccb20425 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -18,11 +18,9 @@ package org.apache.cassandra.tcm; -import java.util.ArrayList; -import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,8 +66,7 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor log.waitForHighestConsecutive(); ClusterMetadata metadata = log.metadata(); - List<Replica> replicas = new ArrayList<>(); - Iterables.addAll(replicas, metadata.cmsReplicas); + Set<Replica> replicas = 
metadata.fullCMSMembersAsReplicas(); // We can not use Paxos to catch-up a member of CMS ownership group, since that'd reduce availability, // so instead we allow CMS owners to catch up via inconsistent replay. In other words, from local log diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index a54a0b873f..fd5b59dd57 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -91,7 +91,7 @@ public final class RemoteProcessor implements ClusterMetadataService.Processor private List<InetAddressAndPort> candidates(boolean allowDiscovery) { - List<InetAddressAndPort> candidates = new ArrayList<>(log.metadata().cmsMembers); + List<InetAddressAndPort> candidates = new ArrayList<>(log.metadata().fullCMSMembers()); if (candidates.isEmpty()) candidates.addAll(DatabaseDescriptor.getSeeds()); diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index b81a793ee1..d77194dcf7 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -45,8 +45,11 @@ import org.apache.cassandra.tcm.transformations.PrepareReplace; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.SealPeriod; import org.apache.cassandra.tcm.transformations.UnsafeJoin; +import org.apache.cassandra.tcm.transformations.cms.FinishAddMember; import org.apache.cassandra.tcm.transformations.cms.Initialize; import org.apache.cassandra.tcm.transformations.cms.PreInitialize; +import org.apache.cassandra.tcm.transformations.cms.RemoveMember; +import org.apache.cassandra.tcm.transformations.cms.StartAddMember; public interface Transformation { @@ -174,6 +177,10 @@ public interface Transformation CANCEL_SEQUENCE(() -> CancelInProgressSequence.serializer), + START_ADD_TO_CMS(() -> 
StartAddMember.serializer), + FINISH_ADD_TO_CMS(() -> FinishAddMember.serializer), + REMOVE_FROM_CMS(() -> RemoveMember.serializer), + CUSTOM(() -> CustomTransformation.serializer); private final Supplier<AsymmetricMetadataSerializer<Transformation, ? extends Transformation>> serializer; diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 2f4270ace6..1a6fc17c84 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -303,7 +303,6 @@ public class GossipHelper DataPlacements.empty(), LockedRanges.EMPTY, InProgressSequences.EMPTY, - Collections.emptySet(), Collections.emptyMap()); } @@ -347,7 +346,6 @@ public class GossipHelper DataPlacements.empty(), LockedRanges.EMPTY, InProgressSequences.EMPTY, - Collections.emptySet(), extensions); return new ClusterMetadata(Epoch.UPGRADE_GOSSIP, Period.EMPTY, @@ -359,7 +357,6 @@ public class GossipHelper new UniformRangePlacement().calculatePlacements(forPlacementCalculation, schema.getKeyspaces()), LockedRanges.EMPTY, InProgressSequences.EMPTY, - Collections.emptySet(), extensions); } } diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 469cc8a0e4..7adf3fd0fb 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -95,7 +95,7 @@ public abstract class LocalLog implements Closeable Transformation transform = PreInitialize.withFirstCMS(addr); append(new Entry(Entry.Id.NONE, FIRST, transform)); waitForHighestConsecutive(); - assert metadata().epoch.is(Epoch.FIRST) : ClusterMetadata.current().epoch + " " + ClusterMetadata.current().cmsMembers; + assert metadata().epoch.is(Epoch.FIRST) : ClusterMetadata.current().epoch + " " + ClusterMetadata.current().fullCMSMembers(); } public ClusterMetadata 
metadata() @@ -557,7 +557,7 @@ public abstract class LocalLog implements Closeable if ((entry.epoch.getEpoch() % DatabaseDescriptor.getMetadataSnapshotFrequency()) == 0) { - List<InetAddressAndPort> list = new ArrayList<>(ClusterMetadata.current().cmsMembers); + List<InetAddressAndPort> list = new ArrayList<>(ClusterMetadata.current().fullCMSMembers()); list.sort(comparing(i -> i.addressBytes[i.addressBytes.length - 1])); if (list.get(0).equals(FBUtilities.getBroadcastAddressAndPort())) ScheduledExecutors.nonPeriodicTasks.submit(() -> ClusterMetadataService.instance().sealPeriod()); diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java new file mode 100644 index 0000000000..8026689e5f --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.sequences; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.InProgressSequence; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.tcm.transformations.cms.FinishAddMember; +import org.apache.cassandra.tcm.transformations.cms.StartAddMember; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.db.TypeSizes.sizeof; +import static org.apache.cassandra.tcm.sequences.InProgressSequences.Kind.JOIN_OWNERSHIP_GROUP; +import static org.apache.cassandra.tcm.transformations.cms.EntireRange.*; + +public class AddToCMS implements InProgressSequence<AddToCMS> +{ + private static final Logger logger = LoggerFactory.getLogger(AddToCMS.class); + public static Serializer serializer = new Serializer(); + + private final ProgressBarrier 
barrier; + private final List<InetAddressAndPort> streamCandidates; + private final FinishAddMember finishJoin; + + public static void initiate() + { + NodeId self = ClusterMetadata.current().myNodeId(); + InProgressSequence<?> continuation = ClusterMetadataService.instance() + .commit(new StartAddMember(FBUtilities.getBroadcastAddressAndPort()), + (metadata) -> !metadata.inProgressSequences.contains(self), + (metadata) -> metadata.inProgressSequences.get(self), + (metadata, reason) -> { + throw new IllegalStateException("Can't join ownership group: " + reason); + }); + if (continuation.kind() != JOIN_OWNERSHIP_GROUP) + throw new IllegalStateException(String.format("Following accepted initiation of node to CMS, " + + "an incorrect sequence %s was found in progress. %s ", + continuation.kind(), continuation)); + continuation.executeNext(); + } + + public AddToCMS(ProgressBarrier barrier, List<InetAddressAndPort> streamCandidates, FinishAddMember join) + { + this.barrier = barrier; + this.streamCandidates = streamCandidates; + this.finishJoin = join; + } + + @Override + public ProgressBarrier barrier() + { + return barrier; + } + + public Transformation.Kind nextStep() + { + return finishJoin.kind(); + } + + private void streamRanges() throws ExecutionException, InterruptedException + { + // TODO: iterate over stream candidates + StreamPlan streamPlan = new StreamPlan(StreamOperation.BOOTSTRAP, 1, true, null, PreviewKind.NONE); + streamPlan.requestRanges(streamCandidates.get(0), + SchemaConstants.METADATA_KEYSPACE_NAME, + new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).add(finishJoin.replicaForStreaming()).build(), + new RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).build(), + DistributedMetadataLogKeyspace.TABLE_NAME); + streamPlan.execute().get(); + } + + private void finishJoin() + { + NodeId self = ClusterMetadata.current().myNodeId(); + ClusterMetadataService.instance().commit(finishJoin, + (ClusterMetadata metadata) 
-> metadata.inProgressSequences.contains(self), + (ClusterMetadata metadata) -> null, + (ClusterMetadata metadata, String error) -> { + throw new IllegalStateException(String.format("Could not finish join due to \"%s\". Next transformation in sequence: %s.", + error, + metadata.inProgressSequences.contains(self) ? metadata.inProgressSequences.get(self).nextStep() : null)); + }); + } + + private void repairPaxosTopology() throws ExecutionException, InterruptedException + { + ActiveRepairService.instance.repairPaxosForTopologyChange(SchemaConstants.METADATA_KEYSPACE_NAME, + Collections.singletonList(entireRange), + "bootstrap") + .get(); + } + + public boolean executeNext() + { + try + { + streamRanges(); + finishJoin(); + repairPaxosTopology(); + } + catch (Throwable t) + { + logger.error("Could not finish adding the node to the metadata ownership group", t); + throw new RuntimeException(t); + } + + return false; + } + + public AddToCMS advance(Epoch waitForWatermark, Transformation.Kind next) + { + throw new NoSuchElementException(); + } + + public InProgressSequences.Kind kind() + { + return JOIN_OWNERSHIP_GROUP; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AddToCMS addToCMS = (AddToCMS) o; + return Objects.equals(barrier, addToCMS.barrier) && + Objects.equals(streamCandidates, addToCMS.streamCandidates) && + Objects.equals(finishJoin, addToCMS.finishJoin); + } + + @Override + public int hashCode() + { + return Objects.hash(barrier, streamCandidates, finishJoin); + } + + public static class Serializer implements AsymmetricMetadataSerializer<InProgressSequence<?>, AddToCMS> + { + @Override + public void serialize(InProgressSequence<?> t, DataOutputPlus out, Version version) throws IOException + { + AddToCMS seq = (AddToCMS) t; + ProgressBarrier.serializer.serialize(t.barrier(), out, version); + FinishAddMember.serializer.serialize(seq.finishJoin, out, version); + 
out.writeInt(seq.streamCandidates.size()); + for (InetAddressAndPort ep : seq.streamCandidates) + InetAddressAndPort.MetadataSerializer.serializer.serialize(ep, out, version); + } + + @Override + public AddToCMS deserialize(DataInputPlus in, Version version) throws IOException + { + ProgressBarrier barrier = ProgressBarrier.serializer.deserialize(in, version); + FinishAddMember finish = FinishAddMember.serializer.deserialize(in, version); + int streamCandidatesSize = in.readInt(); + List<InetAddressAndPort> streamCandidates = new ArrayList<>(); + + for (int i = 0; i < streamCandidatesSize; i++) + streamCandidates.add(InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version)); + return new AddToCMS(barrier, streamCandidates, finish); + } + + @Override + public long serializedSize(InProgressSequence<?> t, Version version) + { + AddToCMS seq = (AddToCMS) t; + long size = ProgressBarrier.serializer.serializedSize(t.barrier(), version); + size += FinishAddMember.serializer.serializedSize(seq.finishJoin, version); + size += sizeof(seq.streamCandidates.size()); + for (InetAddressAndPort ep : seq.streamCandidates) + size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(ep, version); + return size; + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java index 32e78c5fd5..5fb7656048 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java +++ b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java @@ -137,6 +137,7 @@ public class InProgressSequences implements MetadataValue<InProgressSequences> public enum Kind { + JOIN_OWNERSHIP_GROUP(AddToCMS.serializer), JOIN(BootstrapAndJoin.serializer), MOVE(Move.serializer), REPLACE(BootstrapAndReplace.serializer), diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java similarity index 50% copy from src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java copy to src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java index 50e237a509..f8cebf22a1 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java @@ -22,58 +22,43 @@ import java.io.IOException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; -import org.apache.cassandra.tcm.transformations.ForceSnapshot; -import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; - -public class Initialize extends ForceSnapshot +public abstract class BaseMembershipTransformation implements Transformation { - public static final AsymmetricMetadataSerializer<Transformation, Initialize> serializer = new AsymmetricMetadataSerializer<Transformation, Initialize>() + protected final InetAddressAndPort endpoint; + protected final Replica replica; + + protected BaseMembershipTransformation(InetAddressAndPort endpoint) + { + this.endpoint = endpoint; + this.replica = EntireRange.replica(endpoint); + } + + public static abstract class SerializerBase<T extends BaseMembershipTransformation> implements AsymmetricMetadataSerializer<Transformation, T> { public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException { - Initialize initialize = (Initialize) t; - ClusterMetadata.serializer.serialize(initialize.baseState, 
out, version); + T transformation = (T) t; + InetAddressAndPort.MetadataSerializer.serializer.serialize(transformation.endpoint, out, version); } - public Initialize deserialize(DataInputPlus in, Version version) throws IOException + public T deserialize(DataInputPlus in, Version version) throws IOException { - return new Initialize(ClusterMetadata.serializer.deserialize(in, version)); + InetAddressAndPort addr = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + return createTransformation(addr); } public long serializedSize(Transformation t, Version version) { - Initialize initialize = (Initialize) t; - return ClusterMetadata.serializer.serializedSize(initialize.baseState, version); + T transformation = (T) t; + return InetAddressAndPort.MetadataSerializer.serializer.serializedSize(transformation.endpoint, version); } - }; - - public Initialize(ClusterMetadata baseState) - { - super(baseState); - } - public Kind kind() - { - return Kind.INITIALIZE_CMS; - } - - public Result execute(ClusterMetadata prev) - { - ClusterMetadata next = baseState; - ClusterMetadata.Transformer transformer = next.transformer(); - return success(transformer, affectedRanges); - } - - @Override - public String toString() - { - return "Initialize{" + - "baseState = " + baseState + - '}'; + public abstract T createTransformation(InetAddressAndPort addr); } } diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java new file mode 100644 index 0000000000..2034105e09 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations.cms; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.InProgressSequence; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.sequences.AddToCMS; +import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; + +import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; + +public class FinishAddMember extends BaseMembershipTransformation +{ + public static final AsymmetricMetadataSerializer<Transformation, FinishAddMember> serializer = new SerializerBase<FinishAddMember>() + { + public FinishAddMember createTransformation(InetAddressAndPort addr) + { + return new FinishAddMember(addr); + } + }; + + public FinishAddMember(InetAddressAndPort addr) + { + super(addr); + } + + public Kind kind() + { + return Kind.FINISH_ADD_TO_CMS; + } + + public Replica replicaForStreaming() + { + return replica; + } + + public Result execute(ClusterMetadata prev) + { + InProgressSequences sequences = 
prev.inProgressSequences; + NodeId targetNode = prev.directory.peerId(replica.endpoint()); + InProgressSequence<?> sequence = sequences.get(targetNode); + + if (sequence == null) + return new Rejected("Can't execute finish join as cluster metadata does not hold join sequence for this node"); + + if (!(sequence instanceof AddToCMS)) + return new Rejected("Can't execute finish join as cluster metadata contains a sequence of a different kind"); + + ClusterMetadata.Transformer transformer = prev.transformer(); + DataPlacement.Builder builder = prev.placements.get(ReplicationParams.meta()) + .unbuild() + .withReadReplica(replica); + transformer = transformer.with(prev.placements.unbuild().with(ReplicationParams.meta(), builder.build()).build()) + .with(prev.inProgressSequences.without(targetNode)); + return success(transformer, affectedRanges); + } +} diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java index 50e237a509..0113d04ef3 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java @@ -19,14 +19,26 @@ package org.apache.cassandra.tcm.transformations.cms; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.cassandra.auth.AuthKeyspace; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SchemaTransformation; +import org.apache.cassandra.schema.SchemaTransformations; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.tcm.ClusterMetadata; import 
org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.ForceSnapshot; +import org.apache.cassandra.tracing.TraceKeyspace; import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; @@ -65,10 +77,54 @@ public class Initialize extends ForceSnapshot public Result execute(ClusterMetadata prev) { ClusterMetadata next = baseState; - ClusterMetadata.Transformer transformer = next.transformer(); + DistributedSchema initialSchema = new DistributedSchema(setUpDistributedSystemKeyspaces(next)); + ClusterMetadata.Transformer transformer = next.transformer().with(initialSchema); return success(transformer, affectedRanges); } + public static final List<SchemaTransformation> schemaTransformations = + Collections.unmodifiableList(Arrays.asList(SchemaTransformations.fromCql(String.format("CREATE KEYSPACE IF NOT EXISTS %s " + + " WITH REPLICATION = { \n" + + " 'class' : 'SimpleStrategy', \n" + + " 'replication_factor' : %d \n" + + " };", + SchemaConstants.TRACE_KEYSPACE_NAME, + Math.max(TraceKeyspace.DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF()))), + SchemaTransformations.fromCql(String.format(TraceKeyspace.SESSIONS_CQL, SchemaConstants.TRACE_KEYSPACE_NAME + "." + TraceKeyspace.SESSIONS)), + SchemaTransformations.fromCql(String.format(TraceKeyspace.EVENTS_CQL, SchemaConstants.TRACE_KEYSPACE_NAME + "." 
+ TraceKeyspace.EVENTS)), + SchemaTransformations.fromCql(String.format("CREATE KEYSPACE IF NOT EXISTS %s " + + " WITH REPLICATION = { \n" + + " 'class' : 'SimpleStrategy', \n" + + " 'replication_factor' : %d \n" + + " };", + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, + Math.max(SystemDistributedKeyspace.DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF()))), + SchemaTransformations.fromCql(String.format(SystemDistributedKeyspace.REPAIR_HISTORY_CQL, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.REPAIR_HISTORY)), + SchemaTransformations.fromCql(String.format(SystemDistributedKeyspace.PARENT_REPAIR_HISTORY_CQL, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.PARENT_REPAIR_HISTORY)), + SchemaTransformations.fromCql(String.format(SystemDistributedKeyspace.VIEW_BUILD_STATUS_CQL, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.VIEW_BUILD_STATUS)), + SchemaTransformations.fromCql(String.format("CREATE KEYSPACE IF NOT EXISTS %s " + + " WITH REPLICATION = { \n" + + " 'class' : 'SimpleStrategy', \n" + + " 'replication_factor' : %d \n" + + " };", + SchemaConstants.AUTH_KEYSPACE_NAME, + Math.max(AuthKeyspace.DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF()))), + SchemaTransformations.fromCql(String.format(AuthKeyspace.ROLES_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.ROLES)), + SchemaTransformations.fromCql(String.format(AuthKeyspace.ROLE_MEMBERS_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.ROLE_MEMBERS)), + SchemaTransformations.fromCql(String.format(AuthKeyspace.ROLE_PERMISSIONS_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.ROLE_PERMISSIONS)), + SchemaTransformations.fromCql(String.format(AuthKeyspace.RESOURCE_ROLE_INDEX_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.RESOURCE_ROLE_INDEX)), + SchemaTransformations.fromCql(String.format(AuthKeyspace.NETWORK_PERMISSIONS_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." 
+ AuthKeyspace.NETWORK_PERMISSIONS)))); + + public Keyspaces setUpDistributedSystemKeyspaces(ClusterMetadata metadata) + { + Keyspaces keyspaces = metadata.schema.getKeyspaces(); + + for (SchemaTransformation transformation : schemaTransformations) + keyspaces = transformation.apply(metadata, keyspaces); + + return keyspaces; + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java index 08f00effdf..41bd7ca4b6 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java @@ -20,16 +20,22 @@ package org.apache.cassandra.tcm.transformations.cms; import java.io.IOException; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Period; import org.apache.cassandra.tcm.Transformation; -import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.sequences.LockedRanges; public class PreInitialize implements Transformation { @@ -57,7 +63,6 @@ public class PreInitialize implements Transformation return new PreInitialize(addr); } - public Kind kind() { return Kind.PRE_INITIALIZE_CMS; @@ 
-66,14 +71,26 @@ public class PreInitialize implements Transformation public Result execute(ClusterMetadata metadata) { assert metadata.epoch.isBefore(Epoch.FIRST); + assert metadata.period == Period.EMPTY; - ClusterMetadata.Transformer transformer = metadata.transformer(); + ClusterMetadata.Transformer transformer = metadata.transformer(false); if (addr != null) - transformer = transformer.withCMSMember(addr); - + { + DataPlacement.Builder dataPlacementBuilder = DataPlacement.builder(); + Replica replica = new Replica(addr, + DatabaseDescriptor.getPartitioner().getMinimumToken(), + DatabaseDescriptor.getPartitioner().getMinimumToken(), + true); + dataPlacementBuilder.reads.withReplica(replica); + dataPlacementBuilder.writes.withReplica(replica); + DataPlacements initialPlacement = metadata.placements.unbuild().with(ReplicationParams.meta(), dataPlacementBuilder.build()).build(); + + transformer.with(initialPlacement); + } ClusterMetadata.Transformer.Transformed transformed = transformer.build(); metadata = transformed.metadata; assert metadata.epoch.is(Epoch.FIRST) : metadata.epoch; + assert metadata.period == Period.FIRST : metadata.period; return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, transformed.modifiedKeys); } diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java new file mode 100644 index 0000000000..efc29bd53c --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations.cms; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; + +import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; + +public class RemoveMember extends BaseMembershipTransformation +{ + public static final AsymmetricMetadataSerializer<Transformation, RemoveMember> serializer = new SerializerBase<RemoveMember>() + { + public RemoveMember createTransformation(InetAddressAndPort addr) + { + return new RemoveMember(addr); + } + }; + + public RemoveMember(InetAddressAndPort addr) + { + super(addr); + } + + public Kind kind() + { + return Kind.REMOVE_FROM_CMS; + } + + public Result execute(ClusterMetadata prev) + { + ClusterMetadata.Transformer transformer = prev.transformer(); + DataPlacement.Builder builder = prev.placements.get(ReplicationParams.meta()).unbuild(); + builder.reads.withoutReplica(replica); + builder.writes.withoutReplica(replica); + return success(transformer.with(prev.placements.unbuild().with(ReplicationParams.meta(), builder.build()).build()), + affectedRanges); + } +} diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java new file mode 100644 index 
0000000000..4dcf2e52a5 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations.cms; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.sequences.AddToCMS; +import org.apache.cassandra.tcm.sequences.ProgressBarrier; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; + +import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges; + +public class StartAddMember extends BaseMembershipTransformation +{ + public static final AsymmetricMetadataSerializer<Transformation, StartAddMember> serializer = new SerializerBase<StartAddMember>() + { + public StartAddMember createTransformation(InetAddressAndPort addr) + { + 
return new StartAddMember(addr); + } + }; + + public StartAddMember(InetAddressAndPort addr) + { + super(addr); + } + + public Kind kind() + { + return Kind.START_ADD_TO_CMS; + } + + public Result execute(ClusterMetadata prev) + { + RangesByEndpoint readReplicas = prev.placements.get(ReplicationParams.meta()).reads.byEndpoint(); + RangesByEndpoint writeReplicas = prev.placements.get(ReplicationParams.meta()).writes.byEndpoint(); + + if (readReplicas.containsKey(endpoint) || writeReplicas.containsKey(endpoint)) + return new Rejected("Endpoint is already a member of the Cluster Metadata ownership set"); + + ClusterMetadata.Transformer transformer = prev.transformer(); + DataPlacement.Builder builder = prev.placements.get(ReplicationParams.meta()).unbuild() + .withWriteReplica(replica); + + transformer.with(prev.placements.unbuild().with(ReplicationParams.meta(), builder.build()).build()); + + List<InetAddressAndPort> streamCandidates = new ArrayList<>(); + for (Replica replica : prev.placements.get(ReplicationParams.meta()).reads.byEndpoint().flattenValues()) + { + if (!this.replica.equals(replica)) + streamCandidates.add(replica.endpoint()); + } + + ProgressBarrier barrier = new ProgressBarrier(prev.nextEpoch(), affectedRanges.toPeers(prev.placements, prev.directory), false); + AddToCMS joinSequence = new AddToCMS(barrier, streamCandidates, new FinishAddMember(endpoint)); + + return success(transformer.with(prev.inProgressSequences.with(prev.directory.peerId(replica.endpoint()), joinSequence)), + affectedRanges); + } +} diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index 8101b6721f..e64826e653 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -27,7 +27,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; import 
org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.SchemaConstants; @@ -45,7 +44,7 @@ public final class TraceKeyspace { } - private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.getInt(); + public static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.getInt(); /** * Generation is used as a timestamp for automatic table creation on startup. @@ -67,33 +66,31 @@ public final class TraceKeyspace public static final String SESSIONS = "sessions"; public static final String EVENTS = "events"; + public static final String SESSIONS_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "session_id uuid," + + "command text," + + "client inet," + + "coordinator inet," + + "coordinator_port int," + + "duration int," + + "parameters map<text, text>," + + "request text," + + "started_at timestamp," + + "PRIMARY KEY ((session_id)))"; private static final TableMetadata Sessions = - parse(SESSIONS, - "tracing sessions", - "CREATE TABLE %s (" - + "session_id uuid," - + "command text," - + "client inet," - + "coordinator inet," - + "coordinator_port int," - + "duration int," - + "parameters map<text, text>," - + "request text," - + "started_at timestamp," - + "PRIMARY KEY ((session_id)))"); - + parse(SESSIONS, "tracing sessions", SESSIONS_CQL); + + public static final String EVENTS_CQL = "CREATE TABLE IF NOT EXISTS %s (" + + "session_id uuid," + + "event_id timeuuid," + + "activity text," + + "source inet," + + "source_port int," + + "source_elapsed int," + + "thread text," + + "PRIMARY KEY ((session_id), event_id))"; private static final TableMetadata Events = - parse(EVENTS, - "tracing events", - "CREATE TABLE %s (" - + "session_id uuid," - + "event_id timeuuid," - + "activity text," - + "source inet," - + 
"source_port int," - + "source_elapsed int," - + "thread text," - + "PRIMARY KEY ((session_id), event_id))"); + parse(EVENTS, "tracing events", EVENTS_CQL); private static TableMetadata parse(String table, String description, String cql) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org