This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0151131e432ef3c6e923b8f9ee334b1bf9abaf6c
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Aug 1 20:06:16 2025 +0200

    Split out Topologies into Partitions
    
    patch by Alex Petrov; reviewed by Benedict for CASSANDRA-20838
---
 .../db/compaction/CompactionIterator.java          |  55 +++++--
 src/java/org/apache/cassandra/journal/Journal.java |   6 -
 .../cassandra/service/accord/AccordJournal.java    |  64 ++++++--
 .../accord/AccordJournalValueSerializers.java      |   2 +-
 .../cassandra/service/accord/AccordKeyspace.java   |  12 +-
 .../cassandra/service/accord/AccordService.java    |  32 ++--
 .../cassandra/service/accord/JournalKey.java       |  18 +-
 .../accord/journal/AccordTopologyUpdate.java       | 181 +++++----------------
 .../cassandra/fuzz/topology/JournalGCTest.java     |   1 +
 .../accord/AccordConfigurationServiceTest.java     |   8 +-
 .../accord/journal/AccordTopologyUpdateTest.java   |  11 +-
 11 files changed, 178 insertions(+), 212 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 1480069953..79344bdc67 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -28,18 +28,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongPredicate;
 import java.util.function.Supplier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Ordering;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import accord.local.Cleanup;
 import accord.local.DurableBefore;
 import accord.local.RedundantBefore;
 import accord.utils.Invariants;
 import accord.utils.UnhandledEnum;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.AbstractCompactionController;
@@ -108,6 +104,9 @@ import 
org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
 import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 import static accord.local.Cleanup.ERASE;
 import static accord.local.Cleanup.Input.PARTIAL;
@@ -864,8 +863,6 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
 
         JournalKey key;
         AccordRowCompactor<?> compactor;
-        // Initialize topology serializer during compaction to avoid 
deserializing redundant epochs
-        FlyweightSerializer<AccordTopologyUpdate, FlyweightImage> 
topologySerializer;
         final Version userVersion;
 
         public AccordJournalPurger(AccordCompactionInfos compactionInfos, 
Version version, ColumnFamilyStore cfs)
@@ -875,7 +872,6 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
             this.infos = compactionInfos;
             this.recordColumn = 
cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
             this.versionColumn = 
cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
-            this.topologySerializer = 
(FlyweightSerializer<AccordTopologyUpdate, FlyweightImage>) 
(FlyweightSerializer) new AccordTopologyUpdate.AccumulatingSerializer(() -> 
infos.minEpoch);
         }
 
         @SuppressWarnings("unchecked")
@@ -891,7 +887,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                         compactor = new AccordCommandRowCompactor(infos, 
userVersion, nowInSec);
                         break;
                     case TOPOLOGY_UPDATE:
-                        compactor = new 
AccordMergingCompactor(topologySerializer, userVersion);
+                        compactor = new 
TopologyCompactor((FlyweightSerializer<Object, 
AccordTopologyUpdate.Accumulator>) key.type.serializer, userVersion, 
infos.minEpoch);
                         break;
                     default:
                         compactor = new 
AccordMergingCompactor(key.type.serializer, userVersion);
@@ -945,6 +941,43 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
         abstract UnfilteredRowIterator result(JournalKey journalKey, 
DecoratedKey partitionKey) throws IOException;
     }
 
+    static class TopologyCompactor extends 
AccordMergingCompactor<AccordTopologyUpdate.Accumulator>
+    {
+        AccordTopologyUpdate.TopologyImage lastChangedTopology;
+        final long minEpoch;
+        TopologyCompactor(FlyweightSerializer<Object, 
AccordTopologyUpdate.Accumulator> serializer, Version userVersion, long 
minEpoch)
+        {
+            super(serializer, userVersion);
+            this.minEpoch = minEpoch;
+        }
+
+        @Override
+        void reset(JournalKey key, UnfilteredRowIterator partition)
+        {
+            super.reset(key, partition);
+        }
+
+        @Override
+        UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey 
partitionKey) throws IOException
+        {
+            AccordTopologyUpdate.TopologyImage current = builder.get();
+
+            if (lastChangedTopology != null && current.getUpdate() != null && 
lastChangedTopology.getUpdate().isEquivalent(current.getUpdate()))
+                builder.update(current.asNoOp());
+
+            if (builder.get().kind() != AccordTopologyUpdate.Kind.NoOp)
+            {
+                lastChangedTopology = builder.get();
+                Invariants.nonNull(lastChangedTopology.getUpdate());
+            }
+
+            if (builder.get().epoch() >= minEpoch)
+                return super.result(journalKey, partitionKey);
+            else
+                return null;
+        }
+    }
+
     static class AccordMergingCompactor<T extends FlyweightImage> extends 
AccordRowCompactor<T>
     {
         final T builder;
diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
b/src/java/org/apache/cassandra/journal/Journal.java
index 5f765b48f2..d0793d39a7 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -960,12 +960,6 @@ public class Journal<K, V> implements Shutdownable
             this.segments = new long[maxSize];
         }
 
-        public void segments(LongConsumer consumer)
-        {
-            for (int i = 0; i < size; i++)
-                consumer.accept(segments[i]);
-        }
-
         public long[] copyOfSegments()
         {
             return segments == null ? new long[0] : Arrays.copyOf(segments, 
size);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 8e0d1b6509..7369b5e280 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Queue;
@@ -85,7 +84,6 @@ import org.apache.cassandra.journal.ValueSerializer;
 import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
 import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
 import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
-import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import 
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
 import org.apache.cassandra.service.accord.serializers.DepsSerializers;
@@ -114,6 +112,10 @@ import static accord.impl.CommandChange.validateFlags;
 import static accord.local.Cleanup.Input.FULL;
 import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
 import static org.apache.cassandra.service.accord.JournalKey.Type.COMMAND_DIFF;
+import static 
org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Accumulator;
+import static 
org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Kind;
+import static 
org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.TopologyImage;
+import static 
org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.newTopology;
 import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
 
 public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier, Shutdownable
@@ -367,33 +369,58 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             journal.onDurable(pointer, onFlush);
     }
 
-    public void patchCommand(int commandStoreId, TxnId txnId, Cleanup cleanup, 
@Nullable Runnable onFlush)
+    @Override
+    public CloseableIterator<TopologyUpdate> replayTopologies()
     {
-        Builder change = new Builder(txnId);
-        change.maybeCleanup(false, cleanup);
+        return new CloseableIterator<>()
+        {
+            final CloseableIterator<Journal.KeyRefs<JournalKey>> iter = 
journalTable.keyIterator(topologyUpdateKey(0L),
+                                                                               
                  topologyUpdateKey(Timestamp.MAX_EPOCH));
+            TopologyImage prev = null;
 
-        JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, 
commandStoreId);
-        RecordPointer pointer = journal.asyncWrite(key, (out, userVersion) -> 
change.serialize(out, Version.fromVersion(configuration().userVersion())));
-        if (onFlush != null)
-            journal.onDurable(pointer, onFlush);
-    }
+            @Override
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
 
-    @Override
-    public Iterator<AccordTopologyUpdate.ImmutableTopoloyImage> 
replayTopologies()
-    {
-        AccordTopologyUpdate.Accumulator accumulator = 
readAll(TopologyUpdateKey);
-        return accumulator.images();
+            @Override
+            public TopologyUpdate next()
+            {
+                Journal.KeyRefs<JournalKey> ref = iter.next();
+                Accumulator read = readAll(ref.key());
+                if (read.accumulated.kind() == Kind.NoOp)
+                    prev = 
read.accumulated.asImage(Invariants.nonNull(prev.getUpdate()));
+                else
+                    prev = read.accumulated;
+
+                return new TopologyUpdate(prev.getUpdate().commandStores,
+                                          prev.getUpdate().global);
+            }
+
+            @Override
+            public void close()
+            {
+                iter.close();
+            }
+        };
     }
 
-    private static final JournalKey TopologyUpdateKey = new 
JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, 0);
     @Override
     public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
     {
-        RecordPointer pointer = appendInternal(TopologyUpdateKey, 
AccordTopologyUpdate.newTopology(topologyUpdate));
+        RecordPointer pointer = 
appendInternal(topologyUpdateKey(topologyUpdate.global.epoch()),
+                                               newTopology(topologyUpdate));
         if (onFlush != null)
             journal.onDurable(pointer, onFlush);
     }
 
+    private static JournalKey topologyUpdateKey(long epoch)
+    {
+        return new JournalKey(TxnId.fromValues(epoch, 0L, Node.Id.NONE),
+                              JournalKey.Type.TOPOLOGY_UPDATE, 
Integer.MAX_VALUE);
+    }
+
     private static final JournalKey DURABLE_BEFORE_KEY = new 
JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0);
 
     @Override
@@ -460,6 +487,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     public <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
     {
         BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
+        builder.reset(key);
         // TODO (expected): this can be further improved to avoid allocating 
lambdas
         AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> 
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) 
key.type.serializer;
         // TODO (expected): for those where we store an image, read only the 
first entry we find in DESC order
@@ -1170,4 +1198,4 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 7df9e27d52..99553db5cc 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -111,7 +111,7 @@ public class AccordJournalValueSerializers
             this.accumulated = initial;
         }
 
-        protected void update(V newValue)
+        public void update(V newValue)
         {
             accumulated = accumulate(accumulated, newValue);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index f0dfcf4c66..0de343feb7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -544,12 +544,12 @@ public class AccordKeyspace
         {
             int commandStoreIdBytes = 
VIntCoding.computeUnsignedVIntSize(key.commandStoreId);
             int length = commandStoreIdBytes + 1;
-            if (key.type == JournalKey.Type.COMMAND_DIFF)
+            if (key.type.usesTxnId)
                 length += CommandSerializers.txnId.serializedSize(key.id);
             ByteBuffer pk = ByteBuffer.allocate(length);
             ByteBufferAccessor.instance.putUnsignedVInt32(pk, 0, 
key.commandStoreId);
             pk.put(commandStoreIdBytes, (byte)key.type.id);
-            if (key.type == JournalKey.Type.COMMAND_DIFF)
+            if (key.type.usesTxnId)
                 CommandSerializers.txnId.serializeComparable(key.id, pk, 
ByteBufferAccessor.instance, commandStoreIdBytes + 1);
             return Journal.partitioner.decorateKey(pk);
         }
@@ -569,7 +569,13 @@ public class AccordKeyspace
             int storeId = ByteBufferAccessor.instance.getUnsignedVInt32(bb, 0);
             int offset = VIntCoding.readLengthOfVInt(bb, 0);
             JournalKey.Type type = JournalKey.Type.fromId(bb.get(offset));
-            TxnId txnId = type != JournalKey.Type.COMMAND_DIFF ? TxnId.NONE : 
CommandSerializers.txnId.deserializeComparable(bb, ByteBufferAccessor.instance, 
offset + 1);
+            TxnId txnId;
+
+            if (type.usesTxnId)
+                txnId = CommandSerializers.txnId.deserializeComparable(bb, 
ByteBufferAccessor.instance, offset + 1);
+            else
+                txnId = TxnId.NONE;
+
             return new JournalKey(txnId, type, storeId);
         }
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index f65d3616e4..2549cc5b3d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -45,7 +44,6 @@ import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Journal;
 import accord.api.ProtocolModifiers;
 import accord.coordinate.CoordinateMaxConflict;
 import accord.coordinate.CoordinateTransaction;
@@ -130,12 +128,14 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.listeners.ChangeListener;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
+import static accord.api.Journal.TopologyUpdate;
 import static 
accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE;
 import static accord.local.LoadKeys.SYNC;
 import static accord.local.LoadKeysFor.READ_WRITE;
@@ -157,7 +157,6 @@ import static 
org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
 import static org.apache.cassandra.journal.Params.ReplayMode.RESET;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadBookkeeping;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteBookkeeping;
-import static 
org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.ImmutableTopoloyImage;
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
@@ -429,20 +428,22 @@ public class AccordService implements IAccordService, 
Shutdownable
         ClusterMetadata metadata = ClusterMetadata.current();
         configService.updateMapping(metadata);
 
-        List<ImmutableTopoloyImage> images = new ArrayList<>();
+        List<TopologyUpdate> images = new ArrayList<>();
 
+        TopologyUpdate prev = null;
         // Collect locally known topologies
-        Iterator<ImmutableTopoloyImage> iter = journal.replayTopologies();
-        Journal.TopologyUpdate prev = null;
-        while (iter.hasNext())
+        try (CloseableIterator<TopologyUpdate> iter = 
journal.replayTopologies())
         {
-            ImmutableTopoloyImage next = iter.next();
-            // Due to partial compaction, we can clean up only some of the old 
epochs, creating gaps. We skip these epochs here.
-            if (prev != null && next.global.epoch() > prev.global.epoch() + 1)
-                images.clear();
+            while (iter.hasNext())
+            {
+                TopologyUpdate next = iter.next();
+                // Due to partial compaction, we can clean up only some of the 
old epochs, creating gaps. We skip these epochs here.
+                if (prev != null && next.global.epoch() > prev.global.epoch() 
+ 1)
+                    images.clear();
 
-            images.add(next);
-            prev = next;
+                images.add(next);
+                prev = next;
+            }
         }
 
         // Instantiate latest topology from the log, if known
@@ -452,7 +453,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         }
 
         // Replay local epochs
-        for (ImmutableTopoloyImage image : images)
+        for (TopologyUpdate image : images)
             configService.reportTopology(image.global);
 
         // Subscribe to TCM events
@@ -524,9 +525,6 @@ public class AccordService implements IAccordService, 
Shutdownable
         try
         {
             logger.info("Fetching topologies for epochs [{}, {}] from {}", 
from, metadata.epoch.getEpoch(), peers);
-            Invariants.require(from <= metadata.epoch.getEpoch(),
-                               "Accord epochs should never be ahead of TCM 
ones, but %d was ahead of %d", from, metadata.epoch.getEpoch());
-
             Future<TopologyRange> futures = 
FetchTopologies.fetch(SharedContext.Global.instance,
                                                                   peers,
                                                                   from,
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java 
b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index 9bd42ebaf1..cda98d7c99 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -257,22 +257,24 @@ public final class JournalKey
 
     public enum Type
     {
-        COMMAND_DIFF                 (0, new CommandDiffSerializer()),
-        REDUNDANT_BEFORE             (1, new RedundantBeforeSerializer()),
-        DURABLE_BEFORE               (2, new DurableBeforeSerializer()),
-        SAFE_TO_READ                 (3, new SafeToReadSerializer()),
-        BOOTSTRAP_BEGAN_AT           (4, new BootstrapBeganAtSerializer()),
-        RANGES_FOR_EPOCH             (5, new RangesForEpochSerializer()),
-        TOPOLOGY_UPDATE              (6, 
AccordTopologyUpdate.AccumulatingSerializer.defaultInstance),
+        COMMAND_DIFF                 (0, new CommandDiffSerializer(), true),
+        REDUNDANT_BEFORE             (1, new RedundantBeforeSerializer(), 
false),
+        DURABLE_BEFORE               (2, new DurableBeforeSerializer(), false),
+        SAFE_TO_READ                 (3, new SafeToReadSerializer(), false),
+        BOOTSTRAP_BEGAN_AT           (4, new BootstrapBeganAtSerializer(), 
false),
+        RANGES_FOR_EPOCH             (5, new RangesForEpochSerializer(), 
false),
+        TOPOLOGY_UPDATE              (6, new 
AccordTopologyUpdate.FlyweightSerializer(), true),
         ;
 
         public final int id;
         public final FlyweightSerializer<?, ?> serializer;
+        public final boolean usesTxnId;
 
-        Type(int id, FlyweightSerializer<?, ?> serializer)
+        Type(int id, FlyweightSerializer<?, ?> serializer, boolean usesTxnId)
         {
             this.id = id;
             this.serializer = serializer;
+            this.usesTxnId = usesTxnId;
         }
 
         private static final Type[] idToTypeMapping;
diff --git 
a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
 
b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
index b45de2c477..c32ca52d29 100644
--- 
a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
+++ 
b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
@@ -19,16 +19,10 @@
 package org.apache.cassandra.service.accord.journal;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.TreeMap;
-import java.util.function.Function;
-
 import accord.api.Journal;
 import accord.local.CommandStores;
-import accord.primitives.EpochSupplier;
 import accord.primitives.Ranges;
 import accord.topology.Topology;
 import accord.utils.Invariants;
@@ -50,6 +44,7 @@ public interface AccordTopologyUpdate
     Kind kind();
     void applyTo(TopologyImage accumulator);
     long epoch();
+    AccordTopologyUpdate asNoOp();
 
     Journal.TopologyUpdate getUpdate();
     static AccordTopologyUpdate newTopology(Journal.TopologyUpdate update)
@@ -92,7 +87,7 @@ public interface AccordTopologyUpdate
             long size = TypeSizes.sizeofUnsignedVInt(from.size());
             for (int i = 0; i < from.size(); i++)
             {
-                size += TypeSizes.sizeof(from.epochAtIndex(i));
+                size += TypeSizes.LONG_SIZE;
                 size += 
KeySerializers.ranges.serializedSize(from.rangesAtIndex(i));
             }
             return size;
@@ -162,22 +157,8 @@ public interface AccordTopologyUpdate
                     break;
                 }
                 case NoOp:
-                {
-                    TopologyImage image = (TopologyImage) t;
-                    Invariants.require(image.update == null);
-                    if (image.syncStatus == null)
-                        out.writeByte(Byte.MAX_VALUE);
-                    else
-                        out.writeByte(image.syncStatus.ordinal());
-
-                    KeySerializers.ranges.serialize(image.closed, out);
-                    KeySerializers.ranges.serialize(image.retired, out);
-                    break;
-                }
                 case TopologyImage:
-                {
                     TopologyImage image = (TopologyImage) t;
-
                     out.writeBoolean(image.update != null);
                     if (image.update != null)
                         
TopologyUpdateSerializer.instance.serialize(image.update, out);
@@ -189,7 +170,6 @@ public interface AccordTopologyUpdate
                     KeySerializers.ranges.serialize(image.closed, out);
                     KeySerializers.ranges.serialize(image.retired, out);
                     break;
-                }
                 default:
                     throw new UnhandledEnum(t.kind());
             }
@@ -200,24 +180,14 @@ public interface AccordTopologyUpdate
         {
             long epoch = in.readUnsignedVInt();
             Kind kind = Kind.values()[in.readUnsignedVInt32()];
+
             switch (kind)
             {
                 case NewTopology:
                     return new 
NewTopology(TopologyUpdateSerializer.instance.deserialize(in));
                 case NoOp:
-                {
-                    TopologyImage image = new TopologyImage(epoch, Kind.NoOp);
-                    byte syncStateByte = in.readByte();
-                    if (syncStateByte != Byte.MAX_VALUE)
-                        image.syncStatus = 
AccordConfigurationService.SyncStatus.values()[syncStateByte];
-
-                    image.closed = KeySerializers.ranges.deserialize(in);
-                    image.retired = KeySerializers.ranges.deserialize(in);
-                    return image;
-                }
                 case TopologyImage:
-                {
-                    TopologyImage image = new TopologyImage(epoch, 
Kind.TopologyImage);
+                    TopologyImage image = new TopologyImage(epoch);
                     if (in.readBoolean())
                         image.update = 
TopologyUpdateSerializer.instance.deserialize(in);
 
@@ -228,7 +198,6 @@ public interface AccordTopologyUpdate
                     image.closed = KeySerializers.ranges.deserialize(in);
                     image.retired = KeySerializers.ranges.deserialize(in);
                     return image;
-                }
                 default:
                     throw new UnhandledEnum(kind);
             }
@@ -245,31 +214,17 @@ public interface AccordTopologyUpdate
                 case NewTopology:
                     size += 
TopologyUpdateSerializer.instance.serializedSize(((NewTopology) t).update);
                     break;
-                case NoOp:
-                {
-                    TopologyImage image = (TopologyImage) t;
-                    Invariants.require(image.update == null);
-
-                    size += Byte.BYTES;
-
-                    size += KeySerializers.ranges.serializedSize(image.closed);
-                    size += 
KeySerializers.ranges.serializedSize(image.retired);
-                    break;
-                }
                 case TopologyImage:
-                {
+                case NoOp:
                     TopologyImage image = (TopologyImage) t;
-
-                    size += TypeSizes.sizeof(image.update != null);
+                    size += TypeSizes.BOOL_SIZE;
                     if (image.update != null)
                         size += 
TopologyUpdateSerializer.instance.serializedSize(image.update);
 
-                    size += Byte.BYTES;
-
+                    size += TypeSizes.BYTE_SIZE;
                     size += KeySerializers.ranges.serializedSize(image.closed);
                     size += 
KeySerializers.ranges.serializedSize(image.retired);
                     break;
-                }
                 default:
                     throw new UnhandledEnum(t.kind());
             }
@@ -284,16 +239,12 @@ public interface AccordTopologyUpdate
         // Used when accumulating state during compaction or replay
         TopologyImage,
         // Effectively unchanged topology
+        // During compaction, we can write a no-op if we know that from 
Accord's perspective topology has not changed
+        // (see CompactionIterator$TopologyCompactor). During 
replay/deserialization, we collect last known changed
+        // epoch, and reconstruct its topology.
         NoOp
     }
 
-    class ImmutableTopoloyImage extends Journal.TopologyUpdate
-    {
-        public ImmutableTopoloyImage(TopologyImage image)
-        {
-            super(image.update.commandStores, image.update.global);
-        }
-    }
 
     class TopologyImage implements AccordTopologyUpdate
     {
@@ -304,19 +255,21 @@ public interface AccordTopologyUpdate
         private Ranges retired = Ranges.EMPTY;
 
         private final long epoch;
-        private final Kind kind;
 
-        public TopologyImage(long epoch, Kind kind)
+        public TopologyImage(long epoch)
         {
-            Invariants.require(kind != Kind.NewTopology);
             this.epoch = epoch;
-            this.kind = kind;
+        }
+
+        public TopologyImage(long epoch, Journal.TopologyUpdate update)
+        {
+            this.epoch = epoch;
+            this.update = update;
         }
 
         public TopologyImage asImage(Journal.TopologyUpdate update)
         {
-            TopologyImage image = new TopologyImage(epoch, Kind.TopologyImage);
-            image.update = update.cloneWithEquivalentEpoch(epoch);
+            TopologyImage image = new TopologyImage(epoch, 
update.cloneWithEquivalentEpoch(epoch));
             image.closed = closed;
             image.retired = retired;
             return image;
@@ -324,7 +277,7 @@ public interface AccordTopologyUpdate
 
         public TopologyImage asNoOp()
         {
-            TopologyImage image = new TopologyImage(epoch, Kind.NoOp);
+            TopologyImage image = new TopologyImage(epoch);
             image.closed = closed;
             image.retired = retired;
             return image;
@@ -345,13 +298,19 @@ public interface AccordTopologyUpdate
         @Override
         public Kind kind()
         {
-            return kind;
+            return update == null ? Kind.NoOp : Kind.TopologyImage;
         }
 
         @Override
         public void applyTo(TopologyImage accumulator)
         {
-            Invariants.require(accumulator.epoch == epoch);
+            Invariants.require(accumulator.epoch == epoch, "Expected %d but 
got %d", epoch, accumulator.epoch);
+            if (kind() == Kind.NoOp)
+            {
+                accumulator.update = null;
+                return;
+            }
+
             Invariants.require(accumulator.update == null || 
accumulator.update.equals(update));
             accumulator.update = update;
             // We're iterating in _reverse_ order
@@ -414,6 +373,12 @@ public interface AccordTopologyUpdate
             accumulator.update = update;
         }
 
+        @Override
+        public AccordTopologyUpdate asNoOp()
+        {
+            return new TopologyImage(epoch);
+        }
+
         @Override
         public boolean equals(Object o)
         {
@@ -430,65 +395,30 @@ public interface AccordTopologyUpdate
         }
     }
 
-    class Accumulator
-    extends AccordJournalValueSerializers.Accumulator<NavigableMap<Long, 
TopologyImage>, AccordTopologyUpdate>
+    class Accumulator extends 
AccordJournalValueSerializers.Accumulator<TopologyImage, AccordTopologyUpdate>
     {
         public Accumulator()
         {
-            super(new TreeMap<>());
+            super(null);
         }
 
         @Override
         public void reset(JournalKey key)
         {
-            accumulated = new TreeMap<>();
-        }
-
-        @Override
-        public void update(AccordTopologyUpdate newValue)
-        {
-            super.update(newValue);
-        }
-
-        public Iterator<ImmutableTopoloyImage> images()
-        {
-            return map(get().values().iterator(), ImmutableTopoloyImage::new);
+            accumulated = new TopologyImage(key.id.epoch());
         }
 
         @Override
-        protected NavigableMap<Long, TopologyImage> 
accumulate(NavigableMap<Long, TopologyImage> allEpochs, AccordTopologyUpdate 
update)
+        protected TopologyImage accumulate(TopologyImage acc, 
AccordTopologyUpdate update)
         {
-            update.applyTo(allEpochs.computeIfAbsent(update.epoch(), v -> new 
TopologyImage(update.epoch(), Kind.TopologyImage)));
-            return allEpochs;
+            update.applyTo(acc);
+            return acc;
         }
     }
 
-    static <FROM, TO> Iterator<TO> map(Iterator<FROM> iter, Function<FROM, TO> 
fn)
-    {
-        return new Iterator<TO>()
-        {
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
-
-            public TO next()
-            {
-                return fn.apply(iter.next());
-            }
-        };
-    }
-
-    class AccumulatingSerializer
-    implements 
AccordJournalValueSerializers.FlyweightSerializer<AccordTopologyUpdate, 
Accumulator>
+    class FlyweightSerializer implements 
AccordJournalValueSerializers.FlyweightSerializer<AccordTopologyUpdate, 
Accumulator>
     {
-        public static final AccumulatingSerializer defaultInstance = new 
AccumulatingSerializer(() -> 0);
-
-        private final EpochSupplier minEpoch;
-        public AccumulatingSerializer(EpochSupplier minEpoch)
-        {
-            this.minEpoch = minEpoch;
-        }
+        public FlyweightSerializer() {}
 
         @Override
         public Accumulator mergerFor()
@@ -499,44 +429,19 @@ public interface AccordTopologyUpdate
         @Override
         public void serialize(JournalKey key, AccordTopologyUpdate from, 
DataOutputPlus out, Version version) throws IOException
         {
-            out.writeUnsignedVInt32(1);
             Serializer.instance.serialize(from, out);
         }
 
         @Override
         public void reserialize(JournalKey key, Accumulator from, 
DataOutputPlus out, Version version) throws IOException
         {
-            out.writeUnsignedVInt32(from.get().size());
-            Journal.TopologyUpdate prev = null;
-            for (TopologyImage value : from.get().values())
-            {
-                Journal.TopologyUpdate tmp = value.update;
-                if (prev != null && value.update.isEquivalent(prev))
-                    value = value.asNoOp();
-
-                prev = tmp;
-                Serializer.instance.serialize(value, out);
-            }
+            serialize(key, from.get(), out, version);
         }
 
         @Override
         public void deserialize(JournalKey key, Accumulator into, 
DataInputPlus in, Version version) throws IOException
         {
-            long minEpoch = this.minEpoch.epoch();
-            int count = in.readUnsignedVInt32();
-            AccordTopologyUpdate prev = null;
-            while (--count >= 0)
-            {
-                AccordTopologyUpdate update = 
Serializer.instance.deserialize(in);
-                if (update.kind() == Kind.NoOp)
-                {
-                    Invariants.require(prev != null);
-                    update = ((TopologyImage) 
update).asImage(prev.getUpdate());
-                }
-                if (update.epoch() >= minEpoch)
-                    into.update(update);
-                prev = update;
-            }
+            into.update(Serializer.instance.deserialize(in));
         }
     }
 }
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java 
b/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java
index 064d050207..5ba2a7c5d3 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java
@@ -89,6 +89,7 @@ public class JournalGCTest extends FuzzTestBase
 
                     if (pk > 0 && pk % 100 == 0)
                     {
+                        cluster.schemaChange("ALTER TABLE " + schema.keyspace 
+ '.' + schema.table + " WITH comment='" + pk + "';");
                         cluster.get(1).runOnInstance(() -> {
                             ((AccordService) 
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
                             ((AccordService) 
AccordService.instance()).journal().compactor().run();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index 3bea823ae2..535bf7e528 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -27,10 +27,10 @@ import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
 
+import accord.api.Journal.TopologyUpdate;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import accord.api.Journal;
 import accord.impl.AbstractConfigurationServiceTest;
 import accord.local.Node.Id;
 import accord.topology.Topology;
@@ -63,7 +63,6 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.accord.api.AccordAgent;
-import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ValidatingClusterMetadataService;
 import org.apache.cassandra.tcm.membership.Location;
@@ -180,7 +179,7 @@ public class AccordConfigurationServiceTest
                 public AsyncResult<Void> onTopologyUpdate(Topology topology, 
boolean isLoad, boolean startSync)
                 {
                     // Fake journal save
-                    journal_.saveTopology(new Journal.TopologyUpdate(new 
Int2ObjectHashMap<>(), topology), () -> {});
+                    journal_.saveTopology(new TopologyUpdate(new 
Int2ObjectHashMap<>(), topology), () -> {});
                     return super.onTopologyUpdate(topology, isLoad, startSync);
                 }
             };
@@ -205,7 +204,8 @@ public class AccordConfigurationServiceTest
             
loaded.updateMapping(mappingForEpoch(cms.metadata().epoch.getEpoch() + 1));
             listener = new 
AbstractConfigurationServiceTest.TestListener(loaded, true);
             loaded.registerListener(listener);
-            Iterator<AccordTopologyUpdate.ImmutableTopoloyImage> iter = 
journal.replayTopologies();
+            journal_.closeCurrentSegmentForTestingIfNonEmpty();
+            Iterator<TopologyUpdate> iter = journal.replayTopologies();
             // Simulate journal replay
             while (iter.hasNext())
                 loaded.reportTopology(iter.next().global);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java
 
b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java
index a1f7a738d0..c0c790ba47 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java
@@ -141,8 +141,8 @@ public class AccordTopologyUpdateTest
             switch (kind)
             {
                 case NewTopology: return new 
AccordTopologyUpdate.NewTopology(topologyUpdateGen.next(rs));
-                case TopologyImage: return new 
AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs), 
AccordTopologyUpdate.Kind.TopologyImage);
-                case NoOp: return new 
AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs), 
AccordTopologyUpdate.Kind.NoOp);
+                case TopologyImage: return new 
AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs), 
topologyUpdateGen.next(rs));
+                case NoOp: return new 
AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs));
                 default: throw new AssertionError("Unknown kind: " + kind);
             }
         };
@@ -155,10 +155,9 @@ public class AccordTopologyUpdateTest
 
     private static void maybeUpdatePartitioner(AccordTopologyUpdate expected)
     {
-        if (expected instanceof AccordTopologyUpdate.NewTopology)
-        {
-            maybeUpdatePartitioner(((AccordTopologyUpdate.NewTopology) 
expected).update);
-        }
+        Journal.TopologyUpdate update = expected.getUpdate();
+        if (update != null)
+            maybeUpdatePartitioner(expected.getUpdate());
     }
 
     private void maybeUpdatePartitioner(CommandStores.RangesForEpoch expected)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to