This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0ba7b43207ce3f3b040467a9804cbc9d28f7e802 Author: Alex Petrov <[email protected]> AuthorDate: Wed Oct 23 16:22:51 2024 +0200 Fix SystemKeyspaceStorageTest and SnapshotTest Patch by Alex Petrov; reviewed by Ariel Weisberg for CASSANDRA-20032 --- .../apache/cassandra/tcm/AtomicLongBackedProcessor.java | 8 ++++++-- src/java/org/apache/cassandra/tcm/log/LogReader.java | 17 +++++++++++------ src/java/org/apache/cassandra/tcm/log/LogState.java | 5 +++++ .../BeginConsensusMigrationForTableAndRange.java | 3 +-- .../MaybeFinishConsensusMigrationForTableAndRange.java | 1 - .../distributed/test/log/SystemKeyspaceStorageTest.java | 13 ++++++++----- .../serializers/CommandsForKeySerializerTest.java | 2 +- 7 files changed, 32 insertions(+), 17 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java index 1bf81b6048..55f4d96406 100644 --- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.utils.Invariants; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; @@ -177,7 +178,7 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor public LogState getLogState(Epoch start, Epoch end) { EntryHolder state = getEntries(Epoch.EMPTY); - ClusterMetadata metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner());; + ClusterMetadata metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); Iterator<Entry> iter = state.iterator(); ImmutableList.Builder<Entry> rest = new ImmutableList.Builder<>(); while (iter.hasNext()) @@ -186,8 +187,11 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor if (current.epoch.isAfter(end)) break; if (current.epoch.isEqualOrBefore(start)) + { + Invariants.checkState(current.epoch.isDirectlyAfter(metadata.epoch)); metadata = current.transform.execute(metadata).success().metadata; - else + } + else if (current.epoch.isAfter(start)) rest.add(current); } diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java b/src/java/org/apache/cassandra/tcm/log/LogReader.java index effc4d7561..7f2b80602a 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogReader.java +++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java @@ -28,6 +28,7 @@ import java.util.TreeSet; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; +import accord.utils.Invariants; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; @@ -124,12 +125,15 @@ public interface LogReader { try { - ClusterMetadata closestSnapshot = snapshots().getSnapshotBefore(start); + ClusterMetadata closestSnapshot = null; + if (includeSnapshot) + closestSnapshot = snapshots().getSnapshotBefore(start); // Snapshot could not be found, fetch enough epochs to reconstruct the start metadata if (closestSnapshot == null) { - closestSnapshot = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + if (includeSnapshot) + closestSnapshot = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>(); EntryHolder entryHolder = getEntries(Epoch.EMPTY, end); for (Entry entry : entryHolder.entries) @@ -144,20 +148,21 @@ public interface LogReader else if (closestSnapshot.epoch.isBefore(start)) { ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>(); - EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end); + EntryHolder entryHolder = getEntries(closestSnapshot.epoch.nextEpoch(), end); for (Entry entry : entryHolder.entries) { if (entry.epoch.isAfter(start)) entries.add(entry); - else + else if (includeSnapshot) closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata; } return new LogState(closestSnapshot, entries.build()); } else { - assert closestSnapshot.epoch.isEqualOrAfter(start) : String.format("Got %s, but requested snapshot of %s", closestSnapshot.epoch, start); - EntryHolder entryHolder = getEntries(closestSnapshot.epoch.nextEpoch(), end); + Invariants.checkState(closestSnapshot.epoch.isEqualOrAfter(start), + "Got %s, but requested snapshot of %s", closestSnapshot.epoch, start); + EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end); return new LogState(closestSnapshot, ImmutableList.copyOf(entryHolder.entries)); } } diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java b/src/java/org/apache/cassandra/tcm/log/LogState.java index de69184a89..438d01cd6e 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogState.java +++ b/src/java/org/apache/cassandra/tcm/log/LogState.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.utils.Invariants; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; @@ -70,6 +71,10 @@ public class LogState // Uses Replication rather than an just a list of entries primarily to avoid duplicating the existing serializer public LogState(ClusterMetadata baseState, ImmutableList<Entry> entries) { + Invariants.checkState(baseState == null || + entries.isEmpty() || + entries.get(0).epoch.isDirectlyAfter(baseState.epoch), + "Base state: %s, first entry: %s", baseState == null ? null : baseState.epoch, entries.isEmpty() ? null : entries.get(0).epoch); this.baseState = baseState; this.entries = entries; } diff --git a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java index 8bbdbc8e64..be7e9a763f 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java +++ b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java @@ -29,12 +29,11 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget; import org.apache.cassandra.service.consensus.migration.ConsensusTableMigration; -import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.sequences.LockedRanges; diff --git a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java index 3f857cce09..16ff97e674 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java +++ b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java @@ -33,7 +33,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableParams; diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java index e8bf239f56..dbc2735d2b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java @@ -98,14 +98,13 @@ public class SystemKeyspaceStorageTest extends CoordinatorPathTestBase cluster.get(1).runOnInstance(() -> deleteSnapshot(toRemoveSnapshot.getEpoch())); } } - Epoch latestSnapshot = remainingSnapshots.get(remainingSnapshots.size() - 1); Epoch lastEpoch = allEpochs.stream().max(Comparator.naturalOrder()).get(); repeat(10, () -> { repeat(100, () -> { Epoch since = allEpochs.get(rng.nextInt(allEpochs.size())); - for (boolean consistentReplay : new boolean[]{ true, false }) + for (boolean consistentFetch : new boolean[]{ true, false }) { - LogState logState = simulatedCluster.node(2).requestResponse(new FetchCMSLog(since, consistentReplay)); + LogState logState = simulatedCluster.node(2).requestResponse(new FetchCMSLog(since, consistentFetch)); // if we return a snapshot it is always the most recent one // we don't return a snapshot if there is only 1 snapshot after `since` Epoch start = since; @@ -119,12 +118,16 @@ public class SystemKeyspaceStorageTest extends CoordinatorPathTestBase } else { - assertEquals(latestSnapshot, logState.baseState.epoch); + assertEquals(since, logState.baseState.epoch); start = logState.baseState.epoch; if (logState.entries.isEmpty()) // no entries, snapshot should have the same epoch as since assertEquals(since, start); else // first epoch in entries should be snapshot epoch + 1 + { + if (!start.nextEpoch().equals(logState.entries.get(0).epoch)) + System.out.println(1); assertEquals(start.nextEpoch(), logState.entries.get(0).epoch); + } } for (Entry entry : logState.entries) @@ -174,7 +177,7 @@ public class SystemKeyspaceStorageTest extends CoordinatorPathTestBase } catch (Throwable throwable) { - throw new AssertionError(throwable); + throw new AssertionError(String.format("Failed on %dth/%d repetition", i, num), throwable); } } } diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index e69fb92610..052736200c 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -104,7 +104,7 @@ public class CommandsForKeySerializerTest parse("CREATE TABLE tbl (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'", "ks")); StorageService.instance.initServer(); } - + @Before public void before() throws Throwable { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
