This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17ecece543 ForceSnapshot transformations should not be persisted in the local log table
17ecece543 is described below

commit 17ecece5437ab39aaeaa0eb4b42434cddd9960b5
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Thu Dec 14 17:55:05 2023 +0000

    ForceSnapshot transformations should not be persisted in the local log table
    
    Patch by Sam Tunnicliffe; reviewed by marcuse for CASSANDRA-19190
---
 .../apache/cassandra/schema/DistributedSchema.java |  11 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |   2 +-
 .../cassandra/tcm/StubClusterMetadataService.java  |  83 ++++++++++++-
 .../tcm/listeners/MetadataSnapshotListener.java    |  10 +-
 .../org/apache/cassandra/tcm/log/LocalLog.java     |   6 +-
 .../test/log/ClusterMetadataTestHelper.java        |  19 ++-
 .../listeners/MetadataSnapshotListenerTest.java    | 133 +++++++++++++++++++++
 .../org/apache/cassandra/tcm/log/LocalLogTest.java |  54 +++++++++
 8 files changed, 310 insertions(+), 8 deletions(-)

diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index 86dd1d5117..a837b0773d 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -58,9 +58,16 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
         return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY);
     }
 
-    public static DistributedSchema first()
+    public static DistributedSchema first(Set<String> knownDatacenters)
     {
-        return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(Collections.singleton(DatabaseDescriptor.getLocalDataCenter()))), Epoch.FIRST);
+        if (knownDatacenters.isEmpty())
+        {
+            if (DatabaseDescriptor.getLocalDataCenter() != null)
+                knownDatacenters = Collections.singleton(DatabaseDescriptor.getLocalDataCenter());
+            else
+                knownDatacenters = Collections.singleton("DC1");
+        }
+        return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters)), Epoch.FIRST);
     }
 
     private final Keyspaces keyspaces;
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 33886bec40..fdf4942c13 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -107,7 +107,7 @@ public class ClusterMetadata
     @VisibleForTesting
     public ClusterMetadata(IPartitioner partitioner, Directory directory)
     {
-        this(partitioner, directory, DistributedSchema.first());
+        this(partitioner, directory, DistributedSchema.first(directory.knownDatacenters()));
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
index 475e8ef21b..8e191307d1 100644
--- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
@@ -20,15 +20,24 @@ package org.apache.cassandra.tcm;
 
 import java.util.Collections;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.tcm.Commit.Replicator;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.PlacementProvider;
+import org.apache.cassandra.tcm.ownership.TokenMap;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
 
 public class StubClusterMetadataService extends ClusterMetadataService
 {
@@ -73,12 +82,24 @@ public class StubClusterMetadataService extends ClusterMetadataService
                       .withInitialState(initial)
                       .createLog(),
               new StubProcessor(),
-              Commit.Replicator.NO_OP,
+              Replicator.NO_OP,
               false);
         this.metadata = initial;
         this.log().readyUnchecked();
     }
 
+    private StubClusterMetadataService(PlacementProvider placement,
+                                       MetadataSnapshots snapshots,
+                                       LocalLog log,
+                                       Processor processor,
+                                       Replicator replicator,
+                                       boolean isMember)
+    {
+       super(placement, snapshots, log, processor, replicator, isMember);
+       this.metadata = log.metadata();
+       this.log().readyUnchecked();
+    }
+
     @Override
     public <T1> T1 commit(Transformation transform, CommitSuccessHandler<T1> onSuccess, CommitFailureHandler<T1> onFailure)
     {
@@ -125,4 +146,64 @@ public class StubClusterMetadataService extends ClusterMetadataService
             throw new UnsupportedOperationException();
         }
     }
+
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Builder builder(IPartitioner partitioner)
+    {
+        return new Builder(partitioner);
+    }
+
+    public static class Builder
+    {
+        IPartitioner partitioner;
+        ClusterMetadata initial;
+        MetadataSnapshots snapshots = MetadataSnapshots.NO_OP;
+
+        public StubClusterMetadataService build()
+        {
+            if (initial == null)
+                initial = new ClusterMetadata(Epoch.EMPTY,
+                                              partitioner,
+                                              DistributedSchema.empty(),
+                                              Directory.EMPTY,
+                                              new TokenMap(partitioner),
+                                              DataPlacements.EMPTY,
+                                              LockedRanges.EMPTY,
+                                              InProgressSequences.EMPTY,
+                                              ImmutableMap.of());
+            return new StubClusterMetadataService(new UniformRangePlacement(),
+                                                  snapshots != null ? snapshots : MetadataSnapshots.NO_OP,
+                                                  LocalLog.logSpec().withInitialState(initial).createLog(),
+                                                  new StubProcessor(),
+                                                  Replicator.NO_OP,
+                                                  false);
+        }
+
+        private Builder()
+        {
+            this(DatabaseDescriptor.getPartitioner());
+        }
+
+        private Builder(IPartitioner partitioner)
+        {
+            this.partitioner = partitioner;
+        }
+
+        public Builder withInitial(ClusterMetadata initial)
+        {
+            this.initial = initial;
+            return this;
+        }
+
+        public Builder withSnapshots(MetadataSnapshots snapshots)
+        {
+            this.snapshots = snapshots;
+            return this;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java b/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java
index 72e3fd5f83..9d650fed51 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/MetadataSnapshotListener.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.tcm.listeners;
 
+import java.util.EnumSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,14 +28,20 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.log.Entry;
 
+import static org.apache.cassandra.tcm.Transformation.Kind.FORCE_SNAPSHOT;
+import static org.apache.cassandra.tcm.Transformation.Kind.TRIGGER_SNAPSHOT;
+
 public class MetadataSnapshotListener implements LogListener
 {
     private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListener.class);
+
+    private static final EnumSet<Transformation.Kind> triggers = EnumSet.of(TRIGGER_SNAPSHOT, FORCE_SNAPSHOT);
+
     @Override
     public void notify(Entry entry, Transformation.Result result)
     {
         ClusterMetadata next = result.success().metadata;
-        if (entry.transform.kind() == Transformation.Kind.TRIGGER_SNAPSHOT)
+        if (triggers.contains(entry.transform.kind()))
         {
             try
             {
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 7523fb2043..0307e49048 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -263,7 +263,7 @@ public abstract class LocalLog implements Closeable
         if (spec.initial == null)
             spec.initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
         if (spec.prev == null)
-            spec.prev = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
+            spec.prev = new ClusterMetadata(spec.initial.partitioner);
         assert spec.initial.epoch.is(EMPTY) || spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.isReset :
         String.format(String.format("Should start with empty epoch, unless we're in upgrade or reset mode: %s (isReset: %s)", spec.initial, spec.isReset));
 
@@ -480,7 +480,9 @@ public abstract class LocalLog implements Closeable
                     String.format("Epoch %s for %s can either force snapshot, or immediately follow %s",
                                   next.epoch, pendingEntry.transform, prev.epoch);
 
-                    if (replayComplete.get())
+                    // If replay during initialisation has completed persist to local storage unless the entry is
+                    // a synthetic ForceSnapshot which is not a replicated event but enables jumping over gaps
+                    if (replayComplete.get() && pendingEntry.transform.kind() != Transformation.Kind.FORCE_SNAPSHOT)
                         storage.append(pendingEntry.maybeUnwrapExecuted());
 
                     notifyPreCommit(prev, next, isSnapshot);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 2e7dd85352..99cc316e90 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+
 import org.apache.cassandra.ServerTestUtils.ResettableClusterMetadataService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -62,19 +63,23 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.MetadataSnapshots;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.log.LocalLog;
+import org.apache.cassandra.tcm.membership.Directory;
 import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.TokenMap;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
 import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.tcm.sequences.Move;
 import org.apache.cassandra.tcm.sequences.LeaveStreams;
 import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
-import org.apache.cassandra.tcm.sequences.Move;
 import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
 import org.apache.cassandra.tcm.transformations.AlterSchema;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
@@ -137,6 +142,18 @@ public class ClusterMetadataTestHelper
         return service;
     }
 
+    public static ClusterMetadata minimalForTesting(Epoch epoch, IPartitioner partitioner)
+    {
+        return new ClusterMetadata(epoch, Murmur3Partitioner.instance,
+                                   DistributedSchema.empty(),
+                                   Directory.EMPTY,
+                                   new TokenMap(partitioner),
+                                   DataPlacements.empty(),
+                                   LockedRanges.EMPTY,
+                                   InProgressSequences.EMPTY,
+                                   ImmutableMap.of());
+    }
+
     public static ClusterMetadata minimalForTesting(IPartitioner partitioner)
     {
         return new ClusterMetadata(Epoch.EMPTY,
diff --git a/test/unit/org/apache/cassandra/tcm/listeners/MetadataSnapshotListenerTest.java b/test/unit/org/apache/cassandra/tcm/listeners/MetadataSnapshotListenerTest.java
new file mode 100644
index 0000000000..2f962f1ab2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/listeners/MetadataSnapshotListenerTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.listeners;
+
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.StubClusterMetadataService;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.ownership.OwnershipUtils;
+import org.apache.cassandra.tcm.transformations.ForceSnapshot;
+import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
+
+import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.minimalForTesting;
+import static org.apache.cassandra.tcm.sequences.SequencesUtils.epoch;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class MetadataSnapshotListenerTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListenerTest.class);
+    private IPartitioner partitioner = Murmur3Partitioner.instance;
+    private Random r;
+
+    @BeforeClass
+    public static void disableSortedReplicaGroups()
+    {
+        // Set this so that we don't attempt to sort the random placements as this depends on a populated
+        // TokenMap. This is a temporary element of ClusterMetadata, at least in the current form
+        CassandraRelevantProperties.TCM_SORT_REPLICA_GROUPS.setBoolean(false);
+    }
+
+    @Before
+    public void setup()
+    {
+        long seed = System.nanoTime();
+        r = new Random(seed);
+        logger.info("SEED: {}", seed);
+    }
+
+    @Test
+    public void forceSnapshotTriggersSnapshot()
+    {
+        // ForceSnapshot contains a complete ClusterMetadata which is what we expect to be
+        // stored as the snapshot. The input to its execute method is the previous ClusterMetadata
+        // and isn't relevant here.
+        MetadataSnapshots snapshots = init();
+        ClusterMetadata toSnapshot = metadataForSnapshot();
+        Entry entry = new Entry(Entry.Id.NONE,
+                                toSnapshot.epoch,
+                                new ForceSnapshot(toSnapshot));
+
+        ClusterMetadata previous = minimalForTesting(Epoch.FIRST, partitioner);
+        Transformation.Result result = entry.transform.execute(previous);
+        MetadataSnapshotListener listener = new MetadataSnapshotListener();
+
+        // The payload of the transformation should be retrievable by its epoch
+        assertNull(snapshots.getSnapshot(toSnapshot.epoch));
+        listener.notify(entry, result);
+        assertEquals(toSnapshot, snapshots.getSnapshot(toSnapshot.epoch));
+    }
+
+    @Test
+    public void triggerSnapshotTest()
+    {
+        // TriggerSnapshot has no payload itself, but stores the preceding ClusterMetadata state as a snapshot.
+        MetadataSnapshots snapshots = init();
+        ClusterMetadata toSnapshot = metadataForSnapshot();
+
+        Epoch nextEpoch = toSnapshot.nextEpoch();
+        Entry entry = new Entry(Entry.Id.NONE, nextEpoch, TriggerSnapshot.instance);
+
+        Transformation.Result result = entry.transform.execute(toSnapshot);
+        MetadataSnapshotListener listener = new MetadataSnapshotListener();
+
+        assertNull(snapshots.getSnapshot(nextEpoch));
+        listener.notify(entry, result);
+        ClusterMetadata snapshot = snapshots.getSnapshot(nextEpoch);
+        assertEquals(nextEpoch, snapshot.epoch);
+        assertEquals(toSnapshot.placements, snapshot.placements);
+    }
+
+    private MetadataSnapshots init()
+    {
+        MetadataSnapshots snapshots = new AtomicLongBackedProcessor.InMemoryMetadataSnapshots();
+        StubClusterMetadataService service = StubClusterMetadataService.builder(partitioner)
+                                                                       .withSnapshots(snapshots)
+                                                                       .build();
+        ClusterMetadataService.unsetInstance();
+        ClusterMetadataService.setInstance(service);
+        return snapshots;
+    }
+
+    private ClusterMetadata metadataForSnapshot()
+    {
+        return minimalForTesting(epoch(r), partitioner)
+               .transformer()
+               .with(OwnershipUtils.randomPlacements(r)).build()
+               .metadata;
+    }
+
+}
diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
index fac8d04d6f..fbdafb131d 100644
--- a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
+++ b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.tcm.log;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,9 +38,12 @@ import org.junit.Test;
 
 import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataSnapshots;
 import org.apache.cassandra.tcm.transformations.CustomTransformation;
 import org.apache.cassandra.tcm.transformations.ForceSnapshot;
 import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
@@ -112,6 +116,56 @@ public class LocalLogTest
         assertEquals(11, tail.epoch.getEpoch());
     }
 
+    @Test
+    public void forceSnapshotIsNotPersisted()
+    {
+        LogStorage storage = new LogStorage()
+        {
+            @Override
+            public void append(Entry entry)
+            {
+                throw new RuntimeException("we should not append anything");
+            }
+
+            @Override
+            public LogState getPersistedLogState()
+            {
+                return LogState.EMPTY;
+            }
+
+            @Override
+            public LogState getLogStateBetween(ClusterMetadata base, Epoch end)
+            {
+                return LogState.EMPTY;
+            }
+
+            @Override
+            public EntryHolder getEntries(Epoch since) throws IOException
+            {
+                return new EntryHolder(since);
+            }
+
+            @Override
+            public MetadataSnapshots snapshots()
+            {
+                return MetadataSnapshots.NO_OP;
+            }
+        };
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .withInitialState(cm())
+                               .withStorage(storage)
+                               .createLog();
+        log.readyUnchecked();
+
+        Entry entry = new Entry(Entry.Id.NONE,
+                                Epoch.create(11),
+                                new ForceSnapshot(new ClusterMetadata(new LocalPartitioner(IntegerType.instance)).forceEpoch(Epoch.create(11))));
+        log.append(entry);
+        ClusterMetadata tail = log.waitForHighestConsecutive();
+
+        assertEquals(11, tail.epoch.getEpoch());
+    }
 
     @Test
     public void multipleSnapshotEntries()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to