This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4319c14893adcb88c6e9abe9b600484e741ce3cf Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Wed Oct 2 13:35:57 2024 +0200 More follow-up to CASSANDRA-19967 and CASSANDRA-19869 --- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 42 ++++++++-- src/java/org/apache/cassandra/journal/Journal.java | 16 +++- .../org/apache/cassandra/journal/Segments.java | 8 ++ .../apache/cassandra/journal/StaticSegment.java | 9 ++- .../service/accord/AccordCommandStore.java | 11 ++- .../cassandra/service/accord/AccordJournal.java | 82 +++++++++++++++++--- .../service/accord/AccordSafeCommandStore.java | 90 +++++++++++++++++----- .../service/accord/AccordSegmentCompactor.java | 10 +++ .../cassandra/service/accord/AccordService.java | 11 ++- .../service/accord/CommandsForRangesLoader.java | 2 +- .../cassandra/service/accord/IAccordService.java | 8 +- .../cassandra/service/accord/SavedCommand.java | 64 +++++++++++---- .../service/accord/async/AsyncOperation.java | 23 +++--- .../test/accord/AccordBootstrapTest.java | 28 +++---- .../distributed/test/accord/AccordLoadTest.java | 16 +++- .../accord/AccordJournalCompactionTest.java | 22 +++--- .../compaction/CompactionAccordIteratorsTest.java | 5 +- .../service/accord/AccordCommandStoreTest.java | 5 +- .../cassandra/service/accord/MockJournal.java | 24 +++--- .../cassandra/service/accord/SavedCommandTest.java | 1 + 21 files changed, 350 insertions(+), 129 deletions(-) diff --git a/modules/accord b/modules/accord index 4cf0070d60..4a8566af7b 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 4cf0070d604abd2db460a5f1c3f8cd8dc7d26696 +Subproject commit 4a8566af7b7de2ddec2c7527d7e2da593f99865f diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 3e9a1de462..fc993e98da 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -61,6 +61,7 @@ import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.PurgeFunction; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; @@ -109,6 +110,7 @@ import org.apache.cassandra.service.paxos.uncommitted.PaxosRows; import org.apache.cassandra.utils.TimeUUID; import static accord.local.Cleanup.ERASE; +import static accord.local.Cleanup.TRUNCATE; import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME; import static accord.local.Cleanup.shouldCleanupPartial; import static com.google.common.base.Preconditions.checkState; @@ -148,6 +150,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte { private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class); private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100; + private static Object[] TRUNCATE_CLUSTERING_VALUE = new Object[] { Long.MAX_VALUE, Integer.MAX_VALUE }; private final OperationType type; private final AbstractCompactionController controller; @@ -806,8 +809,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte class AccordCommandsPurger extends AbstractPurger { final Int2ObjectHashMap<RedundantBefore> redundantBefores; + final Int2ObjectHashMap<DurableBefore> durableBefores; final Int2ObjectHashMap<RangesForEpoch> ranges; - final DurableBefore durableBefore; int storeId; TxnId txnId; @@ -817,7 +820,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte IAccordService.CompactionInfo compactionInfo = accordService.get().getCompactionInfo(); this.redundantBefores = compactionInfo.redundantBefores; this.ranges = compactionInfo.ranges; - this.durableBefore = compactionInfo.durableBefore; + this.durableBefores = compactionInfo.durableBefores; } protected void beginPartition(UnfilteredRowIterator partition) @@ -833,6 +836,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte updateProgress(); RedundantBefore redundantBefore = redundantBefores.get(storeId); + DurableBefore durableBefore = durableBefores.get(storeId); // TODO (expected): if the store has been retired, this should return null if (redundantBefore == null) return row; @@ -1013,8 +1017,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte class AccordJournalPurger extends AbstractPurger { final Int2ObjectHashMap<RedundantBefore> redundantBefores; + final Int2ObjectHashMap<DurableBefore> durableBefores; final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges; - final DurableBefore durableBefore; final ColumnMetadata recordColumn; final ColumnMetadata versionColumn; final KeySupport<JournalKey> keySupport = JournalKey.SUPPORT; @@ -1026,6 +1030,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte Object[] lastClustering = null; long maxSeenTimestamp = -1; final int userVersion; + long lastDescriptor = -1; + int lastOffset = -1; public AccordJournalPurger(Supplier<IAccordService> serviceSupplier) { @@ -1036,7 +1042,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte this.redundantBefores = compactionInfo.redundantBefores; this.ranges = compactionInfo.ranges; - this.durableBefore = compactionInfo.durableBefore; + this.durableBefores = compactionInfo.durableBefores; ColumnFamilyStore cfs = Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL); this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false)); this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false)); @@ -1050,6 +1056,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte serializer = (AccordJournalValueSerializers.FlyweightSerializer<Object, Object>) key.type.serializer; builder = serializer.mergerFor(key); maxSeenTimestamp = -1; + lastDescriptor = -1; + lastOffset = -1; } @Override @@ -1096,6 +1104,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte } RedundantBefore redundantBefore = redundantBefores.get(key.commandStoreId); + DurableBefore durableBefore = durableBefores.get(key.commandStoreId); Cleanup cleanup = commandBuilder.shouldCleanup(redundantBefore, durableBefore); if (cleanup == ERASE) return PartitionUpdate.fullPartitionDelete(metadata(), partition.partitionKey(), maxSeenTimestamp, nowInSec).unfilteredIterator(); @@ -1107,9 +1116,16 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return null; PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey()); - newVersion.row(lastClustering) - .add("record", commandBuilder.asByteBuffer(userVersion)) + + Row.SimpleBuilder rowBuilder; + if (cleanup == TRUNCATE || cleanup == TRUNCATE_WITH_OUTCOME) + rowBuilder = newVersion.row(TRUNCATE_CLUSTERING_VALUE); + else + rowBuilder = newVersion.row(lastClustering); + + rowBuilder.add("record", commandBuilder.asByteBuffer(userVersion)) .add("user_version", userVersion); + return newVersion.build().unfilteredIterator(); } @@ -1133,6 +1149,20 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte updateProgress(); maxSeenTimestamp = row.primaryKeyLivenessInfo().timestamp(); ByteBuffer record = row.getCell(recordColumn).buffer(); + long descriptor = LongType.instance.compose(row.clustering().getBufferArray()[0]); + int offset = Int32Type.instance.compose(row.clustering().getBufferArray()[1]); + + if (lastOffset != -1) + { + Invariants.checkState(descriptor >= lastDescriptor, + "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor); + Invariants.checkState(descriptor != lastDescriptor || + offset > lastOffset, + "Offsets within %s were accessed out of order: %d was accessed after %s", offset, lastOffset); + } + lastDescriptor = descriptor; + lastOffset = offset; + try (DataInputBuffer in = new DataInputBuffer(record, false)) { int userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer()); diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index d47d801d0e..5e91c7d3d3 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -525,6 +525,8 @@ public class Journal<K, V> implements Shutdownable ActiveSegment<K, V>.Allocation alloc; while (null == (alloc = segment.allocate(entrySize, hosts))) { + if (entrySize >= (params.segmentSize() * 3) / 4) + throw new IllegalStateException("entrySize " + entrySize + " too large for a segmentSize of " + params.segmentSize()); // failed to allocate; move to a new segment with enough room advanceSegment(segment); segment = currentSegment; @@ -776,6 +778,11 @@ public class Journal<K, V> implements Shutdownable swapSegments(current -> current.withNewActiveSegment(activeSegment)); } + private void removeEmptySegment(ActiveSegment<K, V> activeSegment) + { + swapSegments(current -> current.withoutEmptySegment(activeSegment)); + } + private void replaceCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSegment<K, V> staticSegment) { swapSegments(current -> current.withCompletedSegment(activeSegment, staticSegment)); @@ -869,6 +876,13 @@ public class Journal<K, V> implements Shutdownable void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment) { + if (activeSegment.isEmpty()) + { + removeEmptySegment(activeSegment); + activeSegment.closeAndDiscard(); + return; + } + closer.execute(new CloseActiveSegmentRunnable(activeSegment)); } @@ -973,7 +987,7 @@ public class Journal<K, V> implements Shutdownable private StaticSegmentIterator() { this.segments = selectAndReference(Segment::isStatic); - this.readers = new PriorityQueue<>((o1, o2) -> keySupport.compare(o1.key(), o2.key())); + this.readers = new PriorityQueue<>(); for (Segment<K, V> segment : this.segments.all()) { StaticSegment<K, V> staticSegment = (StaticSegment<K, V>)segment; diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java index a779aebf23..94282e9d87 100644 --- a/src/java/org/apache/cassandra/journal/Segments.java +++ b/src/java/org/apache/cassandra/journal/Segments.java @@ -63,6 +63,14 @@ class Segments<K, V> return new Segments<>(newSegments); } + Segments<K, V> withoutEmptySegment(ActiveSegment<K, V> activeSegment) + { + Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments); + Segment<K, V> oldValue = segments.remove(activeSegment.descriptor.timestamp); + Invariants.checkState(oldValue.asActive().isEmpty()); + return new Segments<>(newSegments); + } + Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSegment<K, V> staticSegment) { Invariants.checkArgument(activeSegment.descriptor.equals(staticSegment.descriptor)); diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java index c7ac7ce410..f5f15ee13c 100644 --- a/src/java/org/apache/cassandra/journal/StaticSegment.java +++ b/src/java/org/apache/cassandra/journal/StaticSegment.java @@ -473,9 +473,12 @@ public final class StaticSegment<K, V> extends Segment<K, V> that.ensureHasAdvanced(); int cmp = keySupport.compare(this.key(), that.key()); - return cmp != 0 - ? cmp - : this.descriptor.compareTo(that.descriptor); + if (cmp != 0) + return cmp; + cmp = Long.compare(this.descriptor.timestamp, that.descriptor.timestamp); + if (cmp != 0) + return cmp; + return Integer.compare(this.offset, that.offset); } } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index a2367e5768..418c5cd687 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -502,9 +502,8 @@ public class AccordCommandStore extends CommandStore // We find a set of dependencies for a range then update CommandsFor to know about them Ranges allRanges = safeStore.ranges().all(); deps.keyDeps.keys().forEach(allRanges, key -> { - // TODO (now): batch register to minimise GC + // TODO (desired): batch register to minimise GC deps.keyDeps.forEach(key, (txnId, txnIdx) -> { - // TODO (desired, efficiency): this can be made more efficient by batching by epoch if (ranges.coordinates(txnId).contains(key)) return; // already coordinates, no need to replicate if (!ranges.allBefore(txnId.epoch()).contains(key)) @@ -525,13 +524,13 @@ public class AccordCommandStore extends CommandStore if (!ranges.allBefore(txnId.epoch()).intersects(range)) return; + // TODO (required): this is potentially not safe - it should not be persisted until we save in journal + // but, preferable to retire historical transactions as a concept entirely, and rely on ExclusiveSyncPoints instead diskCommandsForRanges().mergeHistoricalTransaction(txnId, Ranges.single(range).slice(allRanges), Ranges::with); }); } } - public NavigableMap<Timestamp, Ranges> safeToRead() { return super.safeToRead(); } - public void appendCommands(List<SavedCommand.DiffWriter> diffs, Runnable onFlush) { for (int i = 0; i < diffs.size(); i++) @@ -545,7 +544,7 @@ public class AccordCommandStore extends CommandStore @VisibleForTesting public Command loadCommand(TxnId txnId) { - return journal.loadCommand(id, txnId, redundantBefore(), durableBefore()); + return journal.loadCommand(id, txnId, unsafeGetRedundantBefore(), unsafeGetDurableBefore()); } public interface Loader @@ -592,7 +591,7 @@ public class AccordCommandStore extends CommandStore Command local = command; if (local.status() != Truncated && local.status() != Invalidated) { - Cleanup cleanup = Cleanup.shouldCleanup(AccordCommandStore.this, local, local.participants()); + Cleanup cleanup = Cleanup.shouldCleanup(local, unsafeGetRedundantBefore(), unsafeGetDurableBefore()); switch (cleanup) { case NO: diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index c1292d5322..eb8c0007f2 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -253,18 +253,18 @@ public class AccordJournal implements IJournal, Shutdownable { RecordPointer pointer = null; // TODO: avoid allocating keys - if (fieldUpdates.redundantBefore != null) - pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.redundantBefore); - if (fieldUpdates.durableBefore != null) - pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, store), fieldUpdates.durableBefore); - if (fieldUpdates.bootstrapBeganAt != null) - pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.bootstrapBeganAt); - if (fieldUpdates.safeToRead != null) - pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.safeToRead); - if (fieldUpdates.rangesForEpoch != null) - pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.rangesForEpoch); - if (fieldUpdates.historicalTransactions != null) - pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.HISTORICAL_TRANSACTIONS, store), fieldUpdates.historicalTransactions); + if (fieldUpdates.addRedundantBefore != null) + pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.addRedundantBefore); + if (fieldUpdates.addDurableBefore != null) + pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, store), fieldUpdates.addDurableBefore); + if (fieldUpdates.newBootstrapBeganAt != null) + pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.newBootstrapBeganAt); + if (fieldUpdates.newSafeToRead != null) + pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.newSafeToRead); + if (fieldUpdates.newRangesForEpoch != null) + pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.newRangesForEpoch); + if (fieldUpdates.addHistoricalTransactions != null) + pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.HISTORICAL_TRANSACTIONS, store), fieldUpdates.addHistoricalTransactions); if (onFlush == null) return; @@ -414,4 +414,62 @@ public class AccordJournal implements IJournal, Shutdownable isReplay.set(false); } } + + // TODO: this is here temporarily; for debugging purposes + @VisibleForTesting + public void checkAllCommands() + { + try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = journalTable.readAll()) + { + IAccordService.CompactionInfo compactionInfo = AccordService.instance().getCompactionInfo(); + JournalKey key; + SavedCommand.Builder builder = new SavedCommand.Builder(); + while ((key = iter.key()) != null) + { + builder.reset(key.id); + if (key.type != JournalKey.Type.COMMAND_DIFF) + { + // TODO (required): add "skip" for the key to avoid getting stuck + iter.readAllForKey(key, (segment, position, key1, buffer, hosts, userVersion) -> {}); + continue; + } + + JournalKey finalKey = key; + List<RecordPointer> pointers = new ArrayList<>(); + try + { + iter.readAllForKey(key, (segment, position, local, buffer, hosts, userVersion) -> { + pointers.add(new RecordPointer(segment, position)); + Invariants.checkState(finalKey.equals(local)); + try (DataInputBuffer in = new DataInputBuffer(buffer, false)) + { + builder.deserializeNext(in, userVersion); + } + catch (IOException e) + { + // can only throw if serializer is buggy + throw new RuntimeException(e); + } + }); + + Cleanup cleanup = builder.shouldCleanup(compactionInfo.redundantBefores.get(key.commandStoreId), compactionInfo.durableBefores.get(key.commandStoreId)); + switch (cleanup) + { + case ERASE: + case EXPUNGE: + case EXPUNGE_PARTIAL: + case VESTIGIAL: + continue; + } + builder.construct(); + } + catch (Throwable t) + { + throw new RuntimeException(String.format("Caught an exception after iterating over: %s", pointers), + t); + } + } + + } + } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index fae5e4634f..34cb57ed5b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -284,45 +284,78 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC @Override public void upsertRedundantBefore(RedundantBefore addRedundantBefore) { - // TODO (now): this is a temporary measure, see comment on AccordJournalValueSerializers; upsert instead + // TODO (required): this is a temporary measure, see comment on AccordJournalValueSerializers; upsert instead // when modifying, only modify together with AccordJournalValueSerializers - ensureFieldUpdates().redundantBefore = RedundantBefore.merge(commandStore.redundantBefore(), addRedundantBefore); - super.upsertRedundantBefore(addRedundantBefore); + ensureFieldUpdates().newRedundantBefore = ensureFieldUpdates().addRedundantBefore = RedundantBefore.merge(redundantBefore(), addRedundantBefore); } @Override public void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) { - ensureFieldUpdates().bootstrapBeganAt = newBootstrapBeganAt; - super.setBootstrapBeganAt(newBootstrapBeganAt); + ensureFieldUpdates().newBootstrapBeganAt = newBootstrapBeganAt; } @Override public void upsertDurableBefore(DurableBefore addDurableBefore) { - ensureFieldUpdates().durableBefore = addDurableBefore; - super.upsertDurableBefore(addDurableBefore); + ensureFieldUpdates().addDurableBefore = addDurableBefore; } @Override public void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) { - ensureFieldUpdates().safeToRead = newSafeToRead; - super.setSafeToRead(newSafeToRead); + ensureFieldUpdates().newSafeToRead = newSafeToRead; } @Override public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) { - ensureFieldUpdates().rangesForEpoch = rangesForEpoch.snapshot(); - super.setRangesForEpoch(rangesForEpoch); + ensureFieldUpdates().newRangesForEpoch = rangesForEpoch.snapshot(); ranges = rangesForEpoch; } + @Override + public NavigableMap<TxnId, Ranges> bootstrapBeganAt() + { + if (fieldUpdates != null && fieldUpdates.newBootstrapBeganAt != null) + return fieldUpdates.newBootstrapBeganAt; + + return super.bootstrapBeganAt(); + } + + @Override + public NavigableMap<Timestamp, Ranges> safeToReadAt() + { + if (fieldUpdates != null && fieldUpdates.newSafeToRead != null) + return fieldUpdates.newSafeToRead; + + return super.safeToReadAt(); + } + + @Override + public RedundantBefore redundantBefore() + { + if (fieldUpdates != null && fieldUpdates.newRedundantBefore != null) + return fieldUpdates.newRedundantBefore; + + return super.redundantBefore(); + } + + @Override + public DurableBefore durableBefore() + { + if (fieldUpdates != null && fieldUpdates.newDurableBefore != null) + return fieldUpdates.newDurableBefore; + + return super.durableBefore(); + } + @Override protected void registerHistoricalTransactions(Deps deps) { - ensureFieldUpdates().historicalTransactions = deps; + ensureFieldUpdates().addHistoricalTransactions = deps; + // TODO (required): it is potentially unsafe to propagate this synchronously, as if we fail to write to the journal we may be in an inconsistent state + // however, we can and should retire the concept of historical transactions in favour of ExclusiveSyncPoints ensuring their deps are known super.registerHistoricalTransactions(deps); } @@ -337,13 +370,34 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC return fieldUpdates; } + public void postExecute() + { + if (fieldUpdates == null) + return; + + if (fieldUpdates.newRedundantBefore != null) + super.unsafeSetRedundantBefore(fieldUpdates.newRedundantBefore); + + if (fieldUpdates.newDurableBefore != null) + super.unsafeSetDurableBefore(fieldUpdates.newDurableBefore); + + if (fieldUpdates.newBootstrapBeganAt != null) + super.setBootstrapBeganAt(fieldUpdates.newBootstrapBeganAt); + + if (fieldUpdates.newSafeToRead != null) + super.setSafeToRead(fieldUpdates.newSafeToRead); + + if (fieldUpdates.newRangesForEpoch != null) + super.setRangesForEpoch(ranges); + } + public static class FieldUpdates { - public RedundantBefore redundantBefore; - public DurableBefore durableBefore; - public NavigableMap<TxnId, Ranges> bootstrapBeganAt; - public NavigableMap<Timestamp, Ranges> safeToRead; - public RangesForEpoch.Snapshot rangesForEpoch; - public Deps historicalTransactions; + public RedundantBefore addRedundantBefore, newRedundantBefore; + public DurableBefore addDurableBefore, newDurableBefore; + public NavigableMap<TxnId, Ranges> newBootstrapBeganAt; + public NavigableMap<Timestamp, Ranges> newSafeToRead; + public RangesForEpoch.Snapshot newRangesForEpoch; + public Deps addHistoricalTransactions; } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java index f0c7b38c37..f94510b8b8 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java @@ -99,6 +99,8 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V key = reader.key(); serializer = (FlyweightSerializer<Object, Object>) key.type.serializer; builder = serializer.mergerFor(key); + lastOffset = -1; + lastDescriptor = -1; } boolean advanced; @@ -106,6 +108,14 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V { try (DataInputBuffer in = new DataInputBuffer(reader.record(), false)) { + if (lastDescriptor != -1) + { + Invariants.checkState(reader.descriptor.timestamp >= lastDescriptor, + "Descriptors were accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, lastDescriptor); + Invariants.checkState(reader.descriptor.timestamp != lastDescriptor || + reader.offset() > lastOffset, + "Offsets within %s were accessed out of order: %d was accessed after %s", reader.offset(), lastOffset); + } serializer.deserialize(key, builder, in, reader.descriptor.userVersion); lastDescriptor = reader.descriptor.timestamp; lastOffset = reader.offset(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index bce8a198da..e051a53803 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -31,7 +31,6 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -298,7 +297,7 @@ public class AccordService implements IAccordService, Shutdownable @Override public CompactionInfo getCompactionInfo() { - return new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY); + return new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>()); } @Override @@ -1261,17 +1260,17 @@ public class AccordService implements IAccordService, Shutdownable public CompactionInfo getCompactionInfo() { Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>(); + Int2ObjectHashMap<DurableBefore> durableBefores = new Int2ObjectHashMap<>(); Int2ObjectHashMap<RangesForEpoch> ranges = new Int2ObjectHashMap<>(); - AtomicReference<DurableBefore> durableBefore = new AtomicReference<>(DurableBefore.EMPTY); AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> { synchronized (redundantBefores) { - redundantBefores.put(safeStore.commandStore().id(), safeStore.commandStore().redundantBefore()); + redundantBefores.put(safeStore.commandStore().id(), safeStore.redundantBefore()); ranges.put(safeStore.commandStore().id(), safeStore.ranges()); + durableBefores.put(safeStore.commandStore().id(), safeStore.durableBefore()); } - durableBefore.set(DurableBefore.merge(durableBefore.get(), safeStore.commandStore().durableBefore())); })); - return new CompactionInfo(redundantBefores, ranges, durableBefore.get()); + return new CompactionInfo(redundantBefores, ranges, durableBefores); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java index 6324735883..e7e1461054 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java +++ b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java @@ -187,7 +187,7 @@ public class CommandsForRangesLoader { //TODO (now): this logic is kinda duplicate of org.apache.cassandra.service.accord.CommandsForRange.mapReduce // should figure out if this can be improved... also what is correct? - var durableBefore = store.durableBefore(); + var durableBefore = store.unsafeGetDurableBefore(); NavigableMap<TxnId, Summary> map = new TreeMap<>(); for (TxnId txnId : possibleTxns) { diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 1be920bc1b..e5e2d125f1 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -134,17 +134,17 @@ public interface IAccordService class CompactionInfo { - static final Supplier<CompactionInfo> NO_OP = () -> new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY); + static final Supplier<CompactionInfo> NO_OP = () -> new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>()); public final Int2ObjectHashMap<RedundantBefore> redundantBefores; + public final Int2ObjectHashMap<DurableBefore> durableBefores; public final Int2ObjectHashMap<RangesForEpoch> ranges; - public final DurableBefore durableBefore; - public CompactionInfo(Int2ObjectHashMap<RedundantBefore> redundantBefores, Int2ObjectHashMap<RangesForEpoch> ranges, DurableBefore durableBefore) + public CompactionInfo(Int2ObjectHashMap<RedundantBefore> redundantBefores, Int2ObjectHashMap<RangesForEpoch> ranges, Int2ObjectHashMap<DurableBefore> durableBefores) { this.redundantBefores = redundantBefores; this.ranges = ranges; - this.durableBefore = durableBefore; + this.durableBefores = durableBefores; } } diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java b/src/java/org/apache/cassandra/service/accord/SavedCommand.java index 1f0086b484..209208989f 100644 --- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java +++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java @@ -51,6 +51,7 @@ import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer; import org.apache.cassandra.utils.Throwables; import static accord.local.Cleanup.NO; +import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME; import static accord.primitives.Known.KnownDeps.DepsErased; import static accord.primitives.Known.KnownDeps.DepsUnknown; import static accord.primitives.Known.KnownDeps.NoDeps; @@ -73,9 +74,10 @@ public class SavedCommand PARTIAL_DEPS, WAITING_ON, WRITES, + CLEANUP ; - static final Fields[] FIELDS = values(); + public static final Fields[] FIELDS = values(); } // TODO: maybe rename this and enclosing classes? @@ -123,7 +125,6 @@ public class SavedCommand } } - public static ByteBuffer asSerializedDiff(Command after, int userVersion) throws IOException { try (DataOutputBuffer out = new DataOutputBuffer()) @@ -304,6 +305,7 @@ public class SavedCommand SavedCommand.WaitingOnProvider waitingOn; Writes writes; Result result; + Cleanup cleanup; boolean nextCalled; int count; @@ -385,14 +387,26 @@ public class SavedCommand public void clear() { flags = 0; + txnId = null; + executeAt = null; + executeAtLeast = null; saveStatus = null; durability = null; + + acceptedOrCommitted = null; promised = null; + participants = null; partialTxn = null; partialDeps = null; + + waitingOnBytes = null; + waitingOn = null; writes = null; + result = null; + cleanup = null; + nextCalled = false; count = 0; } @@ -428,17 +442,19 @@ public class SavedCommand return NO; if (saveStatus == null || participants == null) - return Cleanup.EXPUNGE_PARTIAL; + return Cleanup.NO; - return Cleanup.shouldCleanup(txnId, saveStatus, durability, participants, redundantBefore, durableBefore); + Cleanup cleanup = Cleanup.shouldCleanup(txnId, saveStatus, durability, participants, redundantBefore, durableBefore); + if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0) + cleanup = this.cleanup; + return cleanup; } // TODO (expected): avoid allocating new builder public Builder maybeCleanup(Cleanup cleanup) { - // Do not have txnId in selected SSTables; remove if (saveStatus() == null) - return null; + return this; switch (cleanup) { @@ -447,19 +463,23 @@ public class SavedCommand return null; case EXPUNGE_PARTIAL: - return expungePartial(); + return expungePartial(cleanup, saveStatus, true); + case VESTIGIAL: case INVALIDATE: + return saveStatusOnly(); + case TRUNCATE_WITH_OUTCOME: case TRUNCATE: - return saveStatusOnly(); + return expungePartial(cleanup, cleanup.appliesIfNot, cleanup == TRUNCATE_WITH_OUTCOME); + case NO: return this; default: throw new IllegalStateException("Unknown cleanup: " + cleanup);} } - public Builder expungePartial() + public Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus, boolean includeOutcome) { Invariants.checkState(txnId != null); Builder builder = new Builder(txnId); @@ -467,12 +487,11 @@ public class SavedCommand builder.count++; builder.nextCalled = true; - // TODO: these accesses can be abstracted away - if (saveStatus != null) - { - builder.flags = setFieldChanged(Fields.SAVE_STATUS, builder.flags); - builder.saveStatus = saveStatus; - } + Invariants.checkState(saveStatus != null); + builder.flags = setFieldChanged(Fields.SAVE_STATUS, builder.flags); + builder.saveStatus = saveStatus; + builder.flags = setFieldChanged(Fields.CLEANUP, builder.flags); + builder.cleanup = cleanup; if (executeAt != null) { builder.flags = setFieldChanged(Fields.EXECUTE_AT, builder.flags); @@ -488,6 +507,11 @@ public class SavedCommand builder.flags = setFieldChanged(Fields.PARTICIPANTS, builder.flags); builder.participants = participants; } + if (includeOutcome && builder.writes != null) + { + builder.flags = setFieldChanged(Fields.WRITES, builder.flags); + builder.writes = writes; + } return builder; } @@ -554,6 +578,9 @@ public class SavedCommand if (getFieldChanged(Fields.WRITES, flags) && !getFieldIsNull(Fields.WRITES, flags)) CommandSerializers.writes.serialize(writes(), out, userVersion); + + if (getFieldChanged(Fields.CLEANUP, flags)) + out.writeByte(cleanup.ordinal()); } @@ -682,6 +709,13 @@ public class SavedCommand else writes = CommandSerializers.writes.deserialize(in, userVersion); } + + if (getFieldChanged(Fields.CLEANUP, flags)) + { + Cleanup newCleanup = Cleanup.forOrdinal(in.readByte()); + if (cleanup == null || newCleanup.compareTo(cleanup) > 0) + cleanup = newCleanup; + } } public void forceResult(Result newValue) diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index 62cda848a6..2c2867aa65 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -269,24 +269,23 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R } } - commandStore.completeOperation(safeStore); - context.releaseResources(commandStore); - state(COMPLETING); + boolean flushed = false; if (diffs != null || safeStore.fieldUpdates() != null) { Runnable onFlush = () -> finish(result, null); if (safeStore.fieldUpdates() != null) - { - if (diffs != null) - appendCommands(diffs, null); - commandStore.persistFieldUpdates(safeStore.fieldUpdates(), onFlush); - } - else - { + commandStore.persistFieldUpdates(safeStore.fieldUpdates(), diffs == null ? onFlush : null); + if (diffs != null) appendCommands(diffs, onFlush); - } - return false; + flushed = true; } + + commandStore.completeOperation(safeStore); + context.releaseResources(commandStore); + state(COMPLETING); + if (flushed) + return false; + case COMPLETING: finish(result, null); case FINISHED: diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java index 483f7e4f37..fa93afa2ae 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java @@ -47,9 +47,9 @@ import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordConfigurationService; import org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot; +import org.apache.cassandra.service.accord.AccordSafeCommandStore; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.streaming.StreamManager; @@ -271,9 +271,9 @@ public class AccordBootstrapTest extends TestBaseImpl }); awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> { - AccordCommandStore commandStore = (AccordCommandStore) safeStore.commandStore(); - Assert.assertEquals(Timestamp.NONE, getOnlyElement(commandStore.bootstrapBeganAt().keySet())); - Assert.assertEquals(Timestamp.NONE, getOnlyElement(commandStore.safeToRead().keySet())); + AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; + Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet())); + Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet())); // // Assert.assertTrue(commandStore.maxBootstrapEpoch() > 0); // Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty()); @@ -316,17 +316,17 @@ public class AccordBootstrapTest extends TestBaseImpl awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> { if (safeStore.ranges().currentRanges().contains(partitionKey)) { - AccordCommandStore commandStore = (AccordCommandStore) safeStore.commandStore(); - Assert.assertFalse(commandStore.bootstrapBeganAt().isEmpty()); - Assert.assertFalse(commandStore.safeToRead().isEmpty()); + AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; + Assert.assertFalse(ss.bootstrapBeganAt().isEmpty()); + Assert.assertFalse(ss.safeToReadAt().isEmpty()); - Assert.assertEquals(1, commandStore.bootstrapBeganAt().entrySet().stream() + Assert.assertEquals(1, ss.bootstrapBeganAt().entrySet().stream() .filter(entry -> entry.getValue().contains(partitionKey)) .map(entry -> { Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0); return entry; }).count()); - Assert.assertEquals(1, commandStore.safeToRead().entrySet().stream() + Assert.assertEquals(1, ss.safeToReadAt().entrySet().stream() .filter(entry -> entry.getValue().contains(partitionKey)) .map(entry -> { Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0); @@ -458,17 +458,17 @@ public class AccordBootstrapTest extends TestBaseImpl safeStore -> { if (!safeStore.ranges().allAt(preMove).contains(partitionKey)) { - AccordCommandStore commandStore = (AccordCommandStore) safeStore.commandStore(); - Assert.assertFalse(commandStore.bootstrapBeganAt().isEmpty()); - Assert.assertFalse(commandStore.safeToRead().isEmpty()); + AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; + Assert.assertFalse(ss.bootstrapBeganAt().isEmpty()); + Assert.assertFalse(ss.safeToReadAt().isEmpty()); - Assert.assertEquals(1, commandStore.bootstrapBeganAt().entrySet().stream() + Assert.assertEquals(1, ss.bootstrapBeganAt().entrySet().stream() .filter(entry -> entry.getValue().contains(partitionKey)) .map(entry -> { Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0); return entry; }).count()); - Assert.assertEquals(1, commandStore.safeToRead().entrySet().stream() + Assert.assertEquals(1, ss.safeToReadAt().entrySet().stream() .filter(entry -> entry.getValue().contains(partitionKey)) .map(entry -> { Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index 48f8903577..d9315cf2c7 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -38,7 +38,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.AccordSpec; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICoordinator; @@ -85,7 +84,7 @@ public class AccordLoadTest extends AccordTestBase cluster.forEach(i -> i.runOnInstance(() -> { ((AccordService) AccordService.instance()).journal().compactor().updateCompactionPeriod(1, SECONDS); - ((AccordSpec.JournalSpec)((AccordService) AccordService.instance()).journal().configuration()).segmentSize = 128 << 10; +// ((AccordSpec.JournalSpec)((AccordService) AccordService.instance()).journal().configuration()).segmentSize = 128 << 10; })); ICoordinator coordinator = cluster.coordinator(1); @@ -162,14 +161,23 @@ public class AccordLoadTest extends AccordTestBase { nextCompactionAt += compactionInterval; System.out.println("compacting accord..."); - cluster.forEach(i -> i.nodetool("compact", "system_accord.journal")); + cluster.forEach(i -> { + i.nodetool("compact", "system_accord.journal"); + i.runOnInstance(() -> { + ((AccordService) AccordService.instance()).journal().checkAllCommands(); + }); + }); + } if ((nextFlushAt -= batchSize) <= 0) { nextFlushAt += flushInterval; System.out.println("flushing journal..."); - cluster.forEach(i -> i.runOnInstance(() -> ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty())); + cluster.forEach(i -> i.runOnInstance(() -> { + ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty(); + ((AccordService) AccordService.instance()).journal().checkAllCommands(); + })); } final Date date = new Date(); diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java index da1b9cea10..6229e8148f 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java @@ -141,22 +141,22 @@ public class AccordJournalCompactionTest { timestamp = timestamp.next(); AccordSafeCommandStore.FieldUpdates updates = new AccordSafeCommandStore.FieldUpdates(); - updates.durableBefore = durableBeforeGen.next(rs); + updates.addDurableBefore = durableBeforeGen.next(rs); // TODO: improve redundant before generator and re-enable // updates.redundantBefore = redundantBeforeGen.next(rs); - updates.safeToRead = safeToReadGen.next(rs); - updates.rangesForEpoch = rangesForEpochGen.next(rs); - updates.historicalTransactions = historicalTransactionsGen.next(rs); + updates.newSafeToRead = safeToReadGen.next(rs); + updates.newRangesForEpoch = rangesForEpochGen.next(rs); + updates.addHistoricalTransactions = historicalTransactionsGen.next(rs); journal.persistStoreState(1, updates, null); - redundantBeforeAccumulator.update(updates.redundantBefore); - durableBeforeAccumulator.update(updates.durableBefore); - if (updates.bootstrapBeganAt != null) - bootstrapBeganAtAccumulator.update(updates.bootstrapBeganAt); - safeToReadAccumulator.update(updates.safeToRead); - rangesForEpochAccumulator.update(updates.rangesForEpoch); - historicalTransactionsAccumulator.update(updates.historicalTransactions); + redundantBeforeAccumulator.update(updates.addRedundantBefore); + durableBeforeAccumulator.update(updates.addDurableBefore); + if (updates.newBootstrapBeganAt != null) + bootstrapBeganAtAccumulator.update(updates.newBootstrapBeganAt); + safeToReadAccumulator.update(updates.newSafeToRead); + rangesForEpochAccumulator.update(updates.newRangesForEpoch); + historicalTransactionsAccumulator.update(updates.addHistoricalTransactions); if (i % 100 == 0) journal.closeCurrentSegmentForTestingIfNonEmpty(); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index 57ef025061..a329be5864 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -420,9 +420,12 @@ public class CompactionAccordIteratorsTest Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>(); if (redundantBefore != null) redundantBefores.put(commandStore.id(), redundantBefore); + Int2ObjectHashMap<DurableBefore> durableBefores = new Int2ObjectHashMap<>(); + if (durableBefore != null) + durableBefores.put(commandStore.id(), durableBefore); Int2ObjectHashMap<CommandStores.RangesForEpoch> rangesForEpochs = new Int2ObjectHashMap<>(); rangesForEpochs.put(commandStore.id(), commandStore.unsafeRangesForEpoch()); - when(mockAccordService.getCompactionInfo()).thenReturn(new IAccordService.CompactionInfo(redundantBefores, rangesForEpochs, durableBefore)); + when(mockAccordService.getCompactionInfo()).thenReturn(new IAccordService.CompactionInfo(redundantBefores, rangesForEpochs, durableBefores)); return mockAccordService; } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java index b6fca2e9cc..e41e4ca579 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java @@ -159,6 +159,7 @@ public class AccordCommandStoreTest { AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); +// SafeCommandStore safeStore = Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(1); @@ -172,10 +173,10 @@ public class AccordCommandStoreTest AccordSafeTimestampsForKey tfk = new AccordSafeTimestampsForKey(loaded(key, null)); tfk.initialize(); - TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId1, txnId1, true); + TimestampsForKeys.updateLastExecutionTimestamps(null, tfk, txnId1, txnId1, true); Assert.assertEquals(txnId1.hlc(), AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId1, true)); - TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId2, txnId2, true); + TimestampsForKeys.updateLastExecutionTimestamps(null, tfk, txnId2, txnId2, true); Assert.assertEquals(txnId2.hlc(), AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId2, true)); Assert.assertEquals(txnId2, tfk.current().lastExecutedTimestamp()); diff --git a/test/unit/org/apache/cassandra/service/accord/MockJournal.java b/test/unit/org/apache/cassandra/service/accord/MockJournal.java index 24b911a3ff..dd7377ab58 100644 --- a/test/unit/org/apache/cassandra/service/accord/MockJournal.java +++ b/test/unit/org/apache/cassandra/service/accord/MockJournal.java @@ -137,18 +137,18 @@ public class MockJournal implements IJournal public void persistStoreState(int store, AccordSafeCommandStore.FieldUpdates fieldUpdates, Runnable onFlush) { FieldUpdates updates = fieldUpdates(store); - if (fieldUpdates.redundantBefore != null) - updates.redundantBeforeAccumulator.update(fieldUpdates.redundantBefore); - if (fieldUpdates.durableBefore != null) - updates.durableBeforeAccumulator.update(fieldUpdates.durableBefore); - if (fieldUpdates.bootstrapBeganAt != null) - updates.bootstrapBeganAtAccumulator.update(fieldUpdates.bootstrapBeganAt); - if (fieldUpdates.safeToRead != null) - updates.safeToReadAccumulator.update(fieldUpdates.safeToRead); - if (fieldUpdates.rangesForEpoch != null) - updates.rangesForEpochAccumulator.update(fieldUpdates.rangesForEpoch); - if (fieldUpdates.historicalTransactions != null) - updates.historicalTransactionsAccumulator.update(fieldUpdates.historicalTransactions); + if (fieldUpdates.addRedundantBefore != null) + updates.redundantBeforeAccumulator.update(fieldUpdates.addRedundantBefore); + if (fieldUpdates.addDurableBefore != null) + updates.durableBeforeAccumulator.update(fieldUpdates.addDurableBefore); + if (fieldUpdates.newBootstrapBeganAt != null) + updates.bootstrapBeganAtAccumulator.update(fieldUpdates.newBootstrapBeganAt); + if (fieldUpdates.newSafeToRead != null) + updates.safeToReadAccumulator.update(fieldUpdates.newSafeToRead); + if (fieldUpdates.newRangesForEpoch != null) + updates.rangesForEpochAccumulator.update(fieldUpdates.newRangesForEpoch); + if (fieldUpdates.addHistoricalTransactions != null) + updates.historicalTransactionsAccumulator.update(fieldUpdates.addHistoricalTransactions); onFlush.run(); } diff --git a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java b/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java index 5627029644..99963c3af0 100644 --- a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java @@ -132,6 +132,7 @@ public class SavedCommandTest SoftAssertions checks = new SoftAssertions(); for (Fields field : missing) { + if (field == Fields.CLEANUP) continue; checks.assertThat(SavedCommand.getFieldChanged(field, flags)) .describedAs("field %s changed", field) .isFalse(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org