This is an automated email from the ASF dual-hosted git repository. blerer pushed a commit to branch cep-21-tcm-review in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 73c7ca4a5f90cec5297c3983752506646645146a Author: Benjamin Lerer <b.le...@gmail.com> AuthorDate: Thu Oct 5 16:41:48 2023 +0200 Add javadoc WIP --- .../config/CassandraRelevantProperties.java | 3 +- src/java/org/apache/cassandra/config/Config.java | 7 +- .../cassandra/config/DatabaseDescriptor.java | 6 ++ .../cql3/statements/DescribeStatement.java | 6 ++ .../org/apache/cassandra/db/SystemKeyspace.java | 24 ++++++ .../apache/cassandra/schema/DistributedSchema.java | 4 + src/java/org/apache/cassandra/schema/Schema.java | 7 ++ .../cassandra/schema/SchemaTransformation.java | 2 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 25 ++++++ .../cassandra/tcm/ClusterMetadataService.java | 22 ++++-- src/java/org/apache/cassandra/tcm/Discovery.java | 22 +++++- src/java/org/apache/cassandra/tcm/Epoch.java | 92 +++++++++++++++++++++- .../apache/cassandra/tcm/MetadataSnapshots.java | 56 +++++-------- .../org/apache/cassandra/tcm/MetadataValue.java | 14 ++++ src/java/org/apache/cassandra/tcm/Period.java | 4 + .../cassandra/tcm/RecentlySealedPeriods.java | 8 +- src/java/org/apache/cassandra/tcm/Retry.java | 79 +++++++++++++++++++ src/java/org/apache/cassandra/tcm/Sealed.java | 27 +++++++ src/java/org/apache/cassandra/tcm/Startup.java | 19 +++++ .../org/apache/cassandra/tcm/Transformation.java | 34 ++++++++ .../cassandra/tcm/listeners/ChangeListener.java | 4 +- .../tcm/listeners/MetadataSnapshotListener.java | 7 ++ src/java/org/apache/cassandra/tcm/log/Entry.java | 17 ++++ .../org/apache/cassandra/tcm/log/LocalLog.java | 31 +++++++- .../org/apache/cassandra/tcm/log/LogState.java | 11 +++ .../org/apache/cassandra/tcm/log/LogStorage.java | 8 ++ .../org/apache/cassandra/tcm/log/Replication.java | 22 +++++- .../cassandra/tcm/log/SystemKeyspaceStorage.java | 12 ++- .../apache/cassandra/tcm/migration/Election.java | 11 +++ .../tcm/ownership/UniformRangePlacement.java | 6 +- .../cassandra/tcm/transformations/SealPeriod.java | 2 +- 31 files changed, 531 insertions(+), 61 
deletions(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 32c5d4fb59..554e407bbf 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -511,8 +511,9 @@ public enum CassandraRelevantProperties */ TCM_PROGRESS_BARRIER_BACKOFF_MILLIS("cassandra.progress_barrier_backoff_ms", "1000"), TCM_PROGRESS_BARRIER_TIMEOUT_MILLIS("cassandra.progress_barrier_timeout_ms", "3600000"), + /** - * size of in-memory index of max epoch -> sealed period + * Maximum size of the {@code RecentlySealedPeriods} in-memory index. */ TCM_RECENTLY_SEALED_PERIOD_INDEX_SIZE("cassandra.recently_sealed_period_index_size", "10"), diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 01f9be8c15..927e830ce1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -174,8 +174,13 @@ public class Config public volatile DurationSpec.LongMillisecondsBound cms_await_timeout = new DurationSpec.LongMillisecondsBound("120000ms"); public volatile int cms_default_max_retries = 10; public volatile DurationSpec.IntMillisecondsBound cms_default_retry_backoff = new DurationSpec.IntMillisecondsBound("50ms"); + /** - * How often we should snapshot the cluster metadata. + * Specifies how often a snapshot of the cluster metadata must be taken. + * <p>The frequency is expressed in epochs. A frequency of 100, for example, means that a snapshot will be taken every time + * the epoch is a multiple of 100.</p> + * <p>A snapshot is taken when a period (i.e. a partition of the cluster metadata log tables) is sealed.
Therefore the snapshot frequency also determines the size of the + * {@code system.local_metadata_log} and {@code cluster_metadata.distributed_metadata_log} table partitions.</p> */ public volatile int metadata_snapshot_frequency = 100; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index f22f845b38..365dd255d6 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -4985,6 +4985,12 @@ public class DatabaseDescriptor return conf.cms_await_timeout; } + /** + * Returns how often a snapshot of the cluster metadata must be taken. + * <p>The frequency is expressed in epochs. A frequency of 100, for example, means that a snapshot will be taken every time + * the epoch is a multiple of 100.</p> + * @return how often a snapshot of the cluster metadata must be taken. + */ public static int getMetadataSnapshotFrequency() { return conf.metadata_snapshot_frequency; diff --git a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java index 45fd3a761b..b3badab71b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java @@ -202,6 +202,12 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C } } + @Override + public short[] getPartitionKeyBindVariableIndexes() + { + return CQLStatement.super.getPartitionKeyBindVariableIndexes(); + } + private long getOffset(PagingState pagingState, UUID schemaVersion) { if (pagingState == null) diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index be8444fb51..51074de248 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1968,6 +1968,13 @@ public final class SystemKeyspace } } + /** + * Inserts the cluster metadata snapshot into the {@code metadata_snapshot} table. + * + * @param epoch the snapshot epoch + * @param period the period to which the snapshot belongs + * @param snapshot the snapshot to store + */ public static void storeSnapshot(Epoch epoch, long period, ByteBuffer snapshot) { logger.info("Storing snapshot of cluster metadata at epoch {} (period {})", epoch, period); @@ -1975,6 +1982,12 @@ public final class SystemKeyspace executeInternal(query, epoch.getEpoch(), period, snapshot); } + /** + * Retrieves the cluster metadata snapshot for the specified epoch from the {@code metadata_snapshot} table. + * + * @param epoch the epoch for which the snapshot must be retrieved + * @return the cluster metadata snapshot for the specified epoch or {@code null} if no snapshot exists for the epoch. + */ public static ByteBuffer getSnapshot(Epoch epoch) { logger.info("Getting snapshot of epoch = {}", epoch); @@ -2030,6 +2043,11 @@ public final class SystemKeyspace return new Sealed(maxPeriod + 1, maxEpoch + 1); } + + /** + * Retrieves the last sealed period from the {@code metadata_last_sealed_period} table. + * @return the last sealed period + */ public static Sealed getLastSealedPeriod() { String query = String.format("SELECT epoch, period FROM %s.%s WHERE key = 'latest'", SchemaConstants.SYSTEM_KEYSPACE_NAME, LAST_SEALED_PERIOD_TABLE_NAME); @@ -2041,6 +2059,12 @@ public final class SystemKeyspace return new Sealed(period, Epoch.create(epoch)); } + /** + * Marks the period as sealed in the {@code metadata_sealed_periods} table and marks it as the latest sealed period in + * the {@code metadata_last_sealed_period} table.
+ * @param period the period being sealed + * @param epoch the last epoch of the period + */ public static void sealPeriod(long period, Epoch epoch) { String query = String.format("INSERT INTO %s.%s (max_epoch, period) VALUES (?,?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SEALED_PERIODS_TABLE_NAME); diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index d55a90c8be..0e395333d9 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -48,6 +48,10 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> { public static final Serializer serializer = new Serializer(); + /** + * A schema without any keyspace. + * @return a schema without any keyspace. + */ public static final DistributedSchema empty() { return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY); diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java index 3cb5eaefcc..68760d5307 100644 --- a/src/java/org/apache/cassandra/schema/Schema.java +++ b/src/java/org/apache/cassandra/schema/Schema.java @@ -20,6 +20,7 @@ package org.apache.cassandra.schema; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; @@ -307,6 +308,12 @@ public final class Schema implements SchemaProvider }); } + @Override + public UUID getVersion() + { + return SchemaProvider.super.getVersion(); + } + // We need to lazy-initialize schema for test purposes: since column families are initialized // eagerly, if local schema initialization is attempted before commit log instance is started, // cf initialization will fail to grab a current commit log position. 
diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformation.java b/src/java/org/apache/cassandra/schema/SchemaTransformation.java index 75ab3767ab..5b50e184bc 100644 --- a/src/java/org/apache/cassandra/schema/SchemaTransformation.java +++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java @@ -37,7 +37,7 @@ public interface SchemaTransformation /** * Apply a statement transformation to a schema snapshot. * <p> - * Implementing methods should be side-effect free (outside of throwing exceptions if the transformation cannot + * Implementing methods should be side effect free (outside of throwing exceptions if the transformation cannot * be successfully applied to the provided schema). * * @param metadata Cluster metadata representing the current state, including the DistributedSchema with the diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 3e2184bd99..f9b08fccd7 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -19,6 +19,7 @@ package org.apache.cassandra.tcm; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -40,7 +41,9 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; @@ -67,6 +70,7 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.InProgressSequences; import 
org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -832,6 +836,27 @@ public class ClusterMetadata return !directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion()); } + public static ByteBuffer toBytes(ClusterMetadata metadata) throws IOException + { + Version serializationVersion = Version.minCommonSerializationVersion(); + long serializedSize = VerboseMetadataSerializer.serializedSize(serializer, metadata, serializationVersion); + ByteBuffer bytes = ByteBuffer.allocate((int) serializedSize); + try (DataOutputBuffer dob = new DataOutputBuffer(bytes)) + { + VerboseMetadataSerializer.serialize(serializer, metadata, dob, serializationVersion); + } + bytes.flip().rewind(); + return bytes; + } + + public static ClusterMetadata fromBytes(ByteBuffer serialized) throws IOException + { + if (serialized == null) + return null; + + return VerboseMetadataSerializer.deserialize(serializer, new DataInputBuffer(serialized, false)); + } + public static class Serializer implements MetadataSerializer<ClusterMetadata> { @Override diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index c757dde73a..6871289e27 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -125,6 +125,10 @@ public class ClusterMetadataService private final AtomicBoolean commitsPaused = new AtomicBoolean(); + /** + * Returns the state of the {@code ClusterMetadataService}. + * @return the state of the {@code ClusterMetadataService}.
+ */ public static State state() { return state(ClusterMetadata.current()); @@ -141,7 +145,7 @@ public class ClusterMetadataService // The node is a full member of the CMS if it has started participating in reads for distributed metadata table (which // implies it is a write replica as well). In other words, it's a fully joined member of the replica set responsible for // the distributed metadata table. - if (ClusterMetadata.current().isCMSMember(FBUtilities.getBroadcastAddressAndPort())) + if (metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort())) return LOCAL; return REMOTE; } @@ -733,14 +737,18 @@ public class ClusterMetadataService return snapshots; } + /** + * Attempt to seal the current period. + * @return the latest cluster metadata + */ public ClusterMetadata sealPeriod() { - return ClusterMetadataService.instance.commit(SealPeriod.instance, - (ClusterMetadata metadata) -> metadata, - (metadata, code, reason) -> { - // If the transformation got rejected, someone else has beat us to seal this period - return metadata; - }); + return commit(SealPeriod.instance, + (ClusterMetadata metadata) -> metadata, + (metadata, code, reason) -> { + // If the transformation got rejected, someone else has beat us to seal this period + return metadata; + }); } public void initRecentlySealedPeriodsIndex() diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java b/src/java/org/apache/cassandra/tcm/Discovery.java index a353828121..5b94e328ae 100644 --- a/src/java/org/apache/cassandra/tcm/Discovery.java +++ b/src/java/org/apache/cassandra/tcm/Discovery.java @@ -48,6 +48,9 @@ import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +/** + * Service in charge of discovering nodes + */ public class Discovery { private static final Logger logger = LoggerFactory.getLogger(Discovery.class); @@ -143,6 +146,13 @@ public class Discovery return new ArrayList<>(discovered); } + /** + * A set of nodes that 
can either be: + * <ul> + * <li>existing members of the CMS if one exists, i.e. this is an established cluster that the new node is joining (kind = {@code CMS_ONLY})</li> + * <li>known peers, if the respondent does not know of any CMS, i.e. this is a brand new cluster that has not yet initialised its CMS (kind = {@code KNOWN_PEERS})</li> + * </ul> + */ public static class DiscoveredNodes { private final Set<InetAddressAndPort> nodes; @@ -174,7 +184,14 @@ public class Discovery public enum Kind { - CMS_ONLY, KNOWN_PEERS + /** + * All discovered nodes are part of the CMS. + */ + CMS_ONLY, + /** + * None of the discovered nodes are part of the CMS (no CMS exists). + */ + KNOWN_PEERS } } @@ -211,6 +228,9 @@ public class Discovery } } + /** + * The states of the discovery process. + */ private enum State { NOT_STARTED, diff --git a/src/java/org/apache/cassandra/tcm/Epoch.java b/src/java/org/apache/cassandra/tcm/Epoch.java index 1b7c33a895..ac59baa59c 100644 --- a/src/java/org/apache/cassandra/tcm/Epoch.java +++ b/src/java/org/apache/cassandra/tcm/Epoch.java @@ -32,10 +32,25 @@ import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.utils.vint.VIntCoding; +/** + * An epoch is a monotonically increasing counter associated with an event in the metadata change log. Therefore, + * an epoch can also be seen as a position in the cluster metadata log. + * + * <p> Each event committed to the log by the CMS implies a new epoch and as such, + * each epoch simply represents a specific point in the linearized history of cluster metadata. + * Both epochs and the change log itself are immutable and once an event is assigned a particular order in the log, this cannot be modified. + * + * <p> {@code Epoch} instances can be compared, serialized, and deserialized to facilitate event ordering + * and state reconciliation across nodes.
+ * + * <p> This class also defines several special epoch instances for identifying + * unique states or events in the cluster, such as the first epoch, or epochs + * designated for upgrade processes. + */ public class Epoch implements Comparable<Epoch>, Serializable { public static final EpochSerializer serializer = new EpochSerializer(); - public static final IVersionedSerializer<Epoch> messageSerializer = new IVersionedSerializer<Epoch>() + public static final IVersionedSerializer<Epoch> messageSerializer = new IVersionedSerializer<>() { @Override public void serialize(Epoch t, DataOutputPlus out, int version) throws IOException @@ -64,11 +79,23 @@ public class Epoch implements Comparable<Epoch>, Serializable private final long epoch; + /** + * Constructs an instance of {@code Epoch} with the specified epoch value. + * + * @param epoch A long value representing the epoch. + */ private Epoch(long epoch) { this.epoch = epoch; } + /** + * Creates and returns an {@code Epoch} instance for the given epoch value. + * Utilizes existing constant instances when possible. + * + * @param epoch A long value representing the epoch. + * @return An instance of {@code Epoch}. + */ public static Epoch create(long epoch) { if (epoch == EMPTY.epoch) @@ -82,11 +109,24 @@ public class Epoch implements Comparable<Epoch>, Serializable return new Epoch(epoch); } + /** + * Determines and returns the maximum epoch among the provided two epochs. + * + * @param l The first {@code Epoch} to compare. + * @param r The second {@code Epoch} to compare. + * @return The {@code Epoch} instance which is larger. + */ public static Epoch max(Epoch l, Epoch r) { return l.compareTo(r) > 0 ? l : r; } + /** + * Checks whether this epoch is directly before the specified epoch. + * + * @param epoch the Epoch to compare with. + * @return true if this epoch is directly before the provided epoch; false otherwise. 
+ */ public boolean isDirectlyBefore(Epoch epoch) { if (epoch.equals(Epoch.FIRST)) @@ -94,11 +134,22 @@ public class Epoch implements Comparable<Epoch>, Serializable return this.epoch + 1 == epoch.epoch; } + /** + * Checks whether this epoch is directly after the specified epoch. + * + * @param epoch the Epoch to compare with. + * @return true if this epoch is directly after the provided epoch; false otherwise. + */ public boolean isDirectlyAfter(Epoch epoch) { return epoch.isDirectlyBefore(this); } + /** + * Produces a new Epoch instance representing the subsequent epoch. + * + * @return a new Epoch instance incremented by one from the current epoch. + */ public Epoch nextEpoch() { if (beforeFirst.contains(this)) @@ -113,26 +164,57 @@ public class Epoch implements Comparable<Epoch>, Serializable return Long.compare(epoch, other.epoch); } + /** + * Determines whether this epoch is before the specified epoch. + * + * @param other The {@code Epoch} to compare against. + * @return {@code true} if this epoch is before the other epoch, + * {@code false} otherwise. + */ public boolean isBefore(Epoch other) { return compareTo(other) < 0; } + /** + * Checks if this epoch is equal to or before the specified epoch. + * + * @param other the Epoch to compare with. + * @return true if this epoch is equal to or before the provided epoch; false otherwise. + */ public boolean isEqualOrBefore(Epoch other) { return compareTo(other) <= 0; } + /** + * Checks if this epoch is after the specified epoch. + * + * @param other the Epoch to compare with. + * @return true if this epoch is after the provided epoch; false otherwise. + */ public boolean isAfter(Epoch other) { return compareTo(other) > 0; } + /** + * Checks if this epoch is equal to or after the specified epoch. + * + * @param other the Epoch to compare with. + * @return true if this epoch is equal to or after the provided epoch; false otherwise. 
+ */ public boolean isEqualOrAfter(Epoch other) { return compareTo(other) >= 0; } + /** + * Compares this epoch with the specified epoch for equality. + * + * @param other the Epoch to compare with. + * @return true if this epoch is equal to the provided epoch; false otherwise. + */ public boolean is(Epoch other) { return equals(other); @@ -161,11 +243,19 @@ public class Epoch implements Comparable<Epoch>, Serializable '}'; } + /** + * Returns the numeric value of this epoch. + * + * @return the long value of the epoch. + */ public long getEpoch() { return epoch; } + /** + * Serializer that serializes an {@code Epoch} as an unsigned Vint. + */ public static class EpochSerializer implements MetadataSerializer<Epoch> { // convenience methods for messageSerializer et al diff --git a/src/java/org/apache/cassandra/tcm/MetadataSnapshots.java b/src/java/org/apache/cassandra/tcm/MetadataSnapshots.java index e2491d0d59..0117147dcc 100644 --- a/src/java/org/apache/cassandra/tcm/MetadataSnapshots.java +++ b/src/java/org/apache/cassandra/tcm/MetadataSnapshots.java @@ -19,52 +19,38 @@ package org.apache.cassandra.tcm; import java.io.IOException; -import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.io.util.DataInputBuffer; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; -import org.apache.cassandra.tcm.serialization.Version; +/** + * {@code MetadataSnapshots} allows storing and retrieving cluster metadata snapshots. + * Snapshots are optimizations used to make local startup quicker or allow faster catch up by avoiding having to replay + * the entire transformation history.
+ */ public interface MetadataSnapshots { Logger logger = LoggerFactory.getLogger(MetadataSnapshots.class); ClusterMetadata getLatestSnapshotAfter(Epoch epoch); - ClusterMetadata getSnapshot(Epoch epoch); - void storeSnapshot(ClusterMetadata metadata); - - static ByteBuffer toBytes(ClusterMetadata metadata) throws IOException - { - Version serializationVersion = Version.minCommonSerializationVersion(); - long serializedSize = VerboseMetadataSerializer.serializedSize(ClusterMetadata.serializer, metadata, serializationVersion); - ByteBuffer bytes = ByteBuffer.allocate((int) serializedSize); - try (DataOutputBuffer dob = new DataOutputBuffer(bytes)) - { - VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, metadata, dob, serializationVersion); - } - bytes.flip().rewind(); - return bytes; - } - - @SuppressWarnings("resource") - static ClusterMetadata fromBytes(ByteBuffer serialized) throws IOException - { - if (serialized == null) - return null; - - return VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, - new DataInputBuffer(serialized, false)); - } + /** + * Retrieves the cluster metadata snapshot taken at the specified epoch. + * + * @param epoch the epoch for which the snapshot must be retrieved + * @return the cluster metadata snapshot for the specified epoch or {@code null} if no snapshot exists for the epoch. 
+ */ + ClusterMetadata getSnapshot(Epoch epoch); - MetadataSnapshots NO_OP = new NoOp(); + /** + * Store the specified snapshot + * @param metadata the cluster metadata snapshot + */ + void storeSnapshot(ClusterMetadata metadata); - public class NoOp implements MetadataSnapshots + MetadataSnapshots NO_OP = new MetadataSnapshots() { @Override public ClusterMetadata getLatestSnapshotAfter(Epoch epoch) @@ -80,7 +66,7 @@ public interface MetadataSnapshots @Override public void storeSnapshot(ClusterMetadata metadata) {} - } + }; class SystemKeyspaceMetadataSnapshots implements MetadataSnapshots { @@ -96,7 +82,7 @@ public interface MetadataSnapshots { try { - return fromBytes(SystemKeyspace.getSnapshot(epoch)); + return ClusterMetadata.fromBytes(SystemKeyspace.getSnapshot(epoch)); } catch (IOException e) { @@ -110,7 +96,7 @@ public interface MetadataSnapshots { try { - SystemKeyspace.storeSnapshot(metadata.epoch, metadata.period, toBytes(metadata)); + SystemKeyspace.storeSnapshot(metadata.epoch, metadata.period, ClusterMetadata.toBytes(metadata)); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/tcm/MetadataValue.java b/src/java/org/apache/cassandra/tcm/MetadataValue.java index ca01b23389..a33863be22 100644 --- a/src/java/org/apache/cassandra/tcm/MetadataValue.java +++ b/src/java/org/apache/cassandra/tcm/MetadataValue.java @@ -18,8 +18,22 @@ package org.apache.cassandra.tcm; +/** + * A Cluster metadata element. + * @param <V> the {@code MetadataValue} object type. + */ public interface MetadataValue<V> { + /** + * Creates a copy of this {@code MetadataValue} instance with the specified epoch. + * @param epoch the new epoch + * @return a copy of this {@code MetadataValue} instance with the specified epoch + */ V withLastModified(Epoch epoch); + + /** + * Returns the epoch at which this value was last modified. + * @return the epoch at which this value was last modified. 
+ */ Epoch lastModified(); } diff --git a/src/java/org/apache/cassandra/tcm/Period.java b/src/java/org/apache/cassandra/tcm/Period.java index 82363551d1..03c07ded32 100644 --- a/src/java/org/apache/cassandra/tcm/Period.java +++ b/src/java/org/apache/cassandra/tcm/Period.java @@ -41,6 +41,10 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +/** + * Utility methods and classes to work with {@code Period}. + * + */ public class Period { private static final Logger logger = LoggerFactory.getLogger(Period.class); diff --git a/src/java/org/apache/cassandra/tcm/RecentlySealedPeriods.java b/src/java/org/apache/cassandra/tcm/RecentlySealedPeriods.java index b04ec56622..cc16ae656e 100644 --- a/src/java/org/apache/cassandra/tcm/RecentlySealedPeriods.java +++ b/src/java/org/apache/cassandra/tcm/RecentlySealedPeriods.java @@ -36,8 +36,11 @@ import org.apache.cassandra.config.CassandraRelevantProperties; */ public class RecentlySealedPeriods { - private static final Sealed[] EMPTY_ARRAY = new Sealed[0]; - public static final RecentlySealedPeriods EMPTY = new RecentlySealedPeriods(EMPTY_ARRAY); + public static final RecentlySealedPeriods EMPTY = new RecentlySealedPeriods(new Sealed[0]); + + /** + * The maximum number of sealed periods stored in memory. 
+ */ private int maxSize = CassandraRelevantProperties.TCM_RECENTLY_SEALED_PERIOD_INDEX_SIZE.getInt(); private Sealed[] recent; @@ -57,6 +60,7 @@ public class RecentlySealedPeriods return new RecentlySealedPeriods(recent.toArray(new Sealed[recent.size()])); } + public RecentlySealedPeriods with(Epoch epoch, long period) { if (recent == null) diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java index 6e5b74a8da..33b04fa372 100644 --- a/src/java/org/apache/cassandra/tcm/Retry.java +++ b/src/java/org/apache/cassandra/tcm/Retry.java @@ -29,10 +29,19 @@ import org.apache.cassandra.utils.Clock; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.apache.cassandra.tcm.Retry.Jitter.MAX_JITTER_MS; +/** + * Represents a strategy for retrying operations in the event of failures or issues. + * It provides mechanisms to manage and control retry attempts, respecting certain conditions and utilizing different + * backoff strategies to avoid overwhelming systems and to handle transient failures gracefully. + */ public abstract class Retry { protected static final int MAX_TRIES = DatabaseDescriptor.getCmsDefaultRetryMaxTries(); protected final int maxTries; + + /** + * The number of attempts made so far. + */ protected int tries; protected Meter retryMeter; @@ -47,16 +56,28 @@ public abstract class Retry this.retryMeter = retryMeter; } + /** + * Returns the current number of attempts. + * @return the current number of attempts. + */ public int currentTries() { return tries; } + /** + * Determines whether the maximum number of retry attempts has been reached (subclasses such as {@code Deadline} may add further conditions). + * + * @return {@code true} if the maximum number of retry attempts has been reached; {@code false} otherwise. + */ public boolean reachedMax() { return tries >= maxTries; } + /** + * Increments the number of attempts and sleeps, if needed, before the next attempt.
+ */ public void maybeSleep() { tries++; @@ -64,12 +85,25 @@ public abstract class Retry sleepUninterruptibly(sleepFor(), TimeUnit.MILLISECONDS); } + /** + * Determines the duration to sleep before the next retry attempt. + * + * @return the duration to sleep in milliseconds. + */ protected abstract long sleepFor(); + /** + * Represents a retry strategy that introduces a randomized delay (jitter) + * between retry attempts. + */ public static class Jitter extends Retry { public static final int MAX_JITTER_MS = Math.toIntExact(DatabaseDescriptor.getDefaultRetryBackoff().to(TimeUnit.MILLISECONDS)); private final Random random; + + /** + * The maximum amount of jitter, in milliseconds + */ private final int maxJitterMs; public Jitter(Meter retryMeter) @@ -84,6 +118,7 @@ public abstract class Retry this.maxJitterMs = maxJitterMs; } + @Override public long sleepFor() { int actualBackoff = ThreadLocalRandom.current().nextInt(maxJitterMs / 2, maxJitterMs); @@ -91,11 +126,24 @@ public abstract class Retry } } + /** + * Retry strategy that introduces a linearly increasing delay (the backoff multiplied by the number of attempts) between retry attempts, + * allowing for a more conservative retry approach. + */ public static class Backoff extends Retry { private static final int RETRY_BACKOFF_MS = Math.toIntExact(DatabaseDescriptor.getDefaultRetryBackoff().to(TimeUnit.MILLISECONDS)); + + /** + * The base delay in milliseconds, multiplied by the number of attempts to compute each sleep duration + */ protected final int backoffMs; + /** + * Constructs a new {@code Backoff} instance using the specified retry meter. + * + * @param retryMeter a {@code Meter} instance used to keep track of retry attempts.
+ */ public Backoff(Meter retryMeter) { this(MAX_TRIES, RETRY_BACKOFF_MS, retryMeter); @@ -107,6 +155,7 @@ public abstract class Retry this.backoffMs = backoffMs; } + @Override public long sleepFor() { return (long) tries * backoffMs; @@ -123,9 +172,20 @@ public abstract class Retry } } + /** + * {@code Retry} strategy that enforces a deadline, ensuring that retry attempts are halted after a certain point in time, + * preventing indefinite retries. + * <p>{@code Deadline} will retry using the provided delegate but will ensure that the attempts stop after the deadline has been reached.</p> + */ public static class Deadline extends Retry { + /** + * The deadline, expressed as a {@code Clock.Global.nanoTime()} value + */ public final long deadlineNanos; + /** + * The decorated {@code Retry} used to perform the retry attempts under the hood. + */ protected final Retry delegate; private Deadline(long deadlineNanos, Retry delegate) @@ -136,11 +196,25 @@ public abstract class Retry this.delegate = delegate; } + /** + * Creates a {@code Deadline} that will stop retrying at the specified time. + * + * @param deadlineNanos the {@code Clock.Global.nanoTime()} value at which the {@code Deadline} should stop retrying + * @param delegate the {@code Retry} to which the {@code Deadline} will delegate the retries. + * @return a new {@code Deadline} that will stop retrying at the specified time. + */ public static Deadline at(long deadlineNanos, Retry delegate) { return new Deadline(deadlineNanos, delegate); } + /** + * Creates a {@code Deadline} that will stop retrying after the specified amount of nanoseconds. + * + * @param timeoutNanos the number of nanoseconds after which the {@code Deadline} should stop retrying + * @param delegate the {@code Retry} to which the {@code Deadline} will delegate the retries. + * @return a new {@code Deadline} that will stop retrying after the specified amount of nanoseconds.
+ */ public static Deadline after(long timeoutNanos, Retry delegate) { return new Deadline(Clock.Global.nanoTime() + timeoutNanos, delegate); @@ -176,6 +250,11 @@ public abstract class Retry return delegate.reachedMax() || Clock.Global.nanoTime() > deadlineNanos; } + /** + * Calculates the remaining time until the deadline. + * + * @return the remaining time in nanoseconds, or {@code 0} if the deadline has already passed. + */ public long remainingNanos() { return Math.max(0, deadlineNanos - Clock.Global.nanoTime()); diff --git a/src/java/org/apache/cassandra/tcm/Sealed.java b/src/java/org/apache/cassandra/tcm/Sealed.java index fc5377ae0e..574131b626 100644 --- a/src/java/org/apache/cassandra/tcm/Sealed.java +++ b/src/java/org/apache/cassandra/tcm/Sealed.java @@ -30,12 +30,39 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.SystemKeyspace; +/** + * A period that has been sealed. + * <p> + * A period is an implementation detail of having the logs stored in C* tables. It is the partition key of + * the log tables (system.local_metadata_log and cluster_metadata.distributed_metadata_log) and keeps partitions + * at a manageable size. Periods are "sealed" when the period number is bumped and a new partition is used to store transformations in the log table. + * </p> + * <p> + * When a newly created epoch is a multiple of the {@code metadata_snapshot_frequency}, the {@code LocalLog} snapshot listener + * will attempt to commit a {@code SealPeriod} transformation. The sealing of the period in the local log will trigger the {@code MetadataSnapshotListener} + * that will create a snapshot of the cluster metadata. + * The size of a sealed period is therefore controlled by the {@code metadata_snapshot_frequency} yaml property that controls when + * {@code SealPeriod} transformations are created.
+ * + * </p> + * @see org.apache.cassandra.tcm.log.LocalLog + * @see ClusterMetadataService#sealPeriod() + * @see org.apache.cassandra.tcm.listeners.MetadataSnapshotListener + */ public class Sealed implements Comparable<Sealed> { private static final Logger logger = LoggerFactory.getLogger(Sealed.class); public static final Sealed EMPTY = new Sealed(Period.EMPTY, Epoch.EMPTY); + + /** + * The period number + */ public final long period; + + /** + * The latest epoch of the period + */ public final Epoch epoch; private static final AtomicReference<RecentlySealedPeriods> index = new AtomicReference<>(RecentlySealedPeriods.EMPTY); diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 681d2e79fd..f52a21f9fd 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -223,6 +223,13 @@ public class Startup assert cmGossip.equals(initial) : cmGossip + " != " + initial; } + /** + * Initializes or re-initializes the {@code ClusterMetadata} from the serialized data stored in the specified file. + * @param fileName the name of the file containing the serialized {@code ClusterMetadata}. + * @param wrapProcessor allow to wrap the processor for testing needs + * @param initMessaging allow the system to wait for messaging to be ready + * @throws IOException if the {@code ClusterMetadata} could not be deserialized + */ public static void reinitializeWithClusterMetadata(String fileName, Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws IOException { // First set a minimal ClusterMetadata as some deserialization depends @@ -267,10 +274,22 @@ public class Startup enum StartupMode { + /** + * The node will initialize as a non-CMS node. + */ NORMAL, + /** + * The node will transition from the gossip protocol to CMS. 
+ */ UPGRADE, VOTE, + /** + * The node will start as the first node of the CMS + */ FIRST_CMS, + /** + * The node will use the existing {@code ClusterMetadata} provided through a file + */ BOOT_WITH_CLUSTERMETADATA; static StartupMode get(Set<InetAddressAndPort> seeds) diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index 4c58020d1c..50dd6a0f82 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -43,14 +43,31 @@ import org.apache.cassandra.tcm.transformations.cms.PreInitialize; import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS; import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS; +/** + * A {@code ClusterMetadata} transformation. + * <p>{@code Transformation}s are stored in the local and distributed cluster metadata logs and allow the latest state of the cluster to be rebuilt incrementally.</p> + */ public interface Transformation { Serializer serializer = new Serializer(); + /** + * Returns the transformation type. + * @return the transformation type. + */ Kind kind(); + /** + * Executes this transformation on the specified {@code ClusterMetadata}. + * @param metadata the {@code ClusterMetadata} on which the transformation must be performed. + * @return the result of the operation. + */ Result execute(ClusterMetadata metadata); + /** + * Checks if this transformation is allowed during upgrades. + * @return {@code true} if this transformation is allowed during upgrades, {@code false} otherwise. + */ default boolean allowDuringUpgrades() { return false; @@ -62,8 +79,15 @@ public interface Transformation return new Success(transformed.metadata, affectedRanges, transformed.modifiedKeys); } + /** + * Describes whether the transformation was successful and provides information on the outcome. + */ interface Result { + /** + * Checks if the operation was successful.
+ * @return {@code true} if the operation was successful, {@code false} otherwise. + */ boolean isSuccess(); boolean isRejected(); @@ -152,11 +176,15 @@ public interface Transformation } } + /** + * A transformation kind/type. + */ enum Kind { PRE_INITIALIZE_CMS(() -> PreInitialize.serializer), INITIALIZE_CMS(() -> Initialize.serializer), FORCE_SNAPSHOT(() -> ForceSnapshot.serializer), + // Transformation that seals the period and triggers the creation of a snapshot. SEAL_PERIOD(() -> SealPeriod.serializer), SCHEMA_CHANGE(() -> AlterSchema.serializer), REGISTER(() -> Register.serializer), @@ -219,6 +247,12 @@ public interface Transformation return bb; } + /** + * Deserializes the specified versioned bytes into a {@code Transformation}. + * @param bb the bytes representing the transformation + * @return the deserialized {@code Transformation} + * @throws IOException if the {@code Transformation} cannot be deserialized + */ public Transformation fromVersionedBytes(ByteBuffer bb) throws IOException { try (DataInputBuffer in = new DataInputBuffer(bb, true)) diff --git a/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java b/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java index 35bd754b4f..6a8240f51e 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java @@ -22,8 +22,8 @@ import org.apache.cassandra.tcm.ClusterMetadata; /** * Invoked when cluster metadata changes - * - * `next` epoch is not guaranteed to directly follow `prev` epoch + * <p> + * {@code next} epoch is not guaranteed to directly follow {@code prev} epoch */ public interface ChangeListener { diff --git a/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java b/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java index 3a2e6a4ec0..3467cbf629 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java +++
b/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java @@ -27,6 +27,13 @@ import org.apache.cassandra.tcm.Sealed; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.Entry; +/** + * {@code LogListener} that stores a snapshot of the cluster metadata when a period has been sealed. + * <p> + * Snapshots can be used to make local startup quicker as the node doesn't have to replay the whole history of the metadata log. + * Likewise, when a node wants to catch up from a peer or the CMS, the responder may send the most recent snapshot plus additional entries (i.e. a {@code Replication}). + * </p> + */ public class MetadataSnapshotListener implements LogListener { private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListener.class); diff --git a/src/java/org/apache/cassandra/tcm/log/Entry.java b/src/java/org/apache/cassandra/tcm/log/Entry.java index 59ee35ba2c..588480132a 100644 --- a/src/java/org/apache/cassandra/tcm/log/Entry.java +++ b/src/java/org/apache/cassandra/tcm/log/Entry.java @@ -34,6 +34,23 @@ import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; +/** + * Represents a log entry. + * <p>A log entry contains: + * <ul> + * <li>an ID encoded on 8 bytes. An ID is composed of the node address encoded on the most significant 4 bytes + * and of an incremented value encoded on the least significant 4 bytes</li> + * <li>the epoch associated with the transformation</li> + * <li>the transformation representing the Cluster Metadata change</li> + * </ul> + * </p> + * <p> + * Log entries are stored in the {@code system} local keyspace in the {@code local_metadata_log} table + * and in the distributed {@code cluster_metadata} keyspace in the {@code distributed_metadata_log} table.
+ * </p> + * @see org.apache.cassandra.db.SystemKeyspace + * @see org.apache.cassandra.schema.DistributedMetadataLogKeyspace + */ public class Entry implements Comparable<Entry> { public static final Serializer serializer = new Serializer(); diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 8455c8098c..c1219aaa3e 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -72,10 +72,24 @@ import static org.apache.cassandra.tcm.Epoch.FIRST; import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; // TODO metrics for contention/buffer size/etc + +/** + * The {@code LocalLog} represents a local copy of the global cluster metadata log. + * <p> + * Every peer persists a local copy of the metadata log so it can: + * <ul> + * <li>replay up to the last known point when it starts up. This is an optimisation as peers could interrogate the CMS every time if necessary.</li> + * <li>help other peers catch up if they detect they are out of sync during a read/write operation. A local copy allows peers to help each other catch up while putting less load on the CMS.</li> + * </ul> + */ public abstract class LocalLog implements Closeable { private static final Logger logger = LoggerFactory.getLogger(LocalLog.class); + /** + * The current {@code ClusterMetadata} for this node. + * Updated via {@code compareAndSet} as log entries are enacted. + */ protected final AtomicReference<ClusterMetadata> committed; /** @@ -209,6 +223,10 @@ public abstract class LocalLog implements Closeable return persistence.getReplication(since); } + /** + * Calls {@code runOnce()} and returns the resulting {@code ClusterMetadata} (TODO: complete this javadoc). 
+ * @return the resulting {@code ClusterMetadata} (NOTE(review): confirm whether this is the metadata at the highest consecutive epoch) + */ public ClusterMetadata waitForHighestConsecutive() { runOnce(); @@ -355,7 +373,7 @@ public abstract class LocalLog implements Closeable if (committed.compareAndSet(prev, next)) { logger.info("Enacted {}.
New tail is {}", pendingEntry.transform, next.epoch); - maybeNotifyListeners(pendingEntry, transformed); + notifyLogListeners(pendingEntry, transformed); } else { @@ -409,7 +427,12 @@ public abstract class LocalLog implements Closeable return waitForHighestConsecutive().epoch; } - private void maybeNotifyListeners(Entry entry, Transformation.Result result) + /** + * Notifies every registered {@code LogListener} of the specified entry and its result. + * @param entry the log entry that has been enacted + * @param result the result of executing the entry's transformation + */ + private void notifyLogListeners(Entry entry, Transformation.Result result) { for (LogListener listener : listeners) listener.notify(entry, result); @@ -679,6 +702,10 @@ public abstract class LocalLog implements Closeable addListener(new UpgradeMigrationListener()); } + /** + * Creates a {@code LogListener} that initiates a snapshot of the cluster metadata (NOTE(review): confirm whether it takes the snapshot directly or commits a {@code SealPeriod} transformation) + * @return the snapshot {@code LogListener} + */ private LogListener snapshotListener() { return (entry, metadata) -> { diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java b/src/java/org/apache/cassandra/tcm/log/LogState.java index 4518659e69..fc1ef8a528 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogState.java +++ b/src/java/org/apache/cassandra/tcm/log/LogState.java @@ -42,6 +42,10 @@ import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.utils.JVMStabilityInspector; +/** + * A {@code LogState} represents the state of a cluster metadata log as a base state {@code ClusterMetadata} + * and a series of transformations applied to that base state. + */ public class LogState { private static final Logger logger = LoggerFactory.getLogger(LogState.class); @@ -59,7 +63,14 @@ public class LogState return cached; } + /** + * The base state. 
+ */ public final ClusterMetadata baseState; + + /** + * The log entries containing the transformations that need to be applied to the base state to recreate the final state.
+ */ public final Replication transformations; // Uses Replication rather than an just a list of entries primarily to avoid duplicating the existing serializer diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java b/src/java/org/apache/cassandra/tcm/log/LogStorage.java index e35dcfb2b1..6d3627b402 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java @@ -22,7 +22,14 @@ import org.apache.cassandra.tcm.Epoch; public interface LogStorage extends LogReader { + /** + * Appends the log entry to the period. + * @param period the period to which the log entry belongs + * @param entry the log entry + */ void append(long period, Entry entry); + + LogState getLogState(Epoch since); /** @@ -34,6 +41,7 @@ public interface LogStorage extends LogReader * table up to the last snapshot at any given time. */ LogStorage SystemKeyspace = new SystemKeyspaceStorage(); + LogStorage None = new NoOpLogStorage(); class NoOpLogStorage implements LogStorage diff --git a/src/java/org/apache/cassandra/tcm/log/Replication.java b/src/java/org/apache/cassandra/tcm/log/Replication.java index a4ef111252..2c3d64ddbf 100644 --- a/src/java/org/apache/cassandra/tcm/log/Replication.java +++ b/src/java/org/apache/cassandra/tcm/log/Replication.java @@ -44,6 +44,9 @@ import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; +/** + * A sorted set of log entries mainly used to replicate log entries within the cluster. + */ public class Replication { private static final Logger logger = LoggerFactory.getLogger(Replication.class); @@ -63,7 +66,9 @@ public class Replication return cached; } - + /** + * The sorted entries that need to be replicated.
+ */ private final ImmutableList<Entry> entries; public Replication(Collection<Entry> entries) @@ -94,6 +99,12 @@ public class Replication return entries; } + /** + * Retains only the log entries that have an epoch greater or equal to the specified epoch. + * + * @param epoch the epoch + * @return only the log entries that have an epoch greater or equal to the specified epoch. + */ public Replication retainFrom(Epoch epoch) { ImmutableList.Builder<Entry> builder = ImmutableList.builder(); @@ -101,6 +112,10 @@ public class Replication return new Replication(builder.build()); } + /** + * Returns the epoch of the last entry/transformation. + * @return the epoch of the last entry/transformation. + */ public Epoch latestEpoch() { return tail().epoch; @@ -117,6 +132,11 @@ public class Replication return entries.isEmpty(); } + /** + * Appends the entries to the specified log. + * @param log the log to append to + * @return the new current epoch + */ public Epoch apply(LocalLog log) { log.append(entries()); diff --git a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java index 83c443731b..3d5928efde 100644 --- a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java @@ -39,6 +39,9 @@ import org.apache.cassandra.tcm.Transformation; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.db.SystemKeyspace.LocalMetadataLog; +/** + * {@code LogStorage} storing CMS log entries into the {@code system.local_metadata_log} table. + */ public class SystemKeyspaceStorage implements LogStorage { private static final Logger logger = LoggerFactory.getLogger(SystemKeyspaceStorage.class); @@ -67,6 +70,7 @@ public class SystemKeyspaceStorage implements LogStorage } // This method is always called from a single thread, so doesn't have to be synchonised. 
+ @Override public void append(long period, Entry entry) { try @@ -86,6 +90,10 @@ public class SystemKeyspaceStorage implements LogStorage } } + /** + * Checks if the metadata log table contains at least one row. + * @return {@code true} if the metadata log table contains at least one row, {@code false} otherwise. + */ public synchronized static boolean hasAnyEpoch() { String query = String.format("SELECT epoch FROM %s.%s LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME); @@ -106,12 +114,10 @@ public class SystemKeyspaceStorage implements LogStorage * collating log entries which follow the supplied epoch. It is assumed that the * target epoch is found in the starting period, so any entries returned will be * from either the starting period or subsequent periods. + * @param startPeriod the period from which to start collating log entries * @param since target epoch * @return contiguous list of log entries which follow the given epoch, * which may be empty - * @param startPeriod - * @param since - * @return */ public Replication getReplication(long startPeriod, Epoch since) { diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index b37b095f77..155cd509ff 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -179,6 +179,17 @@ public class Election return coordinator != null && coordinator != MIGRATED; } + /** + * Sends the specified message to the specified nodes and waits for the responses.
+ * + * @param messaging the messaging service to use to send the message to the different nodes + * @param sendTo the nodes to which the message must be sent + * @param verb the verb used to send the message + * @param payload the message payload + * @return the responses per node + * @param <REQ> the request type + * @param <RSP> the response type + */ public static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb verb, REQ payload) { Accumulator<Pair<InetAddressAndPort, RSP>> responses = new Accumulator<>(sendTo.size()); diff --git a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java index febe9518b0..0d77b205eb 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java +++ b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java @@ -43,17 +43,17 @@ import org.apache.cassandra.tcm.membership.NodeId; * The defining feature of this placement stategy is that all layouts (i.e. replication params) use the same * set of ranges. So when splitting the current ranges, we only need to calculate the splits once and apply to * all existing placements. - * + * <p> * Also, when using this strategy, the read and write placements should (eventually) be identical. While range * movements/bootstraps/decommissions are in-flight, this will not be the case as the read and write replica * sets will diverge while nodes are acquiring/relinquishing ranges. Although there may always be such operations * ongoing, this is technically a temporary state. - * + * <p> * Because of this, when calculating the steps to transition between the current state and a proposed new state, * we work from the associated TokenMaps, the assumption being that eventually both the read and write placements * will converge and will, at that point, reflect those TokenMaps. 
* This means that the starting point of each transition is the intended end state of the preceding transitions. - * + * <p> * To do this calculation, we create a canonical DataPlacement from the current TokenMap and split it according * to the proposed tokens. As we iterate through and split the existing ranges, we construct a new DataPlacement for * each currently defined. There is no movement of data between the initial and new placements, only splitting of diff --git a/src/java/org/apache/cassandra/tcm/transformations/SealPeriod.java b/src/java/org/apache/cassandra/tcm/transformations/SealPeriod.java index dc34c6a8b5..05f0b717eb 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/SealPeriod.java +++ b/src/java/org/apache/cassandra/tcm/transformations/SealPeriod.java @@ -35,7 +35,7 @@ import static org.apache.cassandra.exceptions.ExceptionCode.INVALID; * asynchonous action, and we generally do not rely on the fact snapshot is, in fact going to be * there all the time. Snapshots are used as a performance optimization. */ -public class SealPeriod implements Transformation +public final class SealPeriod implements Transformation { public static final Serializer serializer = new Serializer(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org