This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6c6ca6dd7ef6e6917f6fc884a4d7019a6fc1a558 Author: Alex Petrov <[email protected]> AuthorDate: Fri Aug 1 20:06:16 2025 +0200 Split out Topologies into Partitions patch by Alex Petrov; reviewed by Benedict for CASSANDRA-20838 --- .../db/compaction/CompactionIterator.java | 55 +++++-- src/java/org/apache/cassandra/journal/Journal.java | 6 - .../cassandra/service/accord/AccordJournal.java | 68 ++++++-- .../accord/AccordJournalValueSerializers.java | 2 +- .../cassandra/service/accord/AccordKeyspace.java | 12 +- .../cassandra/service/accord/AccordService.java | 32 ++-- .../cassandra/service/accord/JournalKey.java | 18 +- .../accord/journal/AccordTopologyUpdate.java | 181 +++++---------------- .../cassandra/fuzz/topology/JournalGCTest.java | 1 + .../accord/AccordConfigurationServiceTest.java | 8 +- .../accord/journal/AccordTopologyUpdateTest.java | 11 +- 11 files changed, 182 insertions(+), 212 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 1480069953..79344bdc67 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -28,18 +28,14 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.LongPredicate; import java.util.function.Supplier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Ordering; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import accord.local.Cleanup; import accord.local.DurableBefore; import accord.local.RedundantBefore; import accord.utils.Invariants; import accord.utils.UnhandledEnum; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.AbstractCompactionController; @@ -108,6 +104,9 @@ import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static accord.local.Cleanup.ERASE; import static accord.local.Cleanup.Input.PARTIAL; @@ -864,8 +863,6 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte JournalKey key; AccordRowCompactor<?> compactor; - // Initialize topology serializer during compaction to avoid deserializing redundant epochs - FlyweightSerializer<AccordTopologyUpdate, FlyweightImage> topologySerializer; final Version userVersion; public AccordJournalPurger(AccordCompactionInfos compactionInfos, Version version, ColumnFamilyStore cfs) @@ -875,7 +872,6 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte this.infos = compactionInfos; this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false)); this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false)); - this.topologySerializer = (FlyweightSerializer<AccordTopologyUpdate, FlyweightImage>) (FlyweightSerializer) new AccordTopologyUpdate.AccumulatingSerializer(() -> infos.minEpoch); } @SuppressWarnings("unchecked") @@ -891,7 +887,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte compactor = new AccordCommandRowCompactor(infos, userVersion, nowInSec); break; case TOPOLOGY_UPDATE: - compactor = new AccordMergingCompactor(topologySerializer, userVersion); + compactor = new TopologyCompactor((FlyweightSerializer<Object, AccordTopologyUpdate.Accumulator>) key.type.serializer, userVersion, infos.minEpoch); break; default: compactor = new AccordMergingCompactor(key.type.serializer, userVersion); @@ -945,6 +941,43 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte abstract UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException; } + static class TopologyCompactor extends AccordMergingCompactor<AccordTopologyUpdate.Accumulator> + { + AccordTopologyUpdate.TopologyImage lastChangedTopology; + final long minEpoch; + TopologyCompactor(FlyweightSerializer<Object, AccordTopologyUpdate.Accumulator> serializer, Version userVersion, long minEpoch) + { + super(serializer, userVersion); + this.minEpoch = minEpoch; + } + + @Override + void reset(JournalKey key, UnfilteredRowIterator partition) + { + super.reset(key, partition); + } + + @Override + UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException + { + AccordTopologyUpdate.TopologyImage current = builder.get(); + + if (lastChangedTopology != null && current.getUpdate() != null && lastChangedTopology.getUpdate().isEquivalent(current.getUpdate())) + builder.update(current.asNoOp()); + + if (builder.get().kind() != AccordTopologyUpdate.Kind.NoOp) + { + lastChangedTopology = builder.get(); + Invariants.nonNull(lastChangedTopology.getUpdate()); + } + + if (builder.get().epoch() >= minEpoch) + return super.result(journalKey, partitionKey); + else + return null; + } + } + static class AccordMergingCompactor<T extends FlyweightImage> extends AccordRowCompactor<T> { final T builder; diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index 5f765b48f2..d0793d39a7 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -960,12 +960,6 @@ public class Journal<K, V> implements Shutdownable this.segments = new long[maxSize]; } - public void segments(LongConsumer consumer) - { - for (int i = 0; i < size; i++) - consumer.accept(segments[i]); - } - public long[] copyOfSegments() { return segments == null ? new long[0] : Arrays.copyOf(segments, size); diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 8e0d1b6509..76d122bf5f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Deque; -import java.util.Iterator; import java.util.List; import java.util.NavigableMap; import java.util.Queue; @@ -63,6 +62,7 @@ import accord.utils.PersistentField; import accord.utils.UnhandledEnum; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; +import com.google.common.annotations.VisibleForTesting; import org.agrona.collections.Int2ObjectHashMap; import org.agrona.collections.IntArrayList; import org.apache.cassandra.concurrent.Shutdownable; @@ -85,7 +85,6 @@ import org.apache.cassandra.journal.ValueSerializer; import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage; import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator; import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport; -import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate; import org.apache.cassandra.service.accord.serializers.CommandSerializers; import org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer; import org.apache.cassandra.service.accord.serializers.DepsSerializers; @@ -96,6 +95,9 @@ import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.concurrent.Semaphore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static accord.api.Journal.Load.ALL; import static accord.api.Journal.Load.MINIMAL; @@ -114,6 +116,10 @@ import static accord.impl.CommandChange.validateFlags; import static accord.local.Cleanup.Input.FULL; import static org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator; import static org.apache.cassandra.service.accord.JournalKey.Type.COMMAND_DIFF; +import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Accumulator; +import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Kind; +import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.TopologyImage; +import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.newTopology; import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors; public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier, Shutdownable @@ -367,33 +373,58 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier journal.onDurable(pointer, onFlush); } - public void patchCommand(int commandStoreId, TxnId txnId, Cleanup cleanup, @Nullable Runnable onFlush) + @Override + public CloseableIterator<TopologyUpdate> replayTopologies() { - Builder change = new Builder(txnId); - change.maybeCleanup(false, cleanup); + return new CloseableIterator<>() + { + final CloseableIterator<Journal.KeyRefs<JournalKey>> iter = journalTable.keyIterator(topologyUpdateKey(0L), + topologyUpdateKey(Timestamp.MAX_EPOCH)); + TopologyImage prev = null; - JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, commandStoreId); - RecordPointer pointer = journal.asyncWrite(key, (out, userVersion) -> change.serialize(out, Version.fromVersion(configuration().userVersion()))); - if (onFlush != null) - journal.onDurable(pointer, onFlush); - } + @Override + public boolean hasNext() + { + return iter.hasNext(); + } - @Override - public Iterator<AccordTopologyUpdate.ImmutableTopoloyImage> replayTopologies() - { - AccordTopologyUpdate.Accumulator accumulator = readAll(TopologyUpdateKey); - return accumulator.images(); + @Override + public TopologyUpdate next() + { + Journal.KeyRefs<JournalKey> ref = iter.next(); + Accumulator read = readAll(ref.key()); + if (read.accumulated.kind() == Kind.NoOp) + prev = read.accumulated.asImage(Invariants.nonNull(prev.getUpdate())); + else + prev = read.accumulated; + + return new TopologyUpdate(prev.getUpdate().commandStores, + prev.getUpdate().global); + } + + @Override + public void close() + { + iter.close(); + } + }; } - private static final JournalKey TopologyUpdateKey = new JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, 0); @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { - RecordPointer pointer = appendInternal(TopologyUpdateKey, AccordTopologyUpdate.newTopology(topologyUpdate)); + RecordPointer pointer = appendInternal(topologyUpdateKey(topologyUpdate.global.epoch()), + newTopology(topologyUpdate)); if (onFlush != null) journal.onDurable(pointer, onFlush); } + private static JournalKey topologyUpdateKey(long epoch) + { + return new JournalKey(TxnId.fromValues(epoch, 0L, Node.Id.NONE), + JournalKey.Type.TOPOLOGY_UPDATE, Integer.MAX_VALUE); + } + private static final JournalKey DURABLE_BEFORE_KEY = new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0); @Override @@ -460,6 +491,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier public <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key) { BUILDER builder = (BUILDER) key.type.serializer.mergerFor(); + builder.reset(key); // TODO (expected): this can be further improved to avoid allocating lambdas AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) key.type.serializer; // TODO (expected): for those where we store an image, read only the first entry we find in DESC order @@ -1170,4 +1202,4 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier } } } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java index 7df9e27d52..99553db5cc 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java @@ -111,7 +111,7 @@ public class AccordJournalValueSerializers this.accumulated = initial; } - protected void update(V newValue) + public void update(V newValue) { accumulated = accumulate(accumulated, newValue); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index f0dfcf4c66..0de343feb7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -544,12 +544,12 @@ public class AccordKeyspace { int commandStoreIdBytes = VIntCoding.computeUnsignedVIntSize(key.commandStoreId); int length = commandStoreIdBytes + 1; - if (key.type == JournalKey.Type.COMMAND_DIFF) + if (key.type.usesTxnId) length += CommandSerializers.txnId.serializedSize(key.id); ByteBuffer pk = ByteBuffer.allocate(length); ByteBufferAccessor.instance.putUnsignedVInt32(pk, 0, key.commandStoreId); pk.put(commandStoreIdBytes, (byte)key.type.id); - if (key.type == JournalKey.Type.COMMAND_DIFF) + if (key.type.usesTxnId) CommandSerializers.txnId.serializeComparable(key.id, pk, ByteBufferAccessor.instance, commandStoreIdBytes + 1); return Journal.partitioner.decorateKey(pk); } @@ -569,7 +569,13 @@ public class AccordKeyspace int storeId = ByteBufferAccessor.instance.getUnsignedVInt32(bb, 0); int offset = VIntCoding.readLengthOfVInt(bb, 0); JournalKey.Type type = JournalKey.Type.fromId(bb.get(offset)); - TxnId txnId = type != JournalKey.Type.COMMAND_DIFF ? TxnId.NONE : CommandSerializers.txnId.deserializeComparable(bb, ByteBufferAccessor.instance, offset + 1); + TxnId txnId; + + if (type.usesTxnId) + txnId = CommandSerializers.txnId.deserializeComparable(bb, ByteBufferAccessor.instance, offset + 1); + else + txnId = TxnId.NONE; + return new JournalKey(txnId, type, storeId); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index f65d3616e4..2549cc5b3d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -45,7 +44,6 @@ import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.api.Journal; import accord.api.ProtocolModifiers; import accord.coordinate.CoordinateMaxConflict; import accord.coordinate.CoordinateTransaction; @@ -130,12 +128,14 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.listeners.ChangeListener; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; +import static accord.api.Journal.TopologyUpdate; import static accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE; import static accord.local.LoadKeys.SYNC; import static accord.local.LoadKeysFor.READ_WRITE; @@ -157,7 +157,6 @@ import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner; import static org.apache.cassandra.journal.Params.ReplayMode.RESET; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadBookkeeping; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteBookkeeping; -import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.ImmutableTopoloyImage; import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata; import static org.apache.cassandra.utils.Clock.Global.nanoTime; @@ -429,20 +428,22 @@ public class AccordService implements IAccordService, Shutdownable ClusterMetadata metadata = ClusterMetadata.current(); configService.updateMapping(metadata); - List<ImmutableTopoloyImage> images = new ArrayList<>(); + List<TopologyUpdate> images = new ArrayList<>(); + TopologyUpdate prev = null; // Collect locally known topologies - Iterator<ImmutableTopoloyImage> iter = journal.replayTopologies(); - Journal.TopologyUpdate prev = null; - while (iter.hasNext()) + try (CloseableIterator<TopologyUpdate> iter = journal.replayTopologies()) { - ImmutableTopoloyImage next = iter.next(); - // Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here. - if (prev != null && next.global.epoch() > prev.global.epoch() + 1) - images.clear(); + while (iter.hasNext()) + { + TopologyUpdate next = iter.next(); + // Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here. + if (prev != null && next.global.epoch() > prev.global.epoch() + 1) + images.clear(); - images.add(next); - prev = next; + images.add(next); + prev = next; + } } // Instantiate latest topology from the log, if known @@ -452,7 +453,7 @@ public class AccordService implements IAccordService, Shutdownable } // Replay local epochs - for (ImmutableTopoloyImage image : images) + for (TopologyUpdate image : images) configService.reportTopology(image.global); // Subscribe to TCM events @@ -524,9 +525,6 @@ public class AccordService implements IAccordService, Shutdownable try { logger.info("Fetching topologies for epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peers); - Invariants.require(from <= metadata.epoch.getEpoch(), - "Accord epochs should never be ahead of TCM ones, but %d was ahead of %d", from, metadata.epoch.getEpoch()); - Future<TopologyRange> futures = FetchTopologies.fetch(SharedContext.Global.instance, peers, from, diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java b/src/java/org/apache/cassandra/service/accord/JournalKey.java index 9bd42ebaf1..cda98d7c99 100644 --- a/src/java/org/apache/cassandra/service/accord/JournalKey.java +++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java @@ -257,22 +257,24 @@ public final class JournalKey public enum Type { - COMMAND_DIFF (0, new CommandDiffSerializer()), - REDUNDANT_BEFORE (1, new RedundantBeforeSerializer()), - DURABLE_BEFORE (2, new DurableBeforeSerializer()), - SAFE_TO_READ (3, new SafeToReadSerializer()), - BOOTSTRAP_BEGAN_AT (4, new BootstrapBeganAtSerializer()), - RANGES_FOR_EPOCH (5, new RangesForEpochSerializer()), - TOPOLOGY_UPDATE (6, AccordTopologyUpdate.AccumulatingSerializer.defaultInstance), + COMMAND_DIFF (0, new CommandDiffSerializer(), true), + REDUNDANT_BEFORE (1, new RedundantBeforeSerializer(), false), + DURABLE_BEFORE (2, new DurableBeforeSerializer(), false), + SAFE_TO_READ (3, new SafeToReadSerializer(), false), + BOOTSTRAP_BEGAN_AT (4, new BootstrapBeganAtSerializer(), false), + RANGES_FOR_EPOCH (5, new RangesForEpochSerializer(), false), + TOPOLOGY_UPDATE (6, new AccordTopologyUpdate.FlyweightSerializer(), true), ; public final int id; public final FlyweightSerializer<?, ?> serializer; + public final boolean usesTxnId; - Type(int id, FlyweightSerializer<?, ?> serializer) + Type(int id, FlyweightSerializer<?, ?> serializer, boolean usesTxnId) { this.id = id; this.serializer = serializer; + this.usesTxnId = usesTxnId; } private static final Type[] idToTypeMapping; diff --git a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java index b45de2c477..c32ca52d29 100644 --- a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java @@ -19,16 +19,10 @@ package org.apache.cassandra.service.accord.journal; import java.io.IOException; -import java.util.Iterator; import java.util.Map; -import java.util.NavigableMap; import java.util.Objects; -import java.util.TreeMap; -import java.util.function.Function; - import accord.api.Journal; import accord.local.CommandStores; -import accord.primitives.EpochSupplier; import accord.primitives.Ranges; import accord.topology.Topology; import accord.utils.Invariants; @@ -50,6 +44,7 @@ public interface AccordTopologyUpdate Kind kind(); void applyTo(TopologyImage accumulator); long epoch(); + AccordTopologyUpdate asNoOp(); Journal.TopologyUpdate getUpdate(); static AccordTopologyUpdate newTopology(Journal.TopologyUpdate update) @@ -92,7 +87,7 @@ public interface AccordTopologyUpdate long size = TypeSizes.sizeofUnsignedVInt(from.size()); for (int i = 0; i < from.size(); i++) { - size += TypeSizes.sizeof(from.epochAtIndex(i)); + size += TypeSizes.LONG_SIZE; size += KeySerializers.ranges.serializedSize(from.rangesAtIndex(i)); } return size; @@ -162,22 +157,8 @@ public interface AccordTopologyUpdate break; } case NoOp: - { - TopologyImage image = (TopologyImage) t; - Invariants.require(image.update == null); - if (image.syncStatus == null) - out.writeByte(Byte.MAX_VALUE); - else - out.writeByte(image.syncStatus.ordinal()); - - KeySerializers.ranges.serialize(image.closed, out); - KeySerializers.ranges.serialize(image.retired, out); - break; - } case TopologyImage: - { TopologyImage image = (TopologyImage) t; - out.writeBoolean(image.update != null); if (image.update != null) TopologyUpdateSerializer.instance.serialize(image.update, out); @@ -189,7 +170,6 @@ public interface AccordTopologyUpdate KeySerializers.ranges.serialize(image.closed, out); KeySerializers.ranges.serialize(image.retired, out); break; - } default: throw new UnhandledEnum(t.kind()); } @@ -200,24 +180,14 @@ public interface AccordTopologyUpdate { long epoch = in.readUnsignedVInt(); Kind kind = Kind.values()[in.readUnsignedVInt32()]; + switch (kind) { case NewTopology: return new NewTopology(TopologyUpdateSerializer.instance.deserialize(in)); case NoOp: - { - TopologyImage image = new TopologyImage(epoch, Kind.NoOp); - byte syncStateByte = in.readByte(); - if (syncStateByte != Byte.MAX_VALUE) - image.syncStatus = AccordConfigurationService.SyncStatus.values()[syncStateByte]; - - image.closed = KeySerializers.ranges.deserialize(in); - image.retired = KeySerializers.ranges.deserialize(in); - return image; - } case TopologyImage: - { - TopologyImage image = new TopologyImage(epoch, Kind.TopologyImage); + TopologyImage image = new TopologyImage(epoch); if (in.readBoolean()) image.update = TopologyUpdateSerializer.instance.deserialize(in); @@ -228,7 +198,6 @@ public interface AccordTopologyUpdate image.closed = KeySerializers.ranges.deserialize(in); image.retired = KeySerializers.ranges.deserialize(in); return image; - } default: throw new UnhandledEnum(kind); } @@ -245,31 +214,17 @@ public interface AccordTopologyUpdate case NewTopology: size += TopologyUpdateSerializer.instance.serializedSize(((NewTopology) t).update); break; - case NoOp: - { - TopologyImage image = (TopologyImage) t; - Invariants.require(image.update == null); - - size += Byte.BYTES; - - size += KeySerializers.ranges.serializedSize(image.closed); - size += KeySerializers.ranges.serializedSize(image.retired); - break; - } case TopologyImage: - { + case NoOp: TopologyImage image = (TopologyImage) t; - - size += TypeSizes.sizeof(image.update != null); + size += TypeSizes.BOOL_SIZE; if (image.update != null) size += TopologyUpdateSerializer.instance.serializedSize(image.update); - size += Byte.BYTES; - + size += TypeSizes.BYTE_SIZE; size += KeySerializers.ranges.serializedSize(image.closed); size += KeySerializers.ranges.serializedSize(image.retired); break; - } default: throw new UnhandledEnum(t.kind()); } @@ -284,16 +239,12 @@ public interface AccordTopologyUpdate // Used when accumulating state during compaction or replay TopologyImage, // Effectively unchanged topology + // During compaction, we can write a no-op if we know that from Accord's perspective topology has not changed + // (see CompactionIterator$TopologyCompactor). During replay/deserialization, we collect last known changed + // epoch, and reconstruct its topology. NoOp } - class ImmutableTopoloyImage extends Journal.TopologyUpdate - { - public ImmutableTopoloyImage(TopologyImage image) - { - super(image.update.commandStores, image.update.global); - } - } class TopologyImage implements AccordTopologyUpdate { @@ -304,19 +255,21 @@ public interface AccordTopologyUpdate private Ranges retired = Ranges.EMPTY; private final long epoch; - private final Kind kind; - public TopologyImage(long epoch, Kind kind) + public TopologyImage(long epoch) { - Invariants.require(kind != Kind.NewTopology); this.epoch = epoch; - this.kind = kind; + } + + public TopologyImage(long epoch, Journal.TopologyUpdate update) + { + this.epoch = epoch; + this.update = update; } public TopologyImage asImage(Journal.TopologyUpdate update) { - TopologyImage image = new TopologyImage(epoch, Kind.TopologyImage); - image.update = update.cloneWithEquivalentEpoch(epoch); + TopologyImage image = new TopologyImage(epoch, update.cloneWithEquivalentEpoch(epoch)); image.closed = closed; image.retired = retired; return image; @@ -324,7 +277,7 @@ public interface AccordTopologyUpdate public TopologyImage asNoOp() { - TopologyImage image = new TopologyImage(epoch, Kind.NoOp); + TopologyImage image = new TopologyImage(epoch); image.closed = closed; image.retired = retired; return image; @@ -345,13 +298,19 @@ public interface AccordTopologyUpdate @Override public Kind kind() { - return kind; + return update == null ? Kind.NoOp : Kind.TopologyImage; } @Override public void applyTo(TopologyImage accumulator) { - Invariants.require(accumulator.epoch == epoch); + Invariants.require(accumulator.epoch == epoch, "Expected %d but got %d", epoch, accumulator.epoch); + if (kind() == Kind.NoOp) + { + accumulator.update = null; + return; + } + Invariants.require(accumulator.update == null || accumulator.update.equals(update)); accumulator.update = update; // We're iterating in _reverse_ order @@ -414,6 +373,12 @@ public interface AccordTopologyUpdate accumulator.update = update; } + @Override + public AccordTopologyUpdate asNoOp() + { + return new TopologyImage(epoch); + } + @Override public boolean equals(Object o) { @@ -430,65 +395,30 @@ public interface AccordTopologyUpdate } } - class Accumulator - extends AccordJournalValueSerializers.Accumulator<NavigableMap<Long, TopologyImage>, AccordTopologyUpdate> + class Accumulator extends AccordJournalValueSerializers.Accumulator<TopologyImage, AccordTopologyUpdate> { public Accumulator() { - super(new TreeMap<>()); + super(null); } @Override public void reset(JournalKey key) { - accumulated = new TreeMap<>(); - } - - @Override - public void update(AccordTopologyUpdate newValue) - { - super.update(newValue); - } - - public Iterator<ImmutableTopoloyImage> images() - { - return map(get().values().iterator(), ImmutableTopoloyImage::new); + accumulated = new TopologyImage(key.id.epoch()); } @Override - protected NavigableMap<Long, TopologyImage> accumulate(NavigableMap<Long, TopologyImage> allEpochs, AccordTopologyUpdate update) + protected TopologyImage accumulate(TopologyImage acc, AccordTopologyUpdate update) { - update.applyTo(allEpochs.computeIfAbsent(update.epoch(), v -> new TopologyImage(update.epoch(), Kind.TopologyImage))); - return allEpochs; + update.applyTo(acc); + return acc; } } - static <FROM, TO> Iterator<TO> map(Iterator<FROM> iter, Function<FROM, TO> fn) - { - return new Iterator<TO>() - { - public boolean hasNext() - { - return iter.hasNext(); - } - - public TO next() - { - return fn.apply(iter.next()); - } - }; - } - - class AccumulatingSerializer - implements AccordJournalValueSerializers.FlyweightSerializer<AccordTopologyUpdate, Accumulator> + class FlyweightSerializer implements AccordJournalValueSerializers.FlyweightSerializer<AccordTopologyUpdate, Accumulator> { - public static final AccumulatingSerializer defaultInstance = new AccumulatingSerializer(() -> 0); - - private final EpochSupplier minEpoch; - public AccumulatingSerializer(EpochSupplier minEpoch) - { - this.minEpoch = minEpoch; - } + public FlyweightSerializer() {} @Override public Accumulator mergerFor() @@ -499,44 +429,19 @@ public interface AccordTopologyUpdate @Override public void serialize(JournalKey key, AccordTopologyUpdate from, DataOutputPlus out, Version version) throws IOException { - out.writeUnsignedVInt32(1); Serializer.instance.serialize(from, out); } @Override public void reserialize(JournalKey key, Accumulator from, DataOutputPlus out, Version version) throws IOException { - out.writeUnsignedVInt32(from.get().size()); - Journal.TopologyUpdate prev = null; - for (TopologyImage value : from.get().values()) - { - Journal.TopologyUpdate tmp = value.update; - if (prev != null && value.update.isEquivalent(prev)) - value = value.asNoOp(); - - prev = tmp; - Serializer.instance.serialize(value, out); - } + serialize(key, from.get(), out, version); } @Override public void deserialize(JournalKey key, Accumulator into, DataInputPlus in, Version version) throws IOException { - long minEpoch = this.minEpoch.epoch(); - int count = in.readUnsignedVInt32(); - AccordTopologyUpdate prev = null; - while (--count >= 0) - { - AccordTopologyUpdate update = Serializer.instance.deserialize(in); - if (update.kind() == Kind.NoOp) - { - Invariants.require(prev != null); - update = ((TopologyImage) update).asImage(prev.getUpdate()); - } - if (update.epoch() >= minEpoch) - into.update(update); - prev = update; - } + into.update(Serializer.instance.deserialize(in)); } } } \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java index 064d050207..5ba2a7c5d3 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java @@ -89,6 +89,7 @@ public class JournalGCTest extends FuzzTestBase if (pk > 0 && pk % 100 == 0) { + cluster.schemaChange("ALTER TABLE " + schema.keyspace + '.' + schema.table + " WITH comment='" + pk + "';"); cluster.get(1).runOnInstance(() -> { ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty(); ((AccordService) AccordService.instance()).journal().compactor().run(); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java index 3bea823ae2..535bf7e528 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java @@ -27,10 +27,10 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import accord.api.Journal.TopologyUpdate; import org.junit.BeforeClass; import org.junit.Test; -import accord.api.Journal; import accord.impl.AbstractConfigurationServiceTest; import accord.local.Node.Id; import accord.topology.Topology; @@ -63,7 +63,6 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.service.accord.api.AccordAgent; -import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ValidatingClusterMetadataService; import org.apache.cassandra.tcm.membership.Location; @@ -180,7 +179,7 @@ public class AccordConfigurationServiceTest public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boolean startSync) { // Fake journal save - journal_.saveTopology(new Journal.TopologyUpdate(new Int2ObjectHashMap<>(), topology), () -> {}); + journal_.saveTopology(new TopologyUpdate(new Int2ObjectHashMap<>(), topology), () -> {}); return super.onTopologyUpdate(topology, isLoad, startSync); } }; @@ -205,7 +204,8 @@ public class AccordConfigurationServiceTest loaded.updateMapping(mappingForEpoch(cms.metadata().epoch.getEpoch() + 1)); listener = new AbstractConfigurationServiceTest.TestListener(loaded, true); loaded.registerListener(listener); - Iterator<AccordTopologyUpdate.ImmutableTopoloyImage> iter = journal.replayTopologies(); + journal_.closeCurrentSegmentForTestingIfNonEmpty(); + Iterator<TopologyUpdate> iter = journal.replayTopologies(); // Simulate journal replay while (iter.hasNext()) loaded.reportTopology(iter.next().global); diff --git a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java index a1f7a738d0..c0c790ba47 100644 --- a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java +++ b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java @@ -141,8 +141,8 @@ public class AccordTopologyUpdateTest switch (kind) { case NewTopology: return new AccordTopologyUpdate.NewTopology(topologyUpdateGen.next(rs)); - case TopologyImage: return new AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs), AccordTopologyUpdate.Kind.TopologyImage); - case NoOp: return new AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs), AccordTopologyUpdate.Kind.NoOp); + case TopologyImage: return new AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs), topologyUpdateGen.next(rs)); + case NoOp: return new AccordTopologyUpdate.TopologyImage(epochGen.nextLong(rs)); default: throw new AssertionError("Unknown kind: " + kind); } }; @@ -155,10 +155,9 @@ public class AccordTopologyUpdateTest private static void maybeUpdatePartitioner(AccordTopologyUpdate expected) { - if (expected instanceof AccordTopologyUpdate.NewTopology) - { - maybeUpdatePartitioner(((AccordTopologyUpdate.NewTopology) expected).update); - } + Journal.TopologyUpdate update = expected.getUpdate(); + if (update != null) + maybeUpdatePartitioner(expected.getUpdate()); } private void maybeUpdatePartitioner(CommandStores.RangesForEpoch expected) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
