This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 17ecece543 ForceSnapshot transformations should not be persisted in the local log table 17ecece543 is described below commit 17ecece5437ab39aaeaa0eb4b42434cddd9960b5 Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Thu Dec 14 17:55:05 2023 +0000 ForceSnapshot transformations should not be persisted in the local log table Patch by Sam Tunnicliffe; reviewed by marcuse for CASSANDRA-19190 --- .../apache/cassandra/schema/DistributedSchema.java | 11 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 2 +- .../cassandra/tcm/StubClusterMetadataService.java | 83 ++++++++++++- .../tcm/listeners/MetadataSnapshotListener.java | 10 +- .../org/apache/cassandra/tcm/log/LocalLog.java | 6 +- .../test/log/ClusterMetadataTestHelper.java | 19 ++- .../listeners/MetadataSnapshotListenerTest.java | 133 +++++++++++++++++++++ .../org/apache/cassandra/tcm/log/LocalLogTest.java | 54 +++++++++ 8 files changed, 310 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index 86dd1d5117..a837b0773d 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -58,9 +58,16 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY); } - public static DistributedSchema first() + public static DistributedSchema first(Set<String> knownDatacenters) { - return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(Collections.singleton(DatabaseDescriptor.getLocalDataCenter()))), Epoch.FIRST); + if (knownDatacenters.isEmpty()) + { + if (DatabaseDescriptor.getLocalDataCenter() != null) + knownDatacenters = Collections.singleton(DatabaseDescriptor.getLocalDataCenter()); + else + knownDatacenters = Collections.singleton("DC1"); + } + return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters)), Epoch.FIRST); } private final Keyspaces keyspaces; diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 33886bec40..fdf4942c13 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -107,7 +107,7 @@ public class ClusterMetadata @VisibleForTesting public ClusterMetadata(IPartitioner partitioner, Directory directory) { - this(partitioner, directory, DistributedSchema.first()); + this(partitioner, directory, DistributedSchema.first(directory.knownDatacenters())); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java index 475e8ef21b..8e191307d1 100644 --- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java @@ -20,15 +20,24 @@ package org.apache.cassandra.tcm; import java.util.Collections; +import com.google.common.collect.ImmutableMap; + import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.tcm.Commit.Replicator; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.ownership.PlacementProvider; +import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.LockedRanges; public class StubClusterMetadataService extends ClusterMetadataService { @@ -73,12 +82,24 @@ public class StubClusterMetadataService extends ClusterMetadataService .withInitialState(initial) .createLog(), new StubProcessor(), - Commit.Replicator.NO_OP, + Replicator.NO_OP, false); this.metadata = initial; this.log().readyUnchecked(); } + private StubClusterMetadataService(PlacementProvider placement, + MetadataSnapshots snapshots, + LocalLog log, + Processor processor, + Replicator replicator, + boolean isMember) + { + super(placement, snapshots, log, processor, replicator, isMember); + this.metadata = log.metadata(); + this.log().readyUnchecked(); + } + @Override public <T1> T1 commit(Transformation transform, CommitSuccessHandler<T1> onSuccess, CommitFailureHandler<T1> onFailure) { @@ -125,4 +146,64 @@ public class StubClusterMetadataService extends ClusterMetadataService throw new UnsupportedOperationException(); } } + + + public static Builder builder() + { + return new Builder(); + } + + public static Builder builder(IPartitioner partitioner) + { + return new Builder(partitioner); + } + + public static class Builder + { + IPartitioner partitioner; + ClusterMetadata initial; + MetadataSnapshots snapshots = MetadataSnapshots.NO_OP; + + public StubClusterMetadataService build() + { + if (initial == null) + initial = new ClusterMetadata(Epoch.EMPTY, + partitioner, + DistributedSchema.empty(), + Directory.EMPTY, + new TokenMap(partitioner), + DataPlacements.EMPTY, + LockedRanges.EMPTY, + InProgressSequences.EMPTY, + ImmutableMap.of()); + return new StubClusterMetadataService(new UniformRangePlacement(), + snapshots != null ? snapshots : MetadataSnapshots.NO_OP, + LocalLog.logSpec().withInitialState(initial).createLog(), + new StubProcessor(), + Replicator.NO_OP, + false); + } + + private Builder() + { + this(DatabaseDescriptor.getPartitioner()); + } + + private Builder(IPartitioner partitioner) + { + this.partitioner = partitioner; + } + + public Builder withInitial(ClusterMetadata initial) + { + this.initial = initial; + return this; + } + + public Builder withSnapshots(MetadataSnapshots snapshots) + { + this.snapshots = snapshots; + return this; + } + } } diff --git a/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java b/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java index 72e3fd5f83..9d650fed51 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java @@ -18,6 +18,8 @@ package org.apache.cassandra.tcm.listeners; +import java.util.EnumSet; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,14 +28,20 @@ import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.Entry; +import static org.apache.cassandra.tcm.Transformation.Kind.FORCE_SNAPSHOT; +import static org.apache.cassandra.tcm.Transformation.Kind.TRIGGER_SNAPSHOT; + public class MetadataSnapshotListener implements LogListener { private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListener.class); + + private static final EnumSet<Transformation.Kind> triggers = EnumSet.of(TRIGGER_SNAPSHOT, FORCE_SNAPSHOT); + @Override public void notify(Entry entry, Transformation.Result result) { ClusterMetadata next = result.success().metadata; - if (entry.transform.kind() == Transformation.Kind.TRIGGER_SNAPSHOT) + if (triggers.contains(entry.transform.kind())) { try { diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 7523fb2043..0307e49048 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -263,7 +263,7 @@ public abstract class LocalLog implements Closeable if (spec.initial == null) spec.initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); if (spec.prev == null) - spec.prev = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + spec.prev = new ClusterMetadata(spec.initial.partitioner); assert spec.initial.epoch.is(EMPTY) || spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.isReset : String.format(String.format("Should start with empty epoch, unless we're in upgrade or reset mode: %s (isReset: %s)", spec.initial, spec.isReset)); @@ -480,7 +480,9 @@ public abstract class LocalLog implements Closeable String.format("Epoch %s for %s can either force snapshot, or immediately follow %s", next.epoch, pendingEntry.transform, prev.epoch); - if (replayComplete.get()) + // If replay during initialisation has completed persist to local storage unless the entry is + // a synthetic ForceSnapshot which is not a replicated event but enables jumping over gaps + if (replayComplete.get() && pendingEntry.transform.kind() != Transformation.Kind.FORCE_SNAPSHOT) storage.append(pendingEntry.maybeUnwrapExecuted()); notifyPreCommit(prev, next, isSnapshot); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index 2e7dd85352..99cc316e90 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -33,6 +33,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; + import org.apache.cassandra.ServerTestUtils.ResettableClusterMetadataService; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; @@ -62,19 +63,23 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.LocalLog; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; +import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.LockedRanges; +import org.apache.cassandra.tcm.sequences.Move; import org.apache.cassandra.tcm.sequences.LeaveStreams; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; -import org.apache.cassandra.tcm.sequences.Move; import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; import org.apache.cassandra.tcm.transformations.AlterSchema; import org.apache.cassandra.tcm.transformations.PrepareJoin; @@ -137,6 +142,18 @@ public class ClusterMetadataTestHelper return service; } + public static ClusterMetadata minimalForTesting(Epoch epoch, IPartitioner partitioner) + { + return new ClusterMetadata(epoch, Murmur3Partitioner.instance, + DistributedSchema.empty(), + Directory.EMPTY, + new TokenMap(partitioner), + DataPlacements.empty(), + LockedRanges.EMPTY, + InProgressSequences.EMPTY, + ImmutableMap.of()); + } + public static ClusterMetadata minimalForTesting(IPartitioner partitioner) { return new ClusterMetadata(Epoch.EMPTY, diff --git a/test/unit/org/apache/cassandra/tcm/listeners/MetadataSnapshotListenerTest.java b/test/unit/org/apache/cassandra/tcm/listeners/MetadataSnapshotListenerTest.java new file mode 100644 index 0000000000..2f962f1ab2 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/listeners/MetadataSnapshotListenerTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.listeners; + +import java.util.Random; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.tcm.AtomicLongBackedProcessor; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataSnapshots; +import org.apache.cassandra.tcm.StubClusterMetadataService; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.ownership.OwnershipUtils; +import org.apache.cassandra.tcm.transformations.ForceSnapshot; +import org.apache.cassandra.tcm.transformations.TriggerSnapshot; + +import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.minimalForTesting; +import static org.apache.cassandra.tcm.sequences.SequencesUtils.epoch; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class MetadataSnapshotListenerTest +{ + private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListenerTest.class); + private IPartitioner partitioner = Murmur3Partitioner.instance; + private Random r; + + @BeforeClass + public static void disableSortedReplicaGroups() + { + // Set this so that we don't attempt to sort the random placements as this depends on a populated + // TokenMap. This is a temporary element of ClusterMetadata, at least in the current form + CassandraRelevantProperties.TCM_SORT_REPLICA_GROUPS.setBoolean(false); + } + + @Before + public void setup() + { + long seed = System.nanoTime(); + r = new Random(seed); + logger.info("SEED: {}", seed); + } + + @Test + public void forceSnapshotTriggersSnapshot() + { + // ForceSnapshot contains a complete ClusterMetadata which is what we expect to be + // stored as the snapshot. The input to its execute method is the previous ClusterMetadata + // and isn't relevant here. + MetadataSnapshots snapshots = init(); + ClusterMetadata toSnapshot = metadataForSnapshot(); + Entry entry = new Entry(Entry.Id.NONE, + toSnapshot.epoch, + new ForceSnapshot(toSnapshot)); + + ClusterMetadata previous = minimalForTesting(Epoch.FIRST, partitioner); + Transformation.Result result = entry.transform.execute(previous); + MetadataSnapshotListener listener = new MetadataSnapshotListener(); + + // The payload of the transformation should be retrievable by its epoch + assertNull(snapshots.getSnapshot(toSnapshot.epoch)); + listener.notify(entry, result); + assertEquals(toSnapshot, snapshots.getSnapshot(toSnapshot.epoch)); + } + + @Test + public void triggerSnapshotTest() + { + // TriggerSnapshot has no payload itself, but stores the preceding ClusterMetadata state as a snapshot. + MetadataSnapshots snapshots = init(); + ClusterMetadata toSnapshot = metadataForSnapshot(); + + Epoch nextEpoch = toSnapshot.nextEpoch(); + Entry entry = new Entry(Entry.Id.NONE, nextEpoch, TriggerSnapshot.instance); + + Transformation.Result result = entry.transform.execute(toSnapshot); + MetadataSnapshotListener listener = new MetadataSnapshotListener(); + + assertNull(snapshots.getSnapshot(nextEpoch)); + listener.notify(entry, result); + ClusterMetadata snapshot = snapshots.getSnapshot(nextEpoch); + assertEquals(nextEpoch, snapshot.epoch); + assertEquals(toSnapshot.placements, snapshot.placements); + } + + private MetadataSnapshots init() + { + MetadataSnapshots snapshots = new AtomicLongBackedProcessor.InMemoryMetadataSnapshots(); + StubClusterMetadataService service = StubClusterMetadataService.builder(partitioner) + .withSnapshots(snapshots) + .build(); + ClusterMetadataService.unsetInstance(); + ClusterMetadataService.setInstance(service); + return snapshots; + } + + private ClusterMetadata metadataForSnapshot() + { + return minimalForTesting(epoch(r), partitioner) + .transformer() + .with(OwnershipUtils.randomPlacements(r)).build() + .metadata; + } + +} diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java index fac8d04d6f..fbdafb131d 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.tcm.log; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -37,9 +38,12 @@ import org.junit.Test; import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.transformations.CustomTransformation; import org.apache.cassandra.tcm.transformations.ForceSnapshot; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; @@ -112,6 +116,56 @@ public class LocalLogTest assertEquals(11, tail.epoch.getEpoch()); } + @Test + public void forceSnapshotIsNotPersisted() + { + LogStorage storage = new LogStorage() + { + @Override + public void append(Entry entry) + { + throw new RuntimeException("we should not append anything"); + } + + @Override + public LogState getPersistedLogState() + { + return LogState.EMPTY; + } + + @Override + public LogState getLogStateBetween(ClusterMetadata base, Epoch end) + { + return LogState.EMPTY; + } + + @Override + public EntryHolder getEntries(Epoch since) throws IOException + { + return new EntryHolder(since); + } + + @Override + public MetadataSnapshots snapshots() + { + return MetadataSnapshots.NO_OP; + } + }; + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(cm()) + .withStorage(storage) + .createLog(); + log.readyUnchecked(); + + Entry entry = new Entry(Entry.Id.NONE, + Epoch.create(11), + new ForceSnapshot(new ClusterMetadata(new LocalPartitioner(IntegerType.instance)).forceEpoch(Epoch.create(11)))); + log.append(entry); + ClusterMetadata tail = log.waitForHighestConsecutive(); + + assertEquals(11, tail.epoch.getEpoch()); + } @Test public void multipleSnapshotEntries() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org