This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d4d35a7d790df44fc151c52f405fc10484f84930
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Thu Mar 2 14:42:04 2023 +0000

    [CEP-21] Replace fake initial implementation of CMS membership
    
    WIP commit (i.e. does not compile) replacing the initial toy implementation
    of CMS membership with a proper one. Membership of the CMS is now determined
    by ownership of keyspaces that use the META replication strategy: more
    precisely, a node is considered a member of the CMS if it is part of the
    _read_ placements for meta-strategy keyspaces.
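
    A condensed, non-compiling sketch of that derivation, using only names
    introduced in ClusterMetadata by this patch:

        // CMS membership falls out of the read placements of the meta keyspace
        DataPlacement metaPlacement = placements.get(ReplicationParams.meta());
        Set<InetAddressAndPort> fullCMSEndpoints =
            ImmutableSet.copyOf(metaPlacement.reads.byEndpoint().keySet());
        Set<Replica> fullCMSReplicas =
            ImmutableSet.copyOf(metaPlacement.reads.byEndpoint().flattenValues());
        // a node is a CMS member iff it appears in those read placements
        boolean isCMSMember = fullCMSEndpoints.contains(endpoint);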
    
    Also implements more of the "real" [pre]initialization of the CMS, in
    preparation for supporting upgrades of a running cluster from a
    gossip-based system.
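
    For context, the existing pre-initialization path (see LocalLog in the
    diff below) seeds the log with the first CMS member before any other
    state exists, roughly:

        // bootstrap the local log with the first CMS member at Epoch.FIRST
        Transformation transform = PreInitialize.withFirstCMS(addr);
        append(new Entry(Entry.Id.NONE, FIRST, transform));
        waitForHighestConsecutive();
        assert metadata().epoch.is(Epoch.FIRST);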
    
    Co-authored-by: Marcus Eriksson <marc...@apache.org>
    Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com>
    Co-authored-by: Sam Tunnicliffe <s...@apache.org>
---
 .../org/apache/cassandra/auth/AuthKeyspace.java    |  55 +++---
 src/java/org/apache/cassandra/config/Config.java   |   2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |   6 +-
 .../org/apache/cassandra/locator/MetaStrategy.java |   4 +-
 .../schema/SystemDistributedKeyspace.java          | 114 +++++------
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |  63 ++----
 src/java/org/apache/cassandra/tcm/Discovery.java   |   2 +-
 .../apache/cassandra/tcm/PaxosBackedProcessor.java |   7 +-
 .../org/apache/cassandra/tcm/RemoteProcessor.java  |   2 +-
 .../org/apache/cassandra/tcm/Transformation.java   |   7 +
 .../cassandra/tcm/compatibility/GossipHelper.java  |   3 -
 .../org/apache/cassandra/tcm/log/LocalLog.java     |   4 +-
 .../apache/cassandra/tcm/sequences/AddToCMS.java   | 217 +++++++++++++++++++++
 .../tcm/sequences/InProgressSequences.java         |   1 +
 ...lize.java => BaseMembershipTransformation.java} |  57 ++----
 .../tcm/transformations/cms/FinishAddMember.java   |  80 ++++++++
 .../tcm/transformations/cms/Initialize.java        |  58 +++++-
 .../tcm/transformations/cms/PreInitialize.java     |  29 ++-
 .../tcm/transformations/cms/RemoveMember.java      |  59 ++++++
 .../tcm/transformations/cms/StartAddMember.java    |  84 ++++++++
 .../apache/cassandra/tracing/TraceKeyspace.java    |  51 +++--
 21 files changed, 690 insertions(+), 215 deletions(-)

diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java 
b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
index 67fc9c1a06..12760706fc 100644
--- a/src/java/org/apache/cassandra/auth/AuthKeyspace.java
+++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
@@ -37,7 +37,7 @@ public final class AuthKeyspace
     {
     }
 
-    private static final int DEFAULT_RF = 
CassandraRelevantProperties.SYSTEM_AUTH_DEFAULT_RF.getInt();
+    public static final int DEFAULT_RF = 
CassandraRelevantProperties.SYSTEM_AUTH_DEFAULT_RF.getInt();
 
     /**
      * Generation is used as a timestamp for automatic table creation on 
startup.
@@ -57,49 +57,54 @@ public final class AuthKeyspace
 
     public static final long SUPERUSER_SETUP_DELAY = 
Long.getLong("cassandra.superuser_setup_delay_ms", 10000);
 
+    public static String ROLES_CQL = "CREATE TABLE IF NOT EXISTS %s ("
+                                     + "role text,"
+                                     + "is_superuser boolean,"
+                                     + "can_login boolean,"
+                                     + "salted_hash text,"
+                                     + "member_of set<text>,"
+                                     + "PRIMARY KEY(role))";
     private static final TableMetadata Roles =
         parse(ROLES,
               "role definitions",
-              "CREATE TABLE %s ("
-              + "role text,"
-              + "is_superuser boolean,"
-              + "can_login boolean,"
-              + "salted_hash text,"
-              + "member_of set<text>,"
-              + "PRIMARY KEY(role))");
+              ROLES_CQL);
 
+    public static String ROLE_MEMBERS_CQL = "CREATE TABLE IF NOT EXISTS %s ("
+                                            + "role text,"
+                                            + "member text,"
+                                            + "PRIMARY KEY(role, member))";
     private static final TableMetadata RoleMembers =
         parse(ROLE_MEMBERS,
               "role memberships lookup table",
-              "CREATE TABLE %s ("
-              + "role text,"
-              + "member text,"
-              + "PRIMARY KEY(role, member))");
+              ROLE_MEMBERS_CQL);
 
+    public static String ROLE_PERMISSIONS_CQL = "CREATE TABLE IF NOT EXISTS %s 
("
+                                                + "role text,"
+                                                + "resource text,"
+                                                + "permissions set<text>,"
+                                                + "PRIMARY KEY(role, 
resource))";
     private static final TableMetadata RolePermissions =
         parse(ROLE_PERMISSIONS,
               "permissions granted to db roles",
-              "CREATE TABLE %s ("
-              + "role text,"
-              + "resource text,"
-              + "permissions set<text>,"
-              + "PRIMARY KEY(role, resource))");
+              ROLE_PERMISSIONS_CQL);
 
+    public static String RESOURCE_ROLE_INDEX_CQL = "CREATE TABLE IF NOT EXISTS 
%s ("
+                                               + "resource text,"
+                                               + "role text,"
+                                               + "PRIMARY KEY(resource, 
role))";
     private static final TableMetadata ResourceRoleIndex =
         parse(RESOURCE_ROLE_INDEX,
               "index of db roles with permissions granted on a resource",
-              "CREATE TABLE %s ("
-              + "resource text,"
-              + "role text,"
-              + "PRIMARY KEY(resource, role))");
+              RESOURCE_ROLE_INDEX_CQL);
 
+    public static String NETWORK_PERMISSIONS_CQL = "CREATE TABLE IF NOT EXISTS 
%s ("
+                                                   + "role text, "
+                                                   + "dcs frozen<set<text>>, "
+                                                   + "PRIMARY KEY(role))";
     private static final TableMetadata NetworkPermissions =
         parse(NETWORK_PERMISSIONS,
               "user network permissions",
-              "CREATE TABLE %s ("
-              + "role text, "
-              + "dcs frozen<set<text>>, "
-              + "PRIMARY KEY(role))");
+              NETWORK_PERMISSIONS_CQL);
 
     private static TableMetadata parse(String name, String description, String 
cql)
     {
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 5a37eb1600..15eb49a2e6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -987,7 +987,7 @@ public class Config
     /**
      * See {@link PaxosVariant}. Defaults to v1, recommend upgrading to v2 at 
earliest opportunity.
      */
-    public volatile PaxosVariant paxos_variant = PaxosVariant.v1;
+    public volatile PaxosVariant paxos_variant = PaxosVariant.v2; // TODO: 
only use v2 for CMS operations
 
     /**
      * If true, paxos topology change repair will not run on a topology change 
- this option should only be used in
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 86ccee939e..afd7c751c9 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -800,7 +800,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
         if (seeds.contains(to))
             gossipedWith.add(SEED);
-        if (ClusterMetadata.current().cmsMembers().contains(to))
+        if (ClusterMetadata.current().fullCMSMembers().contains(to))
             gossipedWith.add(CMS);
         GossiperDiagnostics.sendGossipDigestSyn(this, to);
         return gossipedWith;
@@ -869,7 +869,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     private void maybeGossipToCMS(Message<GossipDigestSyn> message)
     {
-        Set<InetAddressAndPort> cms = ClusterMetadata.current().cmsMembers();
+        Set<InetAddressAndPort> cms = 
ClusterMetadata.current().fullCMSMembers();
         if (cms.contains(getBroadcastAddressAndPort()))
             return;
 
@@ -2212,7 +2212,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     public void triggerRoundWithCMS()
     {
         ClusterMetadata metadata = ClusterMetadata.current();
-        Set<InetAddressAndPort> cms = metadata.cmsMembers();
+        Set<InetAddressAndPort> cms = metadata.fullCMSMembers();
         if (!cms.contains(getBroadcastAddressAndPort()))
         {
             logger.debug("Triggering gossip round with CMS {}", 
metadata.epoch);
diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java 
b/src/java/org/apache/cassandra/locator/MetaStrategy.java
index 295e06f1a9..28272088da 100644
--- a/src/java/org/apache/cassandra/locator/MetaStrategy.java
+++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java
@@ -51,7 +51,7 @@ public class MetaStrategy extends SystemStrategy
 
     private static EndpointsForRange replicas()
     {
-        Set<InetAddressAndPort> members = 
ClusterMetadata.current().cmsMembers();
+        Set<InetAddressAndPort> members = 
ClusterMetadata.current().fullCMSMembers();
         return EndpointsForRange.builder(EntireRange.entireRange, 
members.size())
                                 
.addAll(members.stream().map(EntireRange::replica).collect(Collectors.toList()))
                                 .build();
@@ -59,7 +59,7 @@ public class MetaStrategy extends SystemStrategy
     @Override
     public ReplicationFactor getReplicationFactor()
     {
-        int rf = ClusterMetadata.current().cmsMembers.size();
+        int rf = ClusterMetadata.current().fullCMSMembers().size();
         return ReplicationFactor.fullOnly(rf);
     }
 
diff --git 
a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 563dedf935..4c978f0af5 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.CommonRange;
 import org.apache.cassandra.repair.messages.RepairOption;
@@ -67,7 +66,7 @@ public final class SystemDistributedKeyspace
 
     public static final String NAME = "system_distributed";
 
-    private static final int DEFAULT_RF = 
CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt();
+    public static final int DEFAULT_RF = 
CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt();
     private static final Logger logger = 
LoggerFactory.getLogger(SystemDistributedKeyspace.class);
 
     /**
@@ -82,6 +81,8 @@ public final class SystemDistributedKeyspace
      * gen 4: compression chunk length reduced to 16KiB, 
memtable_flush_period_in_ms now unset on all tables in 4.0
      * gen 5: add ttl and TWCS to repair_history tables
      * gen 6: add denylist table
+     *
+     * // TODO: TCM - how do we evolve these tables?
      */
     public static final long GENERATION = 6;
 
@@ -93,65 +94,65 @@ public final class SystemDistributedKeyspace
 
     public static final String PARTITION_DENYLIST_TABLE = "partition_denylist";
 
+    public static final String REPAIR_HISTORY_CQL = "CREATE TABLE IF NOT 
EXISTS %s ("
+                                                     + "keyspace_name text,"
+                                                     + "columnfamily_name 
text,"
+                                                     + "id timeuuid,"
+                                                     + "parent_id timeuuid,"
+                                                     + "range_begin text,"
+                                                     + "range_end text,"
+                                                     + "coordinator inet,"
+                                                     + "coordinator_port int,"
+                                                     + "participants 
set<inet>,"
+                                                     + "participants_v2 
set<text>,"
+                                                     + "exception_message 
text,"
+                                                     + "exception_stacktrace 
text,"
+                                                     + "status text,"
+                                                     + "started_at timestamp,"
+                                                     + "finished_at timestamp,"
+                                                     + "PRIMARY KEY 
((keyspace_name, columnfamily_name), id))";
+
     private static final TableMetadata RepairHistory =
-        parse(REPAIR_HISTORY,
-                "Repair history",
-                "CREATE TABLE %s ("
-                     + "keyspace_name text,"
-                     + "columnfamily_name text,"
-                     + "id timeuuid,"
-                     + "parent_id timeuuid,"
-                     + "range_begin text,"
-                     + "range_end text,"
-                     + "coordinator inet,"
-                     + "coordinator_port int,"
-                     + "participants set<inet>,"
-                     + "participants_v2 set<text>,"
-                     + "exception_message text,"
-                     + "exception_stacktrace text,"
-                     + "status text,"
-                     + "started_at timestamp,"
-                     + "finished_at timestamp,"
-                     + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))")
+        parse(REPAIR_HISTORY, "Repair history", REPAIR_HISTORY_CQL)
         .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30))
         
.compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
                                                           
"compaction_window_size","1")))
         .build();
 
+    public static final String PARENT_REPAIR_HISTORY_CQL = "CREATE TABLE IF 
NOT EXISTS %s ("
+                                                            + "parent_id 
timeuuid,"
+                                                            + "keyspace_name 
text,"
+                                                            + 
"columnfamily_names set<text>,"
+                                                            + "started_at 
timestamp,"
+                                                            + "finished_at 
timestamp,"
+                                                            + 
"exception_message text,"
+                                                            + 
"exception_stacktrace text,"
+                                                            + 
"requested_ranges set<text>,"
+                                                            + 
"successful_ranges set<text>,"
+                                                            + "options 
map<text, text>,"
+                                                            + "PRIMARY KEY 
(parent_id))";
     private static final TableMetadata ParentRepairHistory =
-        parse(PARENT_REPAIR_HISTORY,
-                "Repair history",
-                "CREATE TABLE %s ("
-                     + "parent_id timeuuid,"
-                     + "keyspace_name text,"
-                     + "columnfamily_names set<text>,"
-                     + "started_at timestamp,"
-                     + "finished_at timestamp,"
-                     + "exception_message text,"
-                     + "exception_stacktrace text,"
-                     + "requested_ranges set<text>,"
-                     + "successful_ranges set<text>,"
-                     + "options map<text, text>,"
-                     + "PRIMARY KEY (parent_id))")
+        parse(PARENT_REPAIR_HISTORY, "Repair history", 
PARENT_REPAIR_HISTORY_CQL)
         .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30))
         
.compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
                                                           
"compaction_window_size","1")))
         .build();
 
+    public static final String VIEW_BUILD_STATUS_CQL = "CREATE TABLE IF NOT 
EXISTS %s ("
+                                                       + "keyspace_name text,"
+                                                       + "view_name text,"
+                                                       + "host_id uuid,"
+                                                       + "status text,"
+                                                       + "PRIMARY KEY 
((keyspace_name, view_name), host_id))";
     private static final TableMetadata ViewBuildStatus =
         parse(VIEW_BUILD_STATUS,
             "Materialized View build status",
-            "CREATE TABLE %s ("
-                     + "keyspace_name text,"
-                     + "view_name text,"
-                     + "host_id uuid,"
-                     + "status text,"
-                     + "PRIMARY KEY ((keyspace_name, view_name), 
host_id))").build();
+              VIEW_BUILD_STATUS_CQL).build();
 
     public static final TableMetadata PartitionDenylistTable =
     parse(PARTITION_DENYLIST_TABLE,
           "Partition keys which have been denied access",
-          "CREATE TABLE %s ("
+          "CREATE TABLE IF NOT EXISTS %s ("
           + "ks_name text,"
           + "table_name text,"
           + "key blob,"
@@ -167,7 +168,9 @@ public final class SystemDistributedKeyspace
 
     public static KeyspaceMetadata metadata()
     {
-        return 
KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, 
KeyspaceParams.simple(Math.max(DEFAULT_RF, 
DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, 
ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable));
+        return 
KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+                                       
KeyspaceParams.simple(Math.max(DEFAULT_RF, 
DatabaseDescriptor.getDefaultKeyspaceRF())),
+                                       Tables.of(RepairHistory, 
ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable));
     }
 
     public static void startParentRepair(TimeUUID parent_id, String 
keyspaceName, String[] cfnames, RepairOption options)
@@ -246,18 +249,19 @@ public final class SystemDistributedKeyspace
         {
             for (Range<Token> range : commonRange.ranges)
             {
-                String fmtQry = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
-                                       keyspaceName,
-                                       cfname,
-                                       id.toString(),
-                                       parent_id.toString(),
-                                       range.left.toString(),
-                                       range.right.toString(),
-                                       coordinator.getHostAddress(false),
-                                       coordinator.getPort(),
-                                       Joiner.on("', '").join(participants),
-                                       Joiner.on("', '").join(participants_v2),
-                                       RepairState.STARTED.toString());
+                String fmtQry;
+                fmtQry = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+                                keyspaceName,
+                                cfname,
+                                id.toString(),
+                                parent_id.toString(),
+                                range.left.toString(),
+                                range.right.toString(),
+                                coordinator.getHostAddress(false),
+                                coordinator.getPort(),
+                                Joiner.on("', '").join(participants),
+                                Joiner.on("', '").join(participants_v2),
+                                RepairState.STARTED.toString());
                 processSilent(fmtQry);
             }
         }
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index a89f3c6dd9..3708cadfd1 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
@@ -37,9 +36,10 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.extensions.ExtensionKey;
 import org.apache.cassandra.tcm.extensions.ExtensionValue;
 import org.apache.cassandra.tcm.membership.Directory;
@@ -55,12 +55,10 @@ import 
org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.LockedRanges;
 import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
-import org.apache.cassandra.tcm.transformations.cms.EntireRange;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
 import static org.apache.cassandra.db.TypeSizes.sizeof;
-import static 
org.apache.cassandra.tcm.transformations.cms.EntireRange.entireRange;
 
 public class ClusterMetadata
 {
@@ -70,7 +68,6 @@ public class ClusterMetadata
     public final long period;
     public final boolean lastInPeriod;
     public final IPartitioner partitioner;       // Set during (initial) 
construction and not modifiable via Transformer
-    public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
 
     public final DistributedSchema schema;
     public final Directory directory;
@@ -78,8 +75,9 @@ public class ClusterMetadata
     public final DataPlacements placements;
     public final LockedRanges lockedRanges;
     public final InProgressSequences inProgressSequences;
-    public final EndpointsForRange cmsReplicas;
-    public final ImmutableSet<InetAddressAndPort> cmsMembers;
+    public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
+    private final Set<Replica> fullCMSReplicas;
+    private final Set<InetAddressAndPort> fullCMSEndpoints;
 
     public ClusterMetadata(IPartitioner partitioner)
     {
@@ -104,7 +102,6 @@ public class ClusterMetadata
              DataPlacements.EMPTY,
              LockedRanges.EMPTY,
              InProgressSequences.EMPTY,
-             ImmutableSet.of(),
              ImmutableMap.of());
     }
 
@@ -118,7 +115,6 @@ public class ClusterMetadata
                            DataPlacements placements,
                            LockedRanges lockedRanges,
                            InProgressSequences inProgressSequences,
-                           Set<InetAddressAndPort> cmsMembers,
                            Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
     {
         // TODO: token map is a feature of the specific placement strategy, 
and so may not be a relevant component of
@@ -135,24 +131,25 @@ public class ClusterMetadata
         this.placements = placements;
         this.lockedRanges = lockedRanges;
         this.inProgressSequences = inProgressSequences;
-        this.cmsMembers = ImmutableSet.copyOf(cmsMembers);
         this.extensions = ImmutableMap.copyOf(extensions);
 
-        this.cmsReplicas = EndpointsForRange.builder(entireRange)
-                                            .addAll(cmsMembers.stream()
-                                                              
.map(EntireRange::replica)
-                                                              
.collect(Collectors.toList()))
-                                            .build();
+        this.fullCMSReplicas = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().flattenValues());
+        this.fullCMSEndpoints = 
ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet());
     }
 
-    public boolean isCMSMember(InetAddressAndPort endpoint)
+    public Set<InetAddressAndPort> fullCMSMembers()
+    {
+        return fullCMSEndpoints;
+    }
+
+    public Set<Replica> fullCMSMembersAsReplicas()
     {
-        return cmsMembers.contains(endpoint);
+        return fullCMSReplicas;
     }
 
-    public Set<InetAddressAndPort> cmsMembers()
+    public boolean isCMSMember(InetAddressAndPort endpoint)
     {
-        return cmsMembers;
+        return fullCMSMembers().contains(endpoint);
     }
 
     public Transformer transformer()
@@ -177,7 +174,6 @@ public class ClusterMetadata
                                    placements,
                                    lockedRanges,
                                    inProgressSequences,
-                                   cmsMembers,
                                    extensions);
     }
 
@@ -204,7 +200,6 @@ public class ClusterMetadata
         private DataPlacements placements;
         private LockedRanges lockedRanges;
         private InProgressSequences inProgressSequences;
-        private final Set<InetAddressAndPort> cmsMembers;
         private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
         private final Set<MetadataKey> modifiedKeys;
 
@@ -221,7 +216,6 @@ public class ClusterMetadata
             this.placements = metadata.placements;
             this.lockedRanges = metadata.lockedRanges;
             this.inProgressSequences = metadata.inProgressSequences;
-            this.cmsMembers = new HashSet<>(metadata.cmsMembers);
             extensions = new HashMap<>(metadata.extensions);
             modifiedKeys = new HashSet<>();
         }
@@ -311,17 +305,6 @@ public class ClusterMetadata
             this.inProgressSequences = sequences;
             return this;
         }
-        public Transformer withCMSMember(InetAddressAndPort member)
-        {
-            cmsMembers.add(member);
-            return this;
-        }
-
-        public Transformer withoutCMSMember(InetAddressAndPort member)
-        {
-            cmsMembers.remove(member);
-            return this;
-        }
 
         public Transformer with(ExtensionKey<?, ?> key, ExtensionValue<?> obj)
         {
@@ -419,7 +402,6 @@ public class ClusterMetadata
                                                        placements,
                                                        lockedRanges,
                                                        inProgressSequences,
-                                                       cmsMembers,
                                                        extensions),
                                    ImmutableSet.copyOf(modifiedKeys));
         }
@@ -439,7 +421,6 @@ public class ClusterMetadata
                    ", lockedRanges=" + lockedRanges +
                    ", inProgressSequences=" + inProgressSequences +
                    ", extensions=" + extensions +
-                   ", cmsMembers=" + cmsMembers +
                    ", modifiedKeys=" + modifiedKeys +
                    '}';
         }
@@ -542,9 +523,6 @@ public class ClusterMetadata
                 assert key.valueType.isInstance(value);
                 value.serialize(out, version);
             }
-            out.writeInt(metadata.cmsMembers.size());
-            for (InetAddressAndPort member : metadata.cmsMembers)
-                
InetAddressAndPort.MetadataSerializer.serializer.serialize(member, out, 
version);
         }
 
         @Override
@@ -569,10 +547,6 @@ public class ClusterMetadata
                 value.deserialize(in, version);
                 extensions.put(key, value);
             }
-            int memberCount = in.readInt();
-            Set<InetAddressAndPort> members = new HashSet<>(memberCount);
-            for (int i = 0; i < memberCount; i++)
-                
members.add(InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, 
version));
             return new ClusterMetadata(epoch,
                                        period,
                                        lastInPeriod,
@@ -583,7 +557,6 @@ public class ClusterMetadata
                                        placements,
                                        lockedRanges,
                                        ips,
-                                       members,
                                        extensions);
         }
 
@@ -606,10 +579,6 @@ public class ClusterMetadata
                     
LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) +
                     
InProgressSequences.serializer.serializedSize(metadata.inProgressSequences, 
version);
 
-            size += TypeSizes.INT_SIZE;
-            for (InetAddressAndPort member : metadata.cmsMembers)
-                size += 
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(member, 
version);
-
             return size;
         }
     }
diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java 
b/src/java/org/apache/cassandra/tcm/Discovery.java
index a846a7b74a..58efbbccc6 100644
--- a/src/java/org/apache/cassandra/tcm/Discovery.java
+++ b/src/java/org/apache/cassandra/tcm/Discovery.java
@@ -122,7 +122,7 @@ public class Discovery
         @Override
         public void doVerb(Message<NoPayload> message)
         {
-            Set<InetAddressAndPort> cms = ClusterMetadata.current().cmsMembers;
+            Set<InetAddressAndPort> cms = 
ClusterMetadata.current().fullCMSMembers();
 
             DiscoveredNodes discoveredNodes;
             if (!cms.isEmpty())
diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
index 5eee7bc3dd..d4ccb20425 100644
--- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
@@ -18,11 +18,9 @@
 
 package org.apache.cassandra.tcm;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,8 +66,7 @@ public class PaxosBackedProcessor extends 
AbstractLocalProcessor
         log.waitForHighestConsecutive();
         ClusterMetadata metadata = log.metadata();
 
-        List<Replica> replicas = new ArrayList<>();
-        Iterables.addAll(replicas, metadata.cmsReplicas);
+        Set<Replica> replicas = metadata.fullCMSMembersAsReplicas();
 
         // We can not use Paxos to catch-up a member of CMS ownership group, 
since that'd reduce availability,
         // so instead we allow CMS owners to catch up via inconsistent replay. 
In other words, from local log
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index a54a0b873f..fd5b59dd57 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -91,7 +91,7 @@ public final class RemoteProcessor implements 
ClusterMetadataService.Processor
 
     private List<InetAddressAndPort> candidates(boolean allowDiscovery)
     {
-        List<InetAddressAndPort> candidates = new 
ArrayList<>(log.metadata().cmsMembers);
+        List<InetAddressAndPort> candidates = new 
ArrayList<>(log.metadata().fullCMSMembers());
         if (candidates.isEmpty())
             candidates.addAll(DatabaseDescriptor.getSeeds());
 
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index b81a793ee1..d77194dcf7 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -45,8 +45,11 @@ import 
org.apache.cassandra.tcm.transformations.PrepareReplace;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.SealPeriod;
 import org.apache.cassandra.tcm.transformations.UnsafeJoin;
+import org.apache.cassandra.tcm.transformations.cms.FinishAddMember;
 import org.apache.cassandra.tcm.transformations.cms.Initialize;
 import org.apache.cassandra.tcm.transformations.cms.PreInitialize;
+import org.apache.cassandra.tcm.transformations.cms.RemoveMember;
+import org.apache.cassandra.tcm.transformations.cms.StartAddMember;
 
 public interface Transformation
 {
@@ -174,6 +177,10 @@ public interface Transformation
 
         CANCEL_SEQUENCE(() -> CancelInProgressSequence.serializer),
 
+        START_ADD_TO_CMS(() -> StartAddMember.serializer),
+        FINISH_ADD_TO_CMS(() -> FinishAddMember.serializer),
+        REMOVE_FROM_CMS(() -> RemoveMember.serializer),
+
         CUSTOM(() -> CustomTransformation.serializer);
 
         private final Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer;
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java 
b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 2f4270ace6..1a6fc17c84 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -303,7 +303,6 @@ public class GossipHelper
                                    DataPlacements.empty(),
                                    LockedRanges.EMPTY,
                                    InProgressSequences.EMPTY,
-                                   Collections.emptySet(),
                                    Collections.emptyMap());
     }
 
@@ -347,7 +346,6 @@ public class GossipHelper
                                                                       
DataPlacements.empty(),
                                                                       
LockedRanges.EMPTY,
                                                                       
InProgressSequences.EMPTY,
-                                                                      
Collections.emptySet(),
                                                                       
extensions);
         return new ClusterMetadata(Epoch.UPGRADE_GOSSIP,
                                    Period.EMPTY,
@@ -359,7 +357,6 @@ public class GossipHelper
                                    new 
UniformRangePlacement().calculatePlacements(forPlacementCalculation, 
schema.getKeyspaces()),
                                    LockedRanges.EMPTY,
                                    InProgressSequences.EMPTY,
-                                   Collections.emptySet(),
                                    extensions);
     }
 }
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 469cc8a0e4..7adf3fd0fb 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -95,7 +95,7 @@ public abstract class LocalLog implements Closeable
         Transformation transform = PreInitialize.withFirstCMS(addr);
         append(new Entry(Entry.Id.NONE, FIRST, transform));
         waitForHighestConsecutive();
-        assert metadata().epoch.is(Epoch.FIRST) : 
ClusterMetadata.current().epoch + " " + ClusterMetadata.current().cmsMembers;
+        assert metadata().epoch.is(Epoch.FIRST) : 
ClusterMetadata.current().epoch + " " + 
ClusterMetadata.current().fullCMSMembers();
     }
 
     public ClusterMetadata metadata()
@@ -557,7 +557,7 @@ public abstract class LocalLog implements Closeable
 
             if ((entry.epoch.getEpoch() % 
DatabaseDescriptor.getMetadataSnapshotFrequency()) == 0)
             {
-                List<InetAddressAndPort> list = new 
ArrayList<>(ClusterMetadata.current().cmsMembers);
+                List<InetAddressAndPort> list = new 
ArrayList<>(ClusterMetadata.current().fullCMSMembers());
                 list.sort(comparing(i -> i.addressBytes[i.addressBytes.length 
- 1]));
                 if 
(list.get(0).equals(FBUtilities.getBroadcastAddressAndPort()))
                     ScheduledExecutors.nonPeriodicTasks.submit(() -> 
ClusterMetadataService.instance().sealPeriod());
diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java 
b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
new file mode 100644
index 0000000000..8026689e5f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.sequences;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.InProgressSequence;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.transformations.cms.FinishAddMember;
+import org.apache.cassandra.tcm.transformations.cms.StartAddMember;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static 
org.apache.cassandra.tcm.sequences.InProgressSequences.Kind.JOIN_OWNERSHIP_GROUP;
+import static org.apache.cassandra.tcm.transformations.cms.EntireRange.*;
+
+public class AddToCMS implements InProgressSequence<AddToCMS>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AddToCMS.class);
+    public static Serializer serializer = new Serializer();
+
+    private final ProgressBarrier barrier;
+    private final List<InetAddressAndPort> streamCandidates;
+    private final FinishAddMember finishJoin;
+
+    public static void initiate()
+    {
+        NodeId self = ClusterMetadata.current().myNodeId();
+        InProgressSequence<?> continuation = ClusterMetadataService.instance()
+                                                                   .commit(new 
StartAddMember(FBUtilities.getBroadcastAddressAndPort()),
+                                                                           
(metadata) -> !metadata.inProgressSequences.contains(self),
+                                                                           
(metadata) -> metadata.inProgressSequences.get(self),
+                                                                           
(metadata, reason) -> {
+                                                                               
throw new IllegalStateException("Can't join ownership group: " + reason);
+                                                                           });
+        if (continuation.kind() != JOIN_OWNERSHIP_GROUP)
+            throw new IllegalStateException(String.format("Following accepted 
initiation of node to CMS, " +
+                                                          "an incorrect 
sequence %s was found in progress. %s ",
+                                            continuation.kind(), 
continuation));
+        continuation.executeNext();
+    }
+
+    public AddToCMS(ProgressBarrier barrier, List<InetAddressAndPort> 
streamCandidates, FinishAddMember join)
+    {
+        this.barrier = barrier;
+        this.streamCandidates = streamCandidates;
+        this.finishJoin = join;
+    }
+
+    @Override
+    public ProgressBarrier barrier()
+    {
+        return barrier;
+    }
+
+    public Transformation.Kind nextStep()
+    {
+        return finishJoin.kind();
+    }
+
+    private void streamRanges() throws ExecutionException, InterruptedException
+    {
+        // TODO: iterate over stream candidates
+        StreamPlan streamPlan = new StreamPlan(StreamOperation.BOOTSTRAP, 1, 
true, null, PreviewKind.NONE);
+        streamPlan.requestRanges(streamCandidates.get(0),
+                                 SchemaConstants.METADATA_KEYSPACE_NAME,
+                                 new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).add(finishJoin.replicaForStreaming()).build(),
+                                 new 
RangesAtEndpoint.Builder(FBUtilities.getBroadcastAddressAndPort()).build(),
+                                 DistributedMetadataLogKeyspace.TABLE_NAME);
+        streamPlan.execute().get();
+    }
+
+    private void finishJoin()
+    {
+        NodeId self = ClusterMetadata.current().myNodeId();
+        ClusterMetadataService.instance().commit(finishJoin,
+                                                 (ClusterMetadata metadata) -> 
metadata.inProgressSequences.contains(self),
+                                                 (ClusterMetadata metadata) -> 
null,
+                                                 (ClusterMetadata metadata, 
String error) -> {
+                                                     throw new 
IllegalStateException(String.format("Could not finish join due to \"%s\". Next 
transformation in sequence: %s.",
+                                                                               
                    error,
+                                                                               
                    metadata.inProgressSequences.contains(self) ? 
metadata.inProgressSequences.get(self).nextStep() : null));
+                                                 });
+    }
+
+    private void repairPaxosTopology() throws ExecutionException, 
InterruptedException
+    {
+        
ActiveRepairService.instance.repairPaxosForTopologyChange(SchemaConstants.METADATA_KEYSPACE_NAME,
+                                                                  
Collections.singletonList(entireRange),
+                                                                  "bootstrap")
+                                    .get();
+    }
+
+    public boolean executeNext()
+    {
+        try
+        {
+            streamRanges();
+            finishJoin();
+            repairPaxosTopology();
+        }
+        catch (Throwable t)
+        {
+            logger.error("Could not finish adding the node to the metadata 
ownership group", t);
+            throw new RuntimeException(t);
+        }
+
+        return false;
+    }
+
+    public AddToCMS advance(Epoch waitForWatermark, Transformation.Kind next)
+    {
+        throw new NoSuchElementException();
+    }
+
+    public InProgressSequences.Kind kind()
+    {
+        return JOIN_OWNERSHIP_GROUP;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AddToCMS addToCMS = (AddToCMS) o;
+        return Objects.equals(barrier, addToCMS.barrier) &&
+               Objects.equals(streamCandidates, addToCMS.streamCandidates) &&
+               Objects.equals(finishJoin, addToCMS.finishJoin);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(barrier, streamCandidates, finishJoin);
+    }
+
+    public static class Serializer implements 
AsymmetricMetadataSerializer<InProgressSequence<?>, AddToCMS>
+    {
+        @Override
+        public void serialize(InProgressSequence<?> t, DataOutputPlus out, 
Version version) throws IOException
+        {
+            AddToCMS seq = (AddToCMS) t;
+            ProgressBarrier.serializer.serialize(t.barrier(), out, version);
+            FinishAddMember.serializer.serialize(seq.finishJoin, out, version);
+            out.writeInt(seq.streamCandidates.size());
+            for (InetAddressAndPort ep : seq.streamCandidates)
+                InetAddressAndPort.MetadataSerializer.serializer.serialize(ep, 
out, version);
+        }
+
+        @Override
+        public AddToCMS deserialize(DataInputPlus in, Version version) throws 
IOException
+        {
+            ProgressBarrier barrier = 
ProgressBarrier.serializer.deserialize(in, version);
+            FinishAddMember finish = 
FinishAddMember.serializer.deserialize(in, version);
+            int streamCandidatesSize = in.readInt();
+            List<InetAddressAndPort> streamCandidates = new ArrayList<>();
+
+            for (int i = 0; i < streamCandidatesSize; i++)
+                
streamCandidates.add(InetAddressAndPort.MetadataSerializer.serializer.deserialize(in,
 version));
+            return new AddToCMS(barrier, streamCandidates, finish);
+        }
+
+        @Override
+        public long serializedSize(InProgressSequence<?> t, Version version)
+        {
+            AddToCMS seq = (AddToCMS) t;
+            long size = ProgressBarrier.serializer.serializedSize(t.barrier(), 
version);
+            size += FinishAddMember.serializer.serializedSize(seq.finishJoin, 
version);
+            size += sizeof(seq.streamCandidates.size());
+            for (InetAddressAndPort ep : seq.streamCandidates)
+                size += 
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(ep, version);
+            return size;
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java 
b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
index 32e78c5fd5..5fb7656048 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
@@ -137,6 +137,7 @@ public class InProgressSequences implements 
MetadataValue<InProgressSequences>
 
     public enum Kind
     {
+        JOIN_OWNERSHIP_GROUP(AddToCMS.serializer),
         JOIN(BootstrapAndJoin.serializer),
         MOVE(Move.serializer),
         REPLACE(BootstrapAndReplace.serializer),
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
similarity index 50%
copy from src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
copy to 
src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
index 50e237a509..f8cebf22a1 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
+++ 
b/src/java/org/apache/cassandra/tcm/transformations/cms/BaseMembershipTransformation.java
@@ -22,58 +22,43 @@ import java.io.IOException;
 
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
-import org.apache.cassandra.tcm.transformations.ForceSnapshot;
 
-import static 
org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
-
-public class Initialize extends ForceSnapshot
+public abstract class BaseMembershipTransformation implements Transformation
 {
-    public static final AsymmetricMetadataSerializer<Transformation, 
Initialize> serializer = new AsymmetricMetadataSerializer<Transformation, 
Initialize>()
+    protected final InetAddressAndPort endpoint;
+    protected final Replica replica;
+
+    protected BaseMembershipTransformation(InetAddressAndPort endpoint)
+    {
+        this.endpoint = endpoint;
+        this.replica = EntireRange.replica(endpoint);
+    }
+
+    public static abstract class SerializerBase<T extends 
BaseMembershipTransformation> implements 
AsymmetricMetadataSerializer<Transformation, T>
     {
         public void serialize(Transformation t, DataOutputPlus out, Version 
version) throws IOException
         {
-            Initialize initialize = (Initialize) t;
-            ClusterMetadata.serializer.serialize(initialize.baseState, out, 
version);
+            T transformation = (T) t;
+            
InetAddressAndPort.MetadataSerializer.serializer.serialize(transformation.endpoint,
 out, version);
         }
 
-        public Initialize deserialize(DataInputPlus in, Version version) 
throws IOException
+        public T deserialize(DataInputPlus in, Version version) throws 
IOException
         {
-            return new Initialize(ClusterMetadata.serializer.deserialize(in, 
version));
+            InetAddressAndPort addr = 
InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+            return createTransformation(addr);
         }
 
         public long serializedSize(Transformation t, Version version)
         {
-            Initialize initialize = (Initialize) t;
-            return 
ClusterMetadata.serializer.serializedSize(initialize.baseState, version);
+            T transformation = (T) t;
+            return 
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(transformation.endpoint,
 version);
         }
-    };
-
-    public Initialize(ClusterMetadata baseState)
-    {
-        super(baseState);
-    }
 
-    public Kind kind()
-    {
-        return Kind.INITIALIZE_CMS;
-    }
-
-    public Result execute(ClusterMetadata prev)
-    {
-        ClusterMetadata next = baseState;
-        ClusterMetadata.Transformer transformer = next.transformer();
-        return success(transformer, affectedRanges);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "Initialize{" +
-               "baseState = " + baseState +
-               '}';
+        public abstract T createTransformation(InetAddressAndPort addr);
     }
 }
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java
new file mode 100644
index 0000000000..2034105e09
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/FinishAddMember.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations.cms;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.InProgressSequence;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+
+import static 
org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
+
+public class FinishAddMember extends BaseMembershipTransformation
+{
+    public static final AsymmetricMetadataSerializer<Transformation, 
FinishAddMember> serializer = new SerializerBase<FinishAddMember>()
+    {
+        public FinishAddMember createTransformation(InetAddressAndPort addr)
+        {
+            return new FinishAddMember(addr);
+        }
+    };
+
+    public FinishAddMember(InetAddressAndPort addr)
+    {
+        super(addr);
+    }
+
+    public Kind kind()
+    {
+        return Kind.FINISH_ADD_TO_CMS;
+    }
+
+    public Replica replicaForStreaming()
+    {
+        return replica;
+    }
+
+    public Result execute(ClusterMetadata prev)
+    {
+        InProgressSequences sequences = prev.inProgressSequences;
+        NodeId targetNode = prev.directory.peerId(replica.endpoint());
+        InProgressSequence<?> sequence = sequences.get(targetNode);
+
+        if (sequence == null)
+            return new Rejected("Can't execute finish join as cluster metadata 
does not hold join sequence for this node");
+
+        if (!(sequence instanceof AddToCMS))
+            return new Rejected("Can't execute finish join as cluster metadata 
contains a sequence of a different kind");
+
+        ClusterMetadata.Transformer transformer = prev.transformer();
+        DataPlacement.Builder builder = 
prev.placements.get(ReplicationParams.meta())
+                                                       .unbuild()
+                                                       
.withReadReplica(replica);
+        transformer = 
transformer.with(prev.placements.unbuild().with(ReplicationParams.meta(), 
builder.build()).build())
+                                 
.with(prev.inProgressSequences.without(targetNode));
+        return success(transformer, affectedRanges);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
index 50e237a509..0113d04ef3 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
@@ -19,14 +19,26 @@
 package org.apache.cassandra.tcm.transformations.cms;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.cassandra.auth.AuthKeyspace;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaTransformation;
+import org.apache.cassandra.schema.SchemaTransformations;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.ForceSnapshot;
+import org.apache.cassandra.tracing.TraceKeyspace;
 
 import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
 
@@ -65,10 +77,54 @@ public class Initialize extends ForceSnapshot
     public Result execute(ClusterMetadata prev)
     {
         ClusterMetadata next = baseState;
-        ClusterMetadata.Transformer transformer = next.transformer();
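+        // Install the distributed system keyspaces (traces, system_distributed, auth) as the initial schema.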
+        DistributedSchema initialSchema = new DistributedSchema(setUpDistributedSystemKeyspaces(next));
+        ClusterMetadata.Transformer transformer = next.transformer().with(initialSchema);
         return success(transformer, affectedRanges);
     }
 
+    public static final List<SchemaTransformation> schemaTransformations =
+    Collections.unmodifiableList(Arrays.asList(SchemaTransformations.fromCql(String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
+                                                                                           "  WITH REPLICATION = { \n" +
+                                                                                           "   'class' : 'SimpleStrategy', \n" +
+                                                                                           "   'replication_factor' : %d \n" +
+                                                                                           "  };",
+                                                                                           SchemaConstants.TRACE_KEYSPACE_NAME,
+                                                                                           Math.max(TraceKeyspace.DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF()))),
+                                               SchemaTransformations.fromCql(String.format(TraceKeyspace.SESSIONS_CQL, SchemaConstants.TRACE_KEYSPACE_NAME + "." + TraceKeyspace.SESSIONS)),
+                                               SchemaTransformations.fromCql(String.format(TraceKeyspace.EVENTS_CQL, SchemaConstants.TRACE_KEYSPACE_NAME + "." + TraceKeyspace.EVENTS)),
+                                               SchemaTransformations.fromCql(String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
+                                                                                           "  WITH REPLICATION = { \n" +
+                                                                                           "   'class' : 'SimpleStrategy', \n" +
+                                                                                           "   'replication_factor' : %d \n" +
+                                                                                           "  };",
+                                                                                           SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+                                                                                           Math.max(SystemDistributedKeyspace.DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF()))),
+                                               SchemaTransformations.fromCql(String.format(SystemDistributedKeyspace.REPAIR_HISTORY_CQL, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.REPAIR_HISTORY)),
+                                               SchemaTransformations.fromCql(String.format(SystemDistributedKeyspace.PARENT_REPAIR_HISTORY_CQL, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.PARENT_REPAIR_HISTORY)),
+                                               SchemaTransformations.fromCql(String.format(SystemDistributedKeyspace.VIEW_BUILD_STATUS_CQL, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.VIEW_BUILD_STATUS)),
+                                               SchemaTransformations.fromCql(String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
+                                                                                           "  WITH REPLICATION = { \n" +
+                                                                                           "   'class' : 'SimpleStrategy', \n" +
+                                                                                           "   'replication_factor' : %d \n" +
+                                                                                           "  };",
+                                                                                           SchemaConstants.AUTH_KEYSPACE_NAME,
+                                                                                           Math.max(AuthKeyspace.DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF()))),
+                                               SchemaTransformations.fromCql(String.format(AuthKeyspace.ROLES_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.ROLES)),
+                                               SchemaTransformations.fromCql(String.format(AuthKeyspace.ROLE_MEMBERS_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.ROLE_MEMBERS)),
+                                               SchemaTransformations.fromCql(String.format(AuthKeyspace.ROLE_PERMISSIONS_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.ROLE_PERMISSIONS)),
+                                               SchemaTransformations.fromCql(String.format(AuthKeyspace.RESOURCE_ROLE_INDEX_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.RESOURCE_ROLE_INDEX)),
+                                               SchemaTransformations.fromCql(String.format(AuthKeyspace.NETWORK_PERMISSIONS_CQL, SchemaConstants.AUTH_KEYSPACE_NAME + "." + AuthKeyspace.NETWORK_PERMISSIONS))));
+
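+    // Applies each of the schema transformations above to the keyspaces in the given metadata.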
+    public Keyspaces setUpDistributedSystemKeyspaces(ClusterMetadata metadata)
+    {
+        Keyspaces keyspaces = metadata.schema.getKeyspaces();
+
+        for (SchemaTransformation transformation : schemaTransformations)
+            keyspaces = transformation.apply(metadata, keyspaces);
+
+        return keyspaces;
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java
index 08f00effdf..41bd7ca4b6 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java
@@ -20,16 +20,22 @@ package org.apache.cassandra.tcm.transformations.cms;
 
 import java.io.IOException;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Period;
 import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.sequences.LockedRanges;
 import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
 
 public class PreInitialize implements Transformation
 {
@@ -57,7 +63,6 @@ public class PreInitialize implements Transformation
         return new PreInitialize(addr);
     }
 
-
     public Kind kind()
     {
         return Kind.PRE_INITIALIZE_CMS;
@@ -66,14 +71,26 @@ public class PreInitialize implements Transformation
     public Result execute(ClusterMetadata metadata)
     {
         assert metadata.epoch.isBefore(Epoch.FIRST);
+        assert metadata.period == Period.EMPTY;
 
-        ClusterMetadata.Transformer transformer = metadata.transformer();
+        ClusterMetadata.Transformer transformer = metadata.transformer(false);
         if (addr != null)
-            transformer = transformer.withCMSMember(addr);
-
+        {
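+            // Seed the meta keyspace placement with a single replica for the first CMS member, covering the entire token range.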
+            DataPlacement.Builder dataPlacementBuilder = DataPlacement.builder();
+            Replica replica = new Replica(addr,
+                                          DatabaseDescriptor.getPartitioner().getMinimumToken(),
+                                          DatabaseDescriptor.getPartitioner().getMinimumToken(),
+                                          true);
+            dataPlacementBuilder.reads.withReplica(replica);
+            dataPlacementBuilder.writes.withReplica(replica);
+            DataPlacements initialPlacement = metadata.placements.unbuild().with(ReplicationParams.meta(), dataPlacementBuilder.build()).build();
+
+            transformer.with(initialPlacement);
+        }
         ClusterMetadata.Transformer.Transformed transformed = transformer.build();
         metadata = transformed.metadata;
         assert metadata.epoch.is(Epoch.FIRST) : metadata.epoch;
+        assert metadata.period == Period.FIRST : metadata.period;
 
         return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, transformed.modifiedKeys);
     }
diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java
new file mode 100644
index 0000000000..efc29bd53c
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/RemoveMember.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations.cms;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+
+import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
+
+public class RemoveMember extends BaseMembershipTransformation
+{
+    public static final AsymmetricMetadataSerializer<Transformation, RemoveMember> serializer = new SerializerBase<RemoveMember>()
+    {
+        public RemoveMember createTransformation(InetAddressAndPort addr)
+        {
+            return new RemoveMember(addr);
+        }
+    };
+
+    public RemoveMember(InetAddressAndPort addr)
+    {
+        super(addr);
+    }
+
+    public Kind kind()
+    {
+        return Kind.REMOVE_FROM_CMS;
+    }
+
+    public Result execute(ClusterMetadata prev)
+    {
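+        // Remove this endpoint's replica from both the read and write placements of the meta keyspace.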
+        ClusterMetadata.Transformer transformer = prev.transformer();
+        DataPlacement.Builder builder = prev.placements.get(ReplicationParams.meta()).unbuild();
+        builder.reads.withoutReplica(replica);
+        builder.writes.withoutReplica(replica);
+        return success(transformer.with(prev.placements.unbuild().with(ReplicationParams.meta(), builder.build()).build()),
+                       affectedRanges);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java
new file mode 100644
index 0000000000..4dcf2e52a5
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/StartAddMember.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations.cms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+import org.apache.cassandra.tcm.sequences.ProgressBarrier;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+
+import static org.apache.cassandra.tcm.transformations.cms.EntireRange.affectedRanges;
+
+public class StartAddMember extends BaseMembershipTransformation
+{
+    public static final AsymmetricMetadataSerializer<Transformation, StartAddMember> serializer = new SerializerBase<StartAddMember>()
+    {
+        public StartAddMember createTransformation(InetAddressAndPort addr)
+        {
+            return new StartAddMember(addr);
+        }
+    };
+
+    public StartAddMember(InetAddressAndPort addr)
+    {
+        super(addr);
+    }
+
+    public Kind kind()
+    {
+        return Kind.START_ADD_TO_CMS;
+    }
+
+    public Result execute(ClusterMetadata prev)
+    {
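+        // Reject the request if the endpoint already appears in the meta keyspace read or write placements.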
+        RangesByEndpoint readReplicas = prev.placements.get(ReplicationParams.meta()).reads.byEndpoint();
+        RangesByEndpoint writeReplicas = prev.placements.get(ReplicationParams.meta()).writes.byEndpoint();
+
+        if (readReplicas.containsKey(endpoint) || writeReplicas.containsKey(endpoint))
+            return new Rejected("Endpoint is already a member of the Cluster Metadata ownership set");
+
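+        // Add the endpoint as a write replica only; FinishAddMember later promotes it to a read replica.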
+        ClusterMetadata.Transformer transformer = prev.transformer();
+        DataPlacement.Builder builder = prev.placements.get(ReplicationParams.meta()).unbuild()
+                                                       .withWriteReplica(replica);
+
+        transformer.with(prev.placements.unbuild().with(ReplicationParams.meta(), builder.build()).build());
+
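+        // Existing read replicas of the meta keyspace (other than the joining node) can serve as streaming sources.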
+        List<InetAddressAndPort> streamCandidates = new ArrayList<>();
+        for (Replica replica : prev.placements.get(ReplicationParams.meta()).reads.byEndpoint().flattenValues())
+        {
+            if (!this.replica.equals(replica))
+                streamCandidates.add(replica.endpoint());
+        }
+
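+        // Record an AddToCMS sequence for the joining node, along with a progress barrier for the next epoch and the streaming candidates.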
+        ProgressBarrier barrier = new ProgressBarrier(prev.nextEpoch(), affectedRanges.toPeers(prev.placements, prev.directory), false);
+        AddToCMS joinSequence = new AddToCMS(barrier, streamCandidates, new FinishAddMember(endpoint));
+
+        return success(transformer.with(prev.inProgressSequences.with(prev.directory.peerId(replica.endpoint()), joinSequence)),
+                       affectedRanges);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 8101b6721f..e64826e653 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -45,7 +44,7 @@ public final class TraceKeyspace
     {
     }
 
-    private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.getInt();
+    public static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.getInt();
 
     /**
      * Generation is used as a timestamp for automatic table creation on startup.
@@ -67,33 +66,31 @@ public final class TraceKeyspace
     public static final String SESSIONS = "sessions";
     public static final String EVENTS = "events";
 
+    public static final String SESSIONS_CQL = "CREATE TABLE IF NOT EXISTS %s ("
+                                              + "session_id uuid,"
+                                              + "command text,"
+                                              + "client inet,"
+                                              + "coordinator inet,"
+                                              + "coordinator_port int,"
+                                              + "duration int,"
+                                              + "parameters map<text, text>,"
+                                              + "request text,"
+                                              + "started_at timestamp,"
+                                              + "PRIMARY KEY ((session_id)))";
     private static final TableMetadata Sessions =
-        parse(SESSIONS,
-                "tracing sessions",
-                "CREATE TABLE %s ("
-                + "session_id uuid,"
-                + "command text,"
-                + "client inet,"
-                + "coordinator inet,"
-                + "coordinator_port int,"
-                + "duration int,"
-                + "parameters map<text, text>,"
-                + "request text,"
-                + "started_at timestamp,"
-                + "PRIMARY KEY ((session_id)))");
-
+        parse(SESSIONS, "tracing sessions", SESSIONS_CQL);
+
+    public static final String EVENTS_CQL = "CREATE TABLE IF NOT EXISTS %s ("
+                                            + "session_id uuid,"
+                                            + "event_id timeuuid,"
+                                            + "activity text,"
+                                            + "source inet,"
+                                            + "source_port int,"
+                                            + "source_elapsed int,"
+                                            + "thread text,"
+                                            + "PRIMARY KEY ((session_id), event_id))";
     private static final TableMetadata Events =
-        parse(EVENTS,
-                "tracing events",
-                "CREATE TABLE %s ("
-                + "session_id uuid,"
-                + "event_id timeuuid,"
-                + "activity text,"
-                + "source inet,"
-                + "source_port int,"
-                + "source_elapsed int,"
-                + "thread text,"
-                + "PRIMARY KEY ((session_id), event_id))");
+        parse(EVENTS, "tracing events", EVENTS_CQL);
 
     private static TableMetadata parse(String table, String description, String cql)
     {

