This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0ba7b43207ce3f3b040467a9804cbc9d28f7e802
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Oct 23 16:22:51 2024 +0200

    Fix SystemKeyspaceStorageTest and SnapshotTest
    
    Patch by Alex Petrov; reviewed by Ariel Weisberg for CASSANDRA-20032
---
 .../apache/cassandra/tcm/AtomicLongBackedProcessor.java |  8 ++++++--
 src/java/org/apache/cassandra/tcm/log/LogReader.java    | 17 +++++++++++------
 src/java/org/apache/cassandra/tcm/log/LogState.java     |  5 +++++
 .../BeginConsensusMigrationForTableAndRange.java        |  3 +--
 .../MaybeFinishConsensusMigrationForTableAndRange.java  |  1 -
 .../distributed/test/log/SystemKeyspaceStorageTest.java | 13 ++++++++-----
 .../serializers/CommandsForKeySerializerTest.java       |  2 +-
 7 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
index 1bf81b6048..55f4d96406 100644
--- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
@@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
@@ -177,7 +178,7 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor
         public LogState getLogState(Epoch start, Epoch end)
         {
             EntryHolder state = getEntries(Epoch.EMPTY);
-            ClusterMetadata metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner());;
+            ClusterMetadata metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
             Iterator<Entry> iter = state.iterator();
             ImmutableList.Builder<Entry> rest = new ImmutableList.Builder<>();
             while (iter.hasNext())
@@ -186,8 +187,11 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor
                 if (current.epoch.isAfter(end))
                     break;
                 if (current.epoch.isEqualOrBefore(start))
+                {
+                    Invariants.checkState(current.epoch.isDirectlyAfter(metadata.epoch));
                     metadata = current.transform.execute(metadata).success().metadata;
-                else
+                }
+                else if (current.epoch.isAfter(start))
                     rest.add(current);
             }
 
diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java b/src/java/org/apache/cassandra/tcm/log/LogReader.java
index effc4d7561..7f2b80602a 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogReader.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
@@ -124,12 +125,15 @@ public interface LogReader
     {
         try
         {
-            ClusterMetadata closestSnapshot = snapshots().getSnapshotBefore(start);
+            ClusterMetadata closestSnapshot = null;
+            if (includeSnapshot)
+                closestSnapshot = snapshots().getSnapshotBefore(start);
 
             // Snapshot could not be found, fetch enough epochs to reconstruct the start metadata
             if (closestSnapshot == null)
             {
-                closestSnapshot = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
+                if (includeSnapshot)
+                    closestSnapshot = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
                 ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>();
                 EntryHolder entryHolder = getEntries(Epoch.EMPTY, end);
                 for (Entry entry : entryHolder.entries)
@@ -144,20 +148,21 @@ public interface LogReader
             else if (closestSnapshot.epoch.isBefore(start))
             {
                 ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>();
-                EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end);
+                EntryHolder entryHolder = getEntries(closestSnapshot.epoch.nextEpoch(), end);
                 for (Entry entry : entryHolder.entries)
                 {
                     if (entry.epoch.isAfter(start))
                         entries.add(entry);
-                    else
+                    else if (includeSnapshot)
                         closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata;
                 }
                 return new LogState(closestSnapshot, entries.build());
             }
             else
             {
-                assert closestSnapshot.epoch.isEqualOrAfter(start) : String.format("Got %s, but requested snapshot of %s", closestSnapshot.epoch, start);
-                EntryHolder entryHolder = getEntries(closestSnapshot.epoch.nextEpoch(), end);
+                Invariants.checkState(closestSnapshot.epoch.isEqualOrAfter(start),
+                                      "Got %s, but requested snapshot of %s", closestSnapshot.epoch, start);
+                EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end);
                 return new LogState(closestSnapshot, ImmutableList.copyOf(entryHolder.entries));
             }
         }
diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java b/src/java/org/apache/cassandra/tcm/log/LogState.java
index de69184a89..438d01cd6e 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogState.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogState.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
@@ -70,6 +71,10 @@ public class LogState
     // Uses Replication rather than an just a list of entries primarily to avoid duplicating the existing serializer
     public LogState(ClusterMetadata baseState, ImmutableList<Entry> entries)
     {
+        Invariants.checkState(baseState == null ||
+                              entries.isEmpty() ||
+                              entries.get(0).epoch.isDirectlyAfter(baseState.epoch),
+                              "Base state: %s, first entry: %s", baseState == null ? null : baseState.epoch, entries.isEmpty() ? null : entries.get(0).epoch);
         this.baseState = baseState;
         this.entries = entries;
     }
diff --git a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
index 8bbdbc8e64..be7e9a763f 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
@@ -29,12 +29,11 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget;
 import org.apache.cassandra.service.consensus.migration.ConsensusTableMigration;
-import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.sequences.LockedRanges;
diff --git a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
index 3f857cce09..16ff97e674 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
index e8bf239f56..dbc2735d2b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
@@ -98,14 +98,13 @@ public class SystemKeyspaceStorageTest extends CoordinatorPathTestBase
                     cluster.get(1).runOnInstance(() -> deleteSnapshot(toRemoveSnapshot.getEpoch()));
                 }
             }
-            Epoch latestSnapshot = remainingSnapshots.get(remainingSnapshots.size() - 1);
             Epoch lastEpoch =  allEpochs.stream().max(Comparator.naturalOrder()).get();
             repeat(10, () -> {
                 repeat(100, () -> {
                     Epoch since = allEpochs.get(rng.nextInt(allEpochs.size()));
-                    for (boolean consistentReplay : new boolean[]{ true, false })
+                    for (boolean consistentFetch : new boolean[]{ true, false })
                     {
-                        LogState logState = simulatedCluster.node(2).requestResponse(new FetchCMSLog(since, consistentReplay));
+                        LogState logState = simulatedCluster.node(2).requestResponse(new FetchCMSLog(since, consistentFetch));
                         // if we return a snapshot it is always the most recent one
                         // we don't return a snapshot if there is only 1 snapshot after `since`
                         Epoch start = since;
@@ -119,12 +118,16 @@ public class SystemKeyspaceStorageTest extends CoordinatorPathTestBase
                         }
                         else
                         {
-                            assertEquals(latestSnapshot, logState.baseState.epoch);
+                            assertEquals(since, logState.baseState.epoch);
                             start = logState.baseState.epoch;
                             if (logState.entries.isEmpty()) // no entries, snapshot should have the same epoch as since
                                 assertEquals(since, start);
                             else // first epoch in entries should be snapshot epoch + 1
+                            {
+                                if (!start.nextEpoch().equals(logState.entries.get(0).epoch))
+                                    System.out.println(1);
                                 assertEquals(start.nextEpoch(), logState.entries.get(0).epoch);
+                            }
                         }
 
                         for (Entry entry : logState.entries)
@@ -174,7 +177,7 @@ public class SystemKeyspaceStorageTest extends CoordinatorPathTestBase
             }
             catch (Throwable throwable)
             {
-                throw new AssertionError(throwable);
+                throw new AssertionError(String.format("Failed on %dth/%d repetition", i, num), throwable);
             }
         }
     }
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index e69fb92610..052736200c 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -104,7 +104,7 @@ public class CommandsForKeySerializerTest
                                     parse("CREATE TABLE tbl (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'", "ks"));
         StorageService.instance.initServer();
     }
-    
+
     @Before
     public void before() throws Throwable
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to