This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 8aeab43d47 Migrate in memory journal to CommandChange logic shared with AccordJournal 8aeab43d47 is described below commit 8aeab43d47274b9ff6b00eaf24ea587f92449b54 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Tue Nov 26 15:26:54 2024 +0100 Migrate in memory journal to CommandChange logic shared with AccordJournal Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20115 --- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 6 +- .../cassandra/service/accord/AccordCache.java | 30 +- .../cassandra/service/accord/AccordCacheEntry.java | 8 +- .../service/accord/AccordCommandStore.java | 14 +- .../cassandra/service/accord/AccordJournal.java | 400 +++++++-- .../accord/AccordJournalValueSerializers.java | 24 +- .../cassandra/service/accord/AccordTask.java | 2 +- .../service/accord/CommandsForRanges.java | 4 +- .../apache/cassandra/service/accord/IJournal.java | 40 - .../cassandra/service/accord/SavedCommand.java | 980 --------------------- .../distributed/test/accord/AccordLoadTest.java | 4 - .../service/accord/AccordJournalBurnTest.java | 4 +- .../service/accord/AccordJournalOrderTest.java | 2 +- ...avedCommandTest.java => CommandChangeTest.java} | 25 +- .../accord/SimulatedAccordCommandStore.java | 21 +- 16 files changed, 396 insertions(+), 1170 deletions(-) diff --git a/modules/accord b/modules/accord index 520cc1072d..f7b9bb8887 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 520cc1072d44a5f7617566b6667e915532b89033 +Subproject commit f7b9bb8887ed672185f269ebcbc9d11e6aeafca9 diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index e11d154829..90b583235c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -92,6 +92,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordJournal; import org.apache.cassandra.service.accord.AccordJournalValueSerializers; import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer; import org.apache.cassandra.service.accord.AccordKeyspace; @@ -101,7 +102,6 @@ import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.IAccordService; import org.apache.cassandra.service.accord.JournalKey; -import org.apache.cassandra.service.accord.SavedCommand; import org.apache.cassandra.service.accord.api.AccordAgent; import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; import org.apache.cassandra.service.paxos.PaxosRepairHistory; @@ -1016,7 +1016,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return newVersion.build().unfilteredIterator(); } - SavedCommand.Builder commandBuilder = (SavedCommand.Builder) builder; + AccordJournal.Builder commandBuilder = (AccordJournal.Builder) builder; if (commandBuilder.isEmpty()) { Invariants.checkState(rows.isEmpty()); @@ -1038,7 +1038,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey()); Row.SimpleBuilder rowBuilder = newVersion.row(firstClustering); - rowBuilder.add("record", commandBuilder.asByteBuffer(userVersion)) + rowBuilder.add("record", commandBuilder.asByteBuffer(redundantBefore, userVersion)) .add("user_version", userVersion); return newVersion.build().unfilteredIterator(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java b/src/java/org/apache/cassandra/service/accord/AccordCache.java index 180b827531..cbdba5224f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCache.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java @@ -104,7 +104,7 @@ public class AccordCache implements CacheSize @Nullable V quickShrink(V value); // a result of null means we cannot shrink, and should save/evict as appropriate @Nullable Object fullShrink(K key, V value); - @Nullable V inflate(K key, Object shrunk); + @Nullable V inflate(AccordCommandStore commandStore, K key, Object shrunk); long estimateHeapSize(V value); long estimateShrunkHeapSize(Object shrunk); boolean validate(AccordCommandStore commandStore, K key, V value); @@ -359,7 +359,7 @@ public class AccordCache implements CacheSize ToLongFunction<V> heapEstimator, Function<AccordCacheEntry<K, V>, S> safeRefFactory) { - return newType(keyClass, loadFunction, saveFunction, quickShrink, (i, j) -> j, (i, j) -> (V)j, validateFunction, heapEstimator, i -> 0, safeRefFactory); + return newType(keyClass, loadFunction, saveFunction, quickShrink, (i, j) -> j, (c, i, j) -> (V)j, validateFunction, heapEstimator, i -> 0, safeRefFactory); } public <K, V, S extends AccordSafeState<K, V>> Type<K, V, S> newType( @@ -368,7 +368,7 @@ public class AccordCache implements CacheSize QuadFunction<AccordCommandStore, K, V, Object, Runnable> saveFunction, Function<V, V> quickShrink, BiFunction<K, V, Object> fullShrink, - BiFunction<K, Object, V> inflate, + TriFunction<AccordCommandStore, K, Object, V> inflate, TriFunction<AccordCommandStore, K, V, Boolean> validateFunction, ToLongFunction<V> heapEstimator, ToLongFunction<Object> shrunkHeapEstimator, @@ -583,7 +583,7 @@ public class AccordCache implements CacheSize { Object shrunk = state.tryGetShrunk(); if (shrunk != null) - evicted = adapter.inflate(key, shrunk); + evicted = adapter.inflate(commandStore, key, shrunk); } catch (RuntimeException rte) { @@ -971,7 +971,7 @@ public class AccordCache implements CacheSize final QuadFunction<AccordCommandStore, K, V, Object, Runnable> save; final Function<V, V> quickShrink; final BiFunction<K, V, Object> shrink; - final BiFunction<K, Object, V> inflate; + final TriFunction<AccordCommandStore, K, Object, V> inflate; final TriFunction<AccordCommandStore, K, V, Boolean> validate; final ToLongFunction<V> estimateHeapSize; final ToLongFunction<Object> estimateShrunkHeapSize; @@ -981,7 +981,7 @@ public class AccordCache implements CacheSize FunctionalAdapter(BiFunction<AccordCommandStore, K, V> load, QuadFunction<AccordCommandStore, K, V, Object, Runnable> save, Function<V, V> quickShrink, BiFunction<K, V, Object> shrink, - BiFunction<K, Object, V> inflate, + TriFunction<AccordCommandStore, K, Object, V> inflate, TriFunction<AccordCommandStore, K, V, Boolean> validate, ToLongFunction<V> estimateHeapSize, ToLongFunction<Object> estimateShrunkHeapSize, @@ -1030,9 +1030,9 @@ public class AccordCache implements CacheSize } @Override - public V inflate(K key, Object shrunk) + public V inflate(AccordCommandStore commandStore, K key, Object shrunk) { - return inflate.apply(key, shrunk); + return inflate.apply(commandStore, key, shrunk); } @Override @@ -1096,7 +1096,7 @@ public class AccordCache implements CacheSize @Override public Runnable save(AccordCommandStore commandStore, K key, @Nullable V value, @Nullable Object shrunk) { return null; } @Override public V quickShrink(V value) { return null; } @Override public Object fullShrink(K key, V value) { return null; } - @Override public V inflate(K key, Object shrunk) { return null; } + @Override public V inflate(AccordCommandStore commandStore, K key, Object shrunk) { return null; } @Override public long estimateHeapSize(V value) { return 0; } @Override public long estimateShrunkHeapSize(Object shrunk) { return 0; } @Override public boolean validate(AccordCommandStore commandStore, K key, V value) { return false; } @@ -1136,7 +1136,7 @@ public class AccordCache implements CacheSize } @Override - public CommandsForKey inflate(RoutingKey key, Object shrunk) + public CommandsForKey inflate(AccordCommandStore commandStore, RoutingKey key, Object shrunk) { return Serialize.fromBytes(key, (ByteBuffer)shrunk); } @@ -1186,7 +1186,7 @@ public class AccordCache implements CacheSize if (value == null) { - value = inflate(txnId, serialized); + value = inflate(commandStore, txnId, serialized); if (value == null) return null; } @@ -1212,7 +1212,7 @@ public class AccordCache implements CacheSize try { - return SavedCommand.asSerializedDiff(null, value, current_version); + return AccordJournal.asSerializedChange(null, value, current_version); } catch (IOException e) { @@ -1222,15 +1222,15 @@ public class AccordCache implements CacheSize } @Override - public @Nullable Command inflate(TxnId key, Object serialized) + public @Nullable Command inflate(AccordCommandStore commandStore, TxnId key, Object serialized) { - SavedCommand.Builder builder = new SavedCommand.Builder(key); + AccordJournal.Builder builder = new AccordJournal.Builder(key); ByteBuffer buffer = (ByteBuffer) serialized; buffer.mark(); try (DataInputBuffer buf = new DataInputBuffer(buffer, false)) { builder.deserializeNext(buf, current_version); - return builder.construct(); + return builder.construct(commandStore.unsafeGetRedundantBefore()); } catch (UnknownTableException e) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java index 4bfa4be308..4e152e1650 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java @@ -368,7 +368,7 @@ public class AccordCacheEntry<K, V> extends IntrusiveLinkedListNode if (isShrunk()) { AccordCache.Type<K, V, ?> parent = owner.parent(); - inflate(key, parent.adapter()); + inflate(owner.commandStore, key, parent.adapter()); updateSize(parent); } @@ -538,17 +538,17 @@ public class AccordCacheEntry<K, V> extends IntrusiveLinkedListNode return true; } - private void inflate(K key, Adapter<K, V, ?> adapter) + private void inflate(AccordCommandStore commandStore, K key, Adapter<K, V, ?> adapter) { Invariants.checkState(isShrunk()); if (isNested()) { Nested nested = (Nested) state; - nested.state = adapter.inflate(key, nested.state); + nested.state = adapter.inflate(commandStore, key, nested.state); } else { - state = adapter.inflate(key, state); + state = adapter.inflate(commandStore, key, state); } status &= ~SHRUNK; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 35e8dca12a..81ce3ac8a4 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import accord.api.Agent; import accord.api.DataStore; +import accord.api.Journal; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; @@ -63,7 +64,6 @@ import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.service.accord.SavedCommand.MinimalCommand; import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; import org.apache.cassandra.service.accord.txn.TxnRead; import org.apache.cassandra.utils.Clock; @@ -71,12 +71,12 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static accord.api.Journal.CommandUpdate; import static accord.api.Journal.FieldUpdates; +import static accord.api.Journal.Load.MINIMAL; import static accord.api.Journal.Loader; import static accord.api.Journal.OnDone; import static accord.local.KeyHistory.SYNC; import static accord.primitives.Status.Committed; import static accord.utils.Invariants.checkState; -import static org.apache.cassandra.service.accord.SavedCommand.Load.MINIMAL; public class AccordCommandStore extends CommandStore { @@ -141,7 +141,7 @@ public class AccordCommandStore extends CommandStore } public final String loggingId; - private final IJournal journal; + private final Journal journal; private final AccordExecutor executor; private final Executor taskExecutor; private final ExclusiveCaches caches; @@ -160,7 +160,7 @@ public class AccordCommandStore extends CommandStore ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory, EpochUpdateHolder epochUpdateHolder, - IJournal journal, + Journal journal, AccordExecutor executor) { super(id, node, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder); @@ -300,9 +300,9 @@ public class AccordCommandStore extends CommandStore } @VisibleForTesting - public void sanityCheckCommand(Command command) + public void sanityCheckCommand(RedundantBefore redundantBefore, Command command) { - ((AccordJournal) journal).sanityCheck(id, command); + ((AccordJournal) journal).sanityCheck(id, redundantBefore, command); } CommandsForKey loadCommandsForKey(RoutableKey key) @@ -487,7 +487,7 @@ public class AccordCommandStore extends CommandStore return command; } - public MinimalCommand loadMinimal(TxnId txnId) + public Command.Minimal loadMinimal(TxnId txnId) { return journal.loadMinimal(id, txnId, MINIMAL, unsafeGetRedundantBefore(), durableBefore()); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 6e2e57fc40..a17c3528f5 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -18,19 +18,20 @@ package org.apache.cassandra.service.accord; import java.io.IOException; -import java.util.ArrayList; +import java.nio.ByteBuffer; import java.util.Collections; -import java.util.List; import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.impl.CommandChange; import accord.impl.ErasedSafeCommand; import accord.local.Cleanup; import accord.local.Command; @@ -54,6 +55,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; import org.apache.cassandra.journal.Compactor; @@ -65,14 +67,30 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator; import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport; import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.accord.serializers.CommandSerializers; +import org.apache.cassandra.service.accord.serializers.DepsSerializers; +import org.apache.cassandra.service.accord.serializers.ResultSerializers; +import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer; import org.apache.cassandra.utils.ExecutorUtils; +import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.AsyncPromise; +import static accord.impl.CommandChange.anyFieldChanged; +import static accord.impl.CommandChange.getFieldChanged; +import static accord.impl.CommandChange.getFieldIsNull; +import static accord.impl.CommandChange.getFlags; +import static accord.impl.CommandChange.getWaitingOn; +import static accord.impl.CommandChange.nextSetField; +import static accord.impl.CommandChange.setFieldChanged; +import static accord.impl.CommandChange.setFieldIsNull; +import static accord.impl.CommandChange.toIterableSetFields; +import static accord.impl.CommandChange.unsetIterableFields; +import static accord.impl.CommandChange.validateFlags; import static accord.primitives.SaveStatus.ErasedOrVestigial; import static accord.primitives.Status.Truncated; import static org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator; -public class AccordJournal implements IJournal, Shutdownable +public class AccordJournal implements accord.api.Journal, Shutdownable { static { @@ -188,7 +206,7 @@ public class AccordJournal implements IJournal, Shutdownable @Override public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { - SavedCommand.Builder builder = loadDiffs(commandStoreId, txnId); + Builder builder = load(commandStoreId, txnId); Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore, durableBefore); switch (cleanup) { @@ -197,14 +215,14 @@ public class AccordJournal implements IJournal, Shutdownable case ERASE: return ErasedSafeCommand.erased(txnId, ErasedOrVestigial); } - return builder.construct(); + return builder.construct(redundantBefore); } @Override - public SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId txnId, SavedCommand.Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) + public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) { - SavedCommand.Builder builder = loadDiffs(commandStoreId, txnId, load); - if (!builder.nextCalled) + Builder builder = loadDiffs(commandStoreId, txnId, load); + if (builder.isEmpty()) return null; Cleanup cleanup = builder.shouldCleanup(node.agent(), redundantBefore, durableBefore); @@ -215,11 +233,11 @@ public class AccordJournal implements IJournal, Shutdownable case ERASE: return null; } - Invariants.checkState(builder.saveStatus != null, "No saveSatus loaded, but next was called and cleanup was not: %s", builder); + Invariants.checkState(builder.saveStatus() != null, "No saveSatus loaded, but next was called and cleanup was not: %s", builder); return builder.asMinimal(); } - @VisibleForTesting + @Override public RedundantBefore loadRedundantBefore(int store) { IdentityAccumulator<RedundantBefore> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store)); @@ -250,8 +268,8 @@ public class AccordJournal implements IJournal, Shutdownable @Override public void saveCommand(int store, CommandUpdate update, Runnable onFlush) { - SavedCommand.Writer diff = SavedCommand.diff(update.before, update.after); - if (diff == null || status == Status.REPLAY) + Writer diff = Writer.make(update.before, update.after); + if (diff == null) { if (onFlush != null) onFlush.run(); @@ -272,9 +290,6 @@ public class AccordJournal implements IJournal, Shutdownable @Override public AsyncResult<?> persist(DurableBefore addDurableBefore, DurableBefore newDurableBefore) { - if (status == Status.REPLAY) - return AsyncResults.success(null); - AsyncResult.Settable<Void> result = AsyncResults.settable(); JournalKey key = new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0); RecordPointer pointer = appendInternal(key, addDurableBefore); @@ -315,18 +330,18 @@ public class AccordJournal implements IJournal, Shutdownable onFlush.run(); } - @VisibleForTesting - public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId, SavedCommand.Load load) + private Builder loadDiffs(int commandStoreId, TxnId txnId, Load load) { JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, commandStoreId); - SavedCommand.Builder builder = new SavedCommand.Builder(txnId, load); + Builder builder = new Builder(txnId, load); journalTable.readAll(key, builder::deserializeNext); return builder; } - public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId) + @VisibleForTesting + public Builder load(int commandStoreId, TxnId txnId) { - return loadDiffs(commandStoreId, txnId, SavedCommand.Load.ALL); + return loadDiffs(commandStoreId, txnId, Load.ALL); } private <BUILDER> BUILDER readAll(JournalKey key) @@ -351,17 +366,17 @@ public class AccordJournal implements IJournal, Shutdownable journal.closeCurrentSegmentForTestingIfNonEmpty(); } - public void sanityCheck(int commandStoreId, Command orig) + public void sanityCheck(int commandStoreId, RedundantBefore redundantBefore, Command orig) { - SavedCommand.Builder diffs = loadDiffs(commandStoreId, orig.txnId()); - diffs.forceResult(orig.result()); + Builder builder = load(commandStoreId, orig.txnId()); + builder.forceResult(orig.result()); // We can only use strict equality if we supply result. - Command reconstructed = diffs.construct(); + Command reconstructed = builder.construct(redundantBefore); Invariants.checkState(orig.equals(reconstructed), '\n' + "Original: %s\n" + "Reconstructed: %s\n" + - "Diffs: %s", orig, reconstructed, diffs); + "Diffs: %s", orig, reconstructed, builder); } @VisibleForTesting @@ -391,7 +406,7 @@ public class AccordJournal implements IJournal, Shutdownable try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = journalTable.readAll()) { JournalKey key; - SavedCommand.Builder builder = new SavedCommand.Builder(); + Builder builder = new Builder(); while ((key = iter.key()) != null) { @@ -417,12 +432,12 @@ public class AccordJournal implements IJournal, Shutdownable } }); - if (builder.nextCalled) + if (!builder.isEmpty()) { - Command command = builder.construct(); + CommandStore commandStore = commandStores.forId(key.commandStoreId); + Command command = builder.construct(commandStore.unsafeGetRedundantBefore()); Invariants.checkState(command.saveStatus() != SaveStatus.Uninitialised, "Found uninitialized command in the log: %s %s", command.toString(), builder.toString()); - CommandStore commandStore = commandStores.forId(key.commandStoreId); Loader loader = commandStore.loader(); async(loader::load, command).get(); if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated)) @@ -454,65 +469,306 @@ public class AccordJournal implements IJournal, Shutdownable return future; } - // TODO: this is here temporarily; for debugging purposes + public static @Nullable ByteBuffer asSerializedChange(Command before, Command after, int userVersion) throws IOException + { + try (DataOutputBuffer out = new DataOutputBuffer()) + { + Writer writer = Writer.make(before, after); + if (writer == null) + return null; + + writer.write(out, userVersion); + return out.asNewBuffer(); + } + } + @VisibleForTesting - public void checkAllCommands() + public void unsafeSetStarted() { - try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = journalTable.readAll()) + status = Status.STARTED; + } + + public static class Writer implements Journal.Writer + { + private final Command after; + private final int flags; + + private Writer(Command after, int flags) { - IAccordService.CompactionInfo compactionInfo = AccordService.instance().getCompactionInfo(); - JournalKey key; - SavedCommand.Builder builder = new SavedCommand.Builder(); - while ((key = iter.key()) != null) + this.after = after; + this.flags = flags; + } + + public static Writer make(Command before, Command after) + { + if (before == after + || after == null + || after.saveStatus() == SaveStatus.Uninitialised) + return null; + + int flags = validateFlags(getFlags(before, after)); + if (!anyFieldChanged(flags)) + return null; + + return new Writer(after, flags); + } + + @Override + public void write(DataOutputPlus out, int userVersion) throws IOException + { + serialize(after, flags, out, userVersion); + } + + private static void serialize(Command command, int flags, DataOutputPlus out, int userVersion) throws IOException + { + Invariants.checkState(flags != 0); + out.writeInt(flags); + + int iterable = toIterableSetFields(flags); + while (iterable != 0) { - builder.reset(key.id); - if (key.type != JournalKey.Type.COMMAND_DIFF) + CommandChange.Fields field = nextSetField(iterable); + if (getFieldIsNull(field, flags)) { - // TODO (required): add "skip" for the key to avoid getting stuck - iter.readAllForKey(key, (segment, position, key1, buffer, hosts, userVersion) -> {}); + iterable = unsetIterableFields(field, iterable); continue; } - JournalKey finalKey = key; - List<RecordPointer> pointers = new ArrayList<>(); - try + switch (field) + { + case EXECUTE_AT: + CommandSerializers.timestamp.serialize(command.executeAt(), out, userVersion); + break; + case EXECUTES_AT_LEAST: + CommandSerializers.timestamp.serialize(command.executesAtLeast(), out, userVersion); + break; + case SAVE_STATUS: + out.writeShort(command.saveStatus().ordinal()); + break; + case DURABILITY: + out.writeByte(command.durability().ordinal()); + break; + case ACCEPTED: + CommandSerializers.ballot.serialize(command.acceptedOrCommitted(), out, userVersion); + break; + case PROMISED: + CommandSerializers.ballot.serialize(command.promised(), out, userVersion); + break; + case PARTICIPANTS: + CommandSerializers.participants.serialize(command.participants(), out, userVersion); + break; + case PARTIAL_TXN: + CommandSerializers.partialTxn.serialize(command.partialTxn(), out, userVersion); + break; + case PARTIAL_DEPS: + DepsSerializers.partialDeps.serialize(command.partialDeps(), out, userVersion); + break; + case WAITING_ON: + Command.WaitingOn waitingOn = getWaitingOn(command); + long size = WaitingOnSerializer.serializedSize(command.txnId(), waitingOn); + ByteBuffer serialized = WaitingOnSerializer.serialize(command.txnId(), waitingOn); + Invariants.checkState(serialized.remaining() == size); + out.writeInt((int) size); + out.write(serialized); + break; + case WRITES: + CommandSerializers.writes.serialize(command.writes(), out, userVersion); + break; + case RESULT: + ResultSerializers.result.serialize(command.result(), out, userVersion); + break; + case CLEANUP: + throw new IllegalStateException(); + } + + iterable = unsetIterableFields(field, iterable); + } + } + } + + public static class Builder extends CommandChange.Builder + { + public Builder() + { + super(null, Load.ALL); + } + + public Builder(TxnId txnId) + { + super(txnId, Load.ALL); + } + + public Builder(TxnId txnId, Load load) + { + super(txnId, load); + } + public ByteBuffer asByteBuffer(RedundantBefore redundantBefore, int userVersion) throws IOException + { + try (DataOutputBuffer out = new DataOutputBuffer()) + { + serialize(out, redundantBefore, userVersion); + return out.asNewBuffer(); + } + } + + public Builder maybeCleanup(Cleanup cleanup) + { + super.maybeCleanup(cleanup); + return this; + } + + public void serialize(DataOutputPlus out, RedundantBefore redundantBefore, int userVersion) throws IOException + { + Invariants.checkState(mask == 0); + Invariants.checkState(flags != 0); + + int flags = validateFlags(this.flags); + Writer.serialize(construct(redundantBefore), flags, out, userVersion); + } + + public void deserializeNext(DataInputPlus in, int userVersion) throws IOException + { + Invariants.checkState(txnId != null); + int flags = in.readInt(); + Invariants.checkState(flags != 0); + nextCalled = true; + count++; + + int iterable = toIterableSetFields(flags); + while (iterable != 0) + { + CommandChange.Fields field = nextSetField(iterable); + if (getFieldChanged(field, this.flags) || getFieldIsNull(field, mask)) + { + if (!getFieldIsNull(field, flags)) + skip(field, in, userVersion); + + iterable = unsetIterableFields(field, iterable); + continue; + } + this.flags = setFieldChanged(field, this.flags); + + if (getFieldIsNull(field, flags)) + { + this.flags = setFieldIsNull(field, this.flags); + } + else { - iter.readAllForKey(key, (segment, position, local, buffer, hosts, userVersion) -> { - pointers.add(new RecordPointer(segment, position)); - Invariants.checkState(finalKey.equals(local)); - try (DataInputBuffer in = new DataInputBuffer(buffer, false)) + deserialize(field, in, userVersion); + } + + iterable = unsetIterableFields(field, iterable); + } + } + + private void deserialize(CommandChange.Fields field, DataInputPlus in, int userVersion) throws IOException + { + switch (field) + { + case EXECUTE_AT: + executeAt = CommandSerializers.timestamp.deserialize(in, userVersion); + break; + case EXECUTES_AT_LEAST: + executeAtLeast = CommandSerializers.timestamp.deserialize(in, userVersion); + break; + case SAVE_STATUS: + saveStatus = SaveStatus.values()[in.readShort()]; + break; + case DURABILITY: + durability = accord.primitives.Status.Durability.values()[in.readByte()]; + break; + case ACCEPTED: + acceptedOrCommitted = CommandSerializers.ballot.deserialize(in, userVersion); + break; + case PROMISED: + promised = CommandSerializers.ballot.deserialize(in, userVersion); + break; + case PARTICIPANTS: + participants = CommandSerializers.participants.deserialize(in, userVersion); + break; + case PARTIAL_TXN: + partialTxn = CommandSerializers.partialTxn.deserialize(in, userVersion); + break; + case PARTIAL_DEPS: + partialDeps = DepsSerializers.partialDeps.deserialize(in, userVersion); + break; + case WAITING_ON: + int size = in.readInt(); + + byte[] waitingOnBytes = new byte[size]; + in.readFully(waitingOnBytes); + ByteBuffer buffer = ByteBuffer.wrap(waitingOnBytes); + waitingOn = (localTxnId, deps) -> { + try { - builder.deserializeNext(in, userVersion); + Invariants.nonNull(deps); + return WaitingOnSerializer.deserialize(localTxnId, deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer); } catch (IOException e) { - // can only throw if serializer is buggy - throw new RuntimeException(e); + throw Throwables.unchecked(e); } - }); - - Cleanup cleanup = builder.shouldCleanup(node.agent(), compactionInfo.redundantBefores.get(key.commandStoreId), compactionInfo.durableBefores.get(key.commandStoreId)); - switch (cleanup) - { - case ERASE: - case EXPUNGE: - case EXPUNGE_PARTIAL: - case VESTIGIAL: - continue; - } - builder.construct(); - } - catch (Throwable t) - { - throw new RuntimeException(String.format("Caught an exception after iterating over: %s", pointers), - t); - } + }; + break; + case WRITES: + writes = CommandSerializers.writes.deserialize(in, userVersion); + break; + case CLEANUP: + Cleanup newCleanup = Cleanup.forOrdinal(in.readByte()); + if (cleanup == null || newCleanup.compareTo(cleanup) > 0) + cleanup = newCleanup; + break; + case RESULT: + result = ResultSerializers.result.deserialize(in, userVersion); + break; } } - } - public void unsafeSetStarted() - { - status = Status.STARTED; + private void skip(CommandChange.Fields field, DataInputPlus in, int userVersion) throws IOException + { + switch (field) + { + case EXECUTE_AT: + case EXECUTES_AT_LEAST: + CommandSerializers.timestamp.skip(in, userVersion); + break; + case SAVE_STATUS: + in.readShort(); + break; + case DURABILITY: + in.readByte(); + break; + case ACCEPTED: + case PROMISED: + CommandSerializers.ballot.skip(in, userVersion); + break; + case PARTICIPANTS: + // TODO (expected): skip + CommandSerializers.participants.deserialize(in, userVersion); + break; + case PARTIAL_TXN: + CommandSerializers.partialTxn.deserialize(in, userVersion); + break; + case PARTIAL_DEPS: + // TODO (expected): skip + DepsSerializers.partialDeps.deserialize(in, userVersion); + break; + case WAITING_ON: + int size = in.readInt(); + in.skipBytesFully(size); + break; + case WRITES: + // TODO (expected): skip + CommandSerializers.writes.deserialize(in, userVersion); + break; + case CLEANUP: + in.readByte(); + break; + case RESULT: + // TODO (expected): skip + result = ResultSerializers.result.deserialize(in, userVersion); + break; + } + } } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java index 7f1c71f351..a11dfb744b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java @@ -35,8 +35,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.accord.serializers.CommandStoreSerializers; import org.apache.cassandra.service.accord.serializers.KeySerializers; +import static accord.api.Journal.Load.ALL; import static accord.local.CommandStores.RangesForEpoch; -import static org.apache.cassandra.service.accord.SavedCommand.Load.ALL; // TODO (required): test with large collection values, and perhaps split out some fields if they have a tendency to grow larger // TODO (required): alert on metadata size @@ -56,16 +56,16 @@ public class AccordJournalValueSerializers } public static class CommandDiffSerializer - implements FlyweightSerializer<SavedCommand.Writer, SavedCommand.Builder> + implements FlyweightSerializer<AccordJournal.Writer, AccordJournal.Builder> { @Override - public SavedCommand.Builder mergerFor(JournalKey journalKey) + public AccordJournal.Builder mergerFor(JournalKey journalKey) { - return new SavedCommand.Builder(journalKey.id, ALL); + return new AccordJournal.Builder(journalKey.id, ALL); } @Override - public void serialize(JournalKey key, SavedCommand.Writer writer, DataOutputPlus out, int userVersion) + public void serialize(JournalKey key, AccordJournal.Writer writer, DataOutputPlus out, int userVersion) { try { @@ -78,13 +78,17 @@ public class AccordJournalValueSerializers } @Override - public void reserialize(JournalKey key, SavedCommand.Builder from, DataOutputPlus out, int userVersion) throws IOException + public void reserialize(JournalKey key, AccordJournal.Builder from, DataOutputPlus out, int userVersion) throws IOException { - from.serialize(out, userVersion); + from.serialize(out, + // In CompactionIterator, we are dealing with relatively recent records, so we do not pass redundant before here. + // However, we do on load and during Journal SSTable compaction. + RedundantBefore.EMPTY, + userVersion); } @Override - public void deserialize(JournalKey journalKey, SavedCommand.Builder into, DataInputPlus in, int userVersion) throws IOException + public void deserialize(JournalKey journalKey, AccordJournal.Builder into, DataInputPlus in, int userVersion) throws IOException { into.deserializeNext(in, userVersion); } @@ -296,8 +300,8 @@ public class AccordJournalValueSerializers from.forEach((epoch, ranges) -> { try { - KeySerializers.ranges.serialize(ranges, out, messagingVersion); out.writeLong(epoch); + KeySerializers.ranges.serialize(ranges, out, messagingVersion); } catch (Throwable t) { @@ -320,8 +324,8 @@ public class AccordJournalValueSerializers long[] epochs = new long[size]; for (int i = 0; i < ranges.length; i++) { - ranges[i] = KeySerializers.ranges.deserialize(in, messagingVersion); epochs[i] = in.readLong(); + ranges[i] = KeySerializers.ranges.deserialize(in, messagingVersion); } Invariants.checkState(ranges.length == epochs.length); into.update(new RangesForEpoch(epochs, ranges)); diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java b/src/java/org/apache/cassandra/service/accord/AccordTask.java index ea4277391a..db36c42562 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTask.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java @@ -618,7 +618,7 @@ public abstract class AccordTask<R> extends Task implements Runnable, Function<S condition.awaitUninterruptibly(); for (Command check : sanityCheck) - this.commandStore.sanityCheckCommand(check); + this.commandStore.sanityCheckCommand(commandStore.unsafeGetRedundantBefore(), check); if (onFlush != null) onFlush.run(); } diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java index b0aa948e26..29cdcb5195 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java +++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java @@ -169,7 +169,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, Summary> implements Co { if (findAsDep == null) { - SavedCommand.MinimalCommand cmd = manager.commandStore.loadMinimal(txnId); + Command.Minimal cmd = manager.commandStore.loadMinimal(txnId); if (cmd != null) return ifRelevant(cmd); } @@ -225,7 +225,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, Summary> implements Co return ifRelevant(cmd.txnId(), cmd.executeAt(), cmd.saveStatus(), cmd.participants(), cmd.partialDeps()); } - public Summary ifRelevant(SavedCommand.MinimalCommand cmd) + public Summary ifRelevant(Command.Minimal cmd) { Invariants.checkState(findAsDep == null); return ifRelevant(cmd.txnId, cmd.executeAt, cmd.saveStatus, cmd.participants, null); diff --git a/src/java/org/apache/cassandra/service/accord/IJournal.java b/src/java/org/apache/cassandra/service/accord/IJournal.java deleted file mode 100644 index 66b1f65ce5..0000000000 --- a/src/java/org/apache/cassandra/service/accord/IJournal.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service.accord; - -import accord.api.Journal; -import accord.local.Command; -import accord.local.DurableBefore; -import accord.local.RedundantBefore; -import accord.primitives.TxnId; -import accord.utils.PersistentField.Persister; - -public interface IJournal extends Journal -{ - // TODO (required): migrate to accord.api.Journal - default SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId txnId, SavedCommand.Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) - { - Command command = loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); - if (command == null) - return null; - return new SavedCommand.MinimalCommand(command.txnId(), command.saveStatus(), command.participants(), command.durability(), command.executeAt(), command.writes()); - } - - Persister<DurableBefore, DurableBefore> durableBeforePersister(); -} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java b/src/java/org/apache/cassandra/service/accord/SavedCommand.java deleted file mode 100644 index 9e38e5158c..0000000000 --- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java +++ /dev/null @@ -1,980 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service.accord; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.function.Function; -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; - -import accord.api.Agent; -import accord.api.Result; -import accord.local.Cleanup; -import accord.local.Command; -import accord.local.CommonAttributes; -import accord.local.DurableBefore; -import accord.local.RedundantBefore; -import accord.local.StoreParticipants; -import accord.primitives.Ballot; -import accord.primitives.PartialDeps; -import accord.primitives.PartialTxn; -import accord.primitives.SaveStatus; -import accord.primitives.Status; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; -import accord.primitives.Writes; -import accord.utils.Invariants; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.journal.Journal; -import org.apache.cassandra.service.accord.serializers.CommandSerializers; -import org.apache.cassandra.service.accord.serializers.DepsSerializers; -import org.apache.cassandra.service.accord.serializers.ResultSerializers; -import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer; -import org.apache.cassandra.utils.Throwables; - -import static accord.local.Cleanup.NO; -import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME; -import static accord.primitives.Known.KnownDeps.DepsErased; -import static accord.primitives.Known.KnownDeps.DepsUnknown; -import static accord.primitives.Known.KnownDeps.NoDeps; -import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome; -import static accord.primitives.Status.Durability.NotDurable; -import static accord.utils.Invariants.illegalState; -import static org.apache.cassandra.service.accord.SavedCommand.Fields.DURABILITY; -import static org.apache.cassandra.service.accord.SavedCommand.Fields.EXECUTE_AT; -import static org.apache.cassandra.service.accord.SavedCommand.Fields.PARTICIPANTS; -import static org.apache.cassandra.service.accord.SavedCommand.Fields.RESULT; -import static org.apache.cassandra.service.accord.SavedCommand.Fields.SAVE_STATUS; -import static org.apache.cassandra.service.accord.SavedCommand.Fields.WRITES; -import static org.apache.cassandra.service.accord.SavedCommand.Load.ALL; - -public class SavedCommand -{ - // This enum is order-dependent - public enum Fields - { - PARTICIPANTS, // stored first so we can index it - SAVE_STATUS, - PARTIAL_DEPS, - EXECUTE_AT, - EXECUTES_AT_LEAST, - DURABILITY, - ACCEPTED, - PROMISED, - WAITING_ON, - PARTIAL_TXN, - WRITES, - CLEANUP, - RESULT, - ; - - public static final Fields[] FIELDS = values(); - } - - // TODO: maybe rename this and enclosing classes? - public static class Writer implements Journal.Writer - { - private final Command after; - private final TxnId txnId; - private final int flags; - - @VisibleForTesting - public Writer(Command after, int flags) - { - this(after.txnId(), after, flags); - } - - @VisibleForTesting - public Writer(TxnId txnId, Command after, int flags) - { - this.txnId = txnId; - this.after = after; - this.flags = flags; - } - - @VisibleForTesting - public Command after() - { - return after; - } - - public void write(DataOutputPlus out, int userVersion) throws IOException - { - serialize(after, flags, out, userVersion); - } - - public TxnId key() - { - return txnId; - } - } - - public static @Nullable ByteBuffer asSerializedDiff(Command before, Command after, int userVersion) throws IOException - { - try (DataOutputBuffer out = new DataOutputBuffer()) - { - Writer writer = diff(before, after); - if (writer == null) - return null; - - writer.write(out, userVersion); - return out.asNewBuffer(); - } - } - - @Nullable - public static Writer diff(Command before, Command after) - { - if (before == after - || after == null - || after.saveStatus() == SaveStatus.Uninitialised) - return null; - - int flags = validateFlags(getFlags(before, after)); - if (!anyFieldChanged(flags)) - return null; - - return new Writer(after, flags); - } - - // TODO (required): calculate flags once - private static boolean anyFieldChanged(int flags) - { - return (flags >>> 16) != 0; - } - - private static int validateFlags(int flags) - { - Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff))); - return flags; - } - - public static void serialize(Command after, int flags, DataOutputPlus out, int userVersion) throws IOException - { - Invariants.checkState(flags != 0); - out.writeInt(flags); - - int iterable = toIterableSetFields(flags); - while (iterable != 0) - { - Fields field = nextSetField(iterable); - if (getFieldIsNull(field, flags)) - { - iterable = unsetIterableFields(field, iterable); - continue; - } - - switch (field) - { - case EXECUTE_AT: - CommandSerializers.timestamp.serialize(after.executeAt(), out, userVersion); - break; - case EXECUTES_AT_LEAST: - CommandSerializers.timestamp.serialize(after.executesAtLeast(), out, userVersion); - break; - case SAVE_STATUS: - out.writeShort(after.saveStatus().ordinal()); - break; - case DURABILITY: - out.writeByte(after.durability().ordinal()); - break; - case ACCEPTED: - CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), out, userVersion); - break; - case PROMISED: - CommandSerializers.ballot.serialize(after.promised(), out, userVersion); - break; - case PARTICIPANTS: - CommandSerializers.participants.serialize(after.participants(), out, userVersion); - break; - case PARTIAL_TXN: - CommandSerializers.partialTxn.serialize(after.partialTxn(), out, userVersion); - break; - case PARTIAL_DEPS: - DepsSerializers.partialDeps.serialize(after.partialDeps(), out, userVersion); - break; - case WAITING_ON: - Command.WaitingOn waitingOn = getWaitingOn(after); - long size = WaitingOnSerializer.serializedSize(after.txnId(), waitingOn); - ByteBuffer serialized = WaitingOnSerializer.serialize(after.txnId(), waitingOn); - Invariants.checkState(serialized.remaining() == size); - out.writeInt((int) size); - out.write(serialized); - break; - case WRITES: - CommandSerializers.writes.serialize(after.writes(), out, userVersion); - break; - case RESULT: - ResultSerializers.result.serialize(after.result(), out, userVersion); - break; - case CLEANUP: - throw new IllegalStateException(); - } - - iterable = unsetIterableFields(field, iterable); - } - } - - @VisibleForTesting - public static int getFlags(Command before, Command after) - { - int flags = 0; - - flags = collectFlags(before, after, Command::executeAt, true, Fields.EXECUTE_AT, flags); - flags = collectFlags(before, after, Command::executesAtLeast, true, Fields.EXECUTES_AT_LEAST, flags); - flags = collectFlags(before, after, Command::saveStatus, false, SAVE_STATUS, flags); - flags = collectFlags(before, after, Command::durability, false, DURABILITY, flags); - - flags = collectFlags(before, after, Command::acceptedOrCommitted, false, Fields.ACCEPTED, flags); - flags = collectFlags(before, after, Command::promised, false, Fields.PROMISED, flags); - - flags = collectFlags(before, after, Command::participants, true, PARTICIPANTS, flags); - flags = collectFlags(before, after, Command::partialTxn, false, Fields.PARTIAL_TXN, flags); - flags = collectFlags(before, after, Command::partialDeps, false, Fields.PARTIAL_DEPS, flags); - - // TODO: waitingOn vs WaitingOnWithExecutedAt? - flags = collectFlags(before, after, SavedCommand::getWaitingOn, true, Fields.WAITING_ON, flags); - - flags = collectFlags(before, after, Command::writes, false, WRITES, flags); - - // Special-cased for Journal BurnTest integration - if ((before != null && before.result() != null && before.result() != ResultSerializers.APPLIED) || - (after != null && after.result() != null && after.result() != ResultSerializers.APPLIED)) - { - flags = collectFlags(before, after, Command::writes, false, RESULT, flags); - } - - return flags; - } - - static Command.WaitingOn getWaitingOn(Command command) - { - if (command instanceof Command.Committed) - return command.asCommitted().waitingOn(); - - return null; - } - - private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, VAL> convert, boolean allowClassMismatch, Fields field, int flags) - { - VAL l = null; - VAL r = null; - if (lo != null) l = convert.apply(lo); - if (ro != null) r = convert.apply(ro); - - if (l == r) - return flags; // no change - - if (r == null) - flags = setFieldIsNull(field, flags); - - if (l == null || r == null) - return setFieldChanged(field, flags); - - assert allowClassMismatch || l.getClass() == r.getClass() : String.format("%s != %s", l.getClass(), r.getClass()); - - if (l.equals(r)) - return flags; // no change - - return setFieldChanged(field, flags); - } - - private static int setFieldChanged(Fields field, int oldFlags) - { - return oldFlags | (0x10000 << field.ordinal()); - } - - @VisibleForTesting - static boolean getFieldChanged(Fields field, int oldFlags) - { - return (oldFlags & (0x10000 << field.ordinal())) != 0; - } - - static int toIterableSetFields(int flags) - { - return flags >>> 16; - } - - static Fields nextSetField(int iterable) - { - int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable)); - return i == 32 ? null : Fields.FIELDS[i]; - } - - static int unsetIterableFields(Fields field, int iterable) - { - return iterable & ~(1 << field.ordinal()); - } - - @VisibleForTesting - static boolean getFieldIsNull(Fields field, int oldFlags) - { - return (oldFlags & (1 << field.ordinal())) != 0; - } - - private static int setFieldIsNull(Fields field, int oldFlags) - { - return oldFlags | (1 << field.ordinal()); - } - - public enum Load - { - ALL(0), - PURGEABLE(SAVE_STATUS, PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES), - MINIMAL(SAVE_STATUS, PARTICIPANTS, EXECUTE_AT); - - final int mask; - - Load(int mask) - { - this.mask = mask; - } - - Load(Fields ... fields) - { - int mask = -1; - for (Fields field : fields) - mask &= ~(1<< field.ordinal()); - this.mask = mask; - } - } - - public static class MinimalCommand - { - public final TxnId txnId; - public final SaveStatus saveStatus; - public final StoreParticipants participants; - public final Status.Durability durability; - public final Timestamp executeAt; - public final Writes writes; - - public MinimalCommand(TxnId txnId, SaveStatus saveStatus, StoreParticipants participants, Status.Durability durability, Timestamp executeAt, Writes writes) - { - this.txnId = txnId; - this.saveStatus = saveStatus; - this.participants = participants; - this.durability = durability; - this.executeAt = executeAt; - this.writes = writes; - } - } - - public static class Builder - { - final int mask; - int flags; - - TxnId txnId; - - Timestamp executeAt; - Timestamp executeAtLeast; - SaveStatus saveStatus; - Status.Durability durability; - - Ballot acceptedOrCommitted; - Ballot promised; - - StoreParticipants participants; - PartialTxn partialTxn; - PartialDeps partialDeps; - - byte[] waitingOnBytes; - SavedCommand.WaitingOnProvider waitingOn; - Writes writes; - Result result; - Cleanup cleanup; - - boolean nextCalled; - int count; - - public Builder(TxnId txnId, Load load) - { - this.mask = load.mask; - init(txnId); - } - - public Builder(TxnId txnId) - { - this(txnId, ALL); - } - - public Builder(Load load) - { - this.mask = load.mask; - } - - public Builder() - { - this(ALL); - } - - public TxnId txnId() - { - return txnId; - } - - public Timestamp executeAt() - { - return executeAt; - } - - public Timestamp executeAtLeast() - { - return executeAtLeast; - } - - public SaveStatus saveStatus() - { - return saveStatus; - } - - public Status.Durability durability() - { - return durability; - } - - public Ballot acceptedOrCommitted() - { - return acceptedOrCommitted; - } - - public Ballot promised() - { - return promised; - } - - public StoreParticipants participants() - { - return participants; - } - - public PartialTxn partialTxn() - { - return partialTxn; - } - - public PartialDeps partialDeps() - { - return partialDeps; - } - - public SavedCommand.WaitingOnProvider waitingOn() - { - return waitingOn; - } - - public Writes writes() - { - return writes; - } - - public Result result() - { - return result; - } - - public void clear() - { - flags = 0; - txnId = null; - - executeAt = null; - executeAtLeast = null; - saveStatus = null; - durability = null; - - acceptedOrCommitted = null; - promised = null; - - participants = null; - partialTxn = null; - partialDeps = null; - - waitingOnBytes = null; - waitingOn = null; - writes = null; - result = null; - cleanup = null; - - nextCalled = false; - count = 0; - } - - public void reset(TxnId txnId) - { - clear(); - init(txnId); - } - - public void init(TxnId txnId) - { - this.txnId = txnId; - durability = NotDurable; - acceptedOrCommitted = promised = Ballot.ZERO; - waitingOn = (txn, deps) -> null; - result = ResultSerializers.APPLIED; - } - - public boolean isEmpty() - { - return !nextCalled; - } - - public int count() - { - return count; - } - - public Cleanup shouldCleanup(Agent agent, RedundantBefore redundantBefore, DurableBefore durableBefore) - { - if (!nextCalled) - return NO; - - if (saveStatus == null || participants == null) - return Cleanup.NO; - - Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId, saveStatus, durability, participants, redundantBefore, durableBefore); - if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0) - cleanup = this.cleanup; - return cleanup; - } - - // TODO (expected): avoid allocating new builder - public Builder maybeCleanup(Cleanup cleanup) - { - if (saveStatus() == null) - return this; - - switch (cleanup) - { - case EXPUNGE: - case ERASE: - return null; - - case EXPUNGE_PARTIAL: - return expungePartial(cleanup, saveStatus, true); - - case VESTIGIAL: - case INVALIDATE: - return saveStatusOnly(); - - case TRUNCATE_WITH_OUTCOME: - case TRUNCATE: - return expungePartial(cleanup, cleanup.appliesIfNot, cleanup == TRUNCATE_WITH_OUTCOME); - - case NO: - return this; - default: - throw new IllegalStateException("Unknown cleanup: " + cleanup);} - } - - private Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus, boolean includeOutcome) - { - Invariants.checkState(txnId != null); - Builder builder = new Builder(txnId, ALL); - - builder.count++; - builder.nextCalled = true; - - Invariants.checkState(saveStatus != null); - builder.flags = setFieldChanged(SAVE_STATUS, builder.flags); - builder.saveStatus = saveStatus; - builder.flags = setFieldChanged(Fields.CLEANUP, builder.flags); - builder.cleanup = cleanup; - if (executeAt != null) - { - builder.flags = setFieldChanged(Fields.EXECUTE_AT, builder.flags); - builder.executeAt = executeAt; - } - if (durability != null) - { - builder.flags = setFieldChanged(DURABILITY, builder.flags); - builder.durability = durability; - } - if (participants != null) - { - builder.flags = setFieldChanged(PARTICIPANTS, builder.flags); - builder.participants = participants; - } - if (includeOutcome && builder.writes != null) - { - builder.flags = setFieldChanged(WRITES, builder.flags); - builder.writes = writes; - } - - return builder; - } - - private Builder saveStatusOnly() - { - Invariants.checkState(txnId != null); - Builder builder = new Builder(txnId, ALL); - - builder.count++; - builder.nextCalled = true; - - // TODO: these accesses can be abstracted away - if (saveStatus != null) - { - builder.flags = setFieldChanged(SAVE_STATUS, builder.flags); - builder.saveStatus = saveStatus; - } - - return builder; - } - - public ByteBuffer asByteBuffer(int userVersion) throws IOException - { - try (DataOutputBuffer out = new DataOutputBuffer()) - { - serialize(out, userVersion); - return out.asNewBuffer(); - } - } - - public MinimalCommand asMinimal() - { - return new MinimalCommand(txnId, saveStatus, participants, durability, executeAt, writes); - } - - public void serialize(DataOutputPlus out, int userVersion) throws IOException - { - Invariants.checkState(mask == 0); - Invariants.checkState(flags != 0); - out.writeInt(validateFlags(flags)); - - int iterable = toIterableSetFields(flags); - while (iterable != 0) - { - Fields field = nextSetField(iterable); - if (getFieldIsNull(field, flags)) - { - iterable = unsetIterableFields(field, iterable); - continue; - } - - switch (field) - { - case EXECUTE_AT: - CommandSerializers.timestamp.serialize(executeAt(), out, userVersion); - break; - case EXECUTES_AT_LEAST: - CommandSerializers.timestamp.serialize(executeAtLeast(), out, userVersion); - break; - case SAVE_STATUS: - out.writeShort(saveStatus().ordinal()); - break; - case DURABILITY: - out.writeByte(durability().ordinal()); - break; - case ACCEPTED: - CommandSerializers.ballot.serialize(acceptedOrCommitted(), out, userVersion); - break; - case PROMISED: - CommandSerializers.ballot.serialize(promised(), out, userVersion); - break; - case PARTICIPANTS: - CommandSerializers.participants.serialize(participants(), out, userVersion); - break; - case PARTIAL_TXN: - CommandSerializers.partialTxn.serialize(partialTxn(), out, userVersion); - break; - case PARTIAL_DEPS: - DepsSerializers.partialDeps.serialize(partialDeps(), out, userVersion); - break; - case WAITING_ON: - out.writeInt(waitingOnBytes.length); - out.write(waitingOnBytes); - break; - case WRITES: - CommandSerializers.writes.serialize(writes(), out, userVersion); - break; - case CLEANUP: - out.writeByte(cleanup.ordinal()); - break; - case RESULT: - ResultSerializers.result.serialize(result(), out, userVersion); - break; - } - - iterable = unsetIterableFields(field, iterable); - } - } - - public void deserializeNext(DataInputPlus in, int userVersion) throws IOException - { - Invariants.checkState(txnId != null); - int flags = in.readInt(); - Invariants.checkState(flags != 0); - nextCalled = true; - count++; - - int iterable = toIterableSetFields(flags); - while (iterable != 0) - { - Fields field = nextSetField(iterable); - if (getFieldChanged(field, this.flags) || getFieldIsNull(field, mask)) - { - if (!getFieldIsNull(field, flags)) - skip(field, in, userVersion); - - iterable = unsetIterableFields(field, iterable); - continue; - } - this.flags = setFieldChanged(field, this.flags); - - if (getFieldIsNull(field, flags)) - { - this.flags = setFieldIsNull(field, this.flags); - } - else - { - deserialize(field, in, userVersion); - } - - iterable = unsetIterableFields(field, iterable); - } - } - - private void deserialize(Fields field, DataInputPlus in, int userVersion) throws IOException - { - switch (field) - { - case EXECUTE_AT: - executeAt = CommandSerializers.timestamp.deserialize(in, userVersion); - break; - case EXECUTES_AT_LEAST: - executeAtLeast = CommandSerializers.timestamp.deserialize(in, userVersion); - break; - case SAVE_STATUS: - saveStatus = SaveStatus.values()[in.readShort()]; - break; - case DURABILITY: - durability = Status.Durability.values()[in.readByte()]; - break; - case ACCEPTED: - acceptedOrCommitted = CommandSerializers.ballot.deserialize(in, userVersion); - break; - case PROMISED: - promised = CommandSerializers.ballot.deserialize(in, userVersion); - break; - case PARTICIPANTS: - participants = CommandSerializers.participants.deserialize(in, userVersion); - break; - case PARTIAL_TXN: - partialTxn = CommandSerializers.partialTxn.deserialize(in, userVersion); - break; - case PARTIAL_DEPS: - partialDeps = DepsSerializers.partialDeps.deserialize(in, userVersion); - break; - case WAITING_ON: - int size = in.readInt(); - waitingOnBytes = new byte[size]; - in.readFully(waitingOnBytes); - ByteBuffer buffer = ByteBuffer.wrap(waitingOnBytes); - waitingOn = (localTxnId, deps) -> { - try - { - Invariants.nonNull(deps); - return WaitingOnSerializer.deserialize(localTxnId, deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer); - } - catch (IOException e) - { - throw Throwables.unchecked(e); - } - }; - break; - case WRITES: - writes = CommandSerializers.writes.deserialize(in, userVersion); - break; - case CLEANUP: - Cleanup newCleanup = Cleanup.forOrdinal(in.readByte()); - if (cleanup == null || newCleanup.compareTo(cleanup) > 0) - cleanup = newCleanup; - break; - case RESULT: - result = ResultSerializers.result.deserialize(in, userVersion); - break; - } - } - - private void skip(Fields field, DataInputPlus in, int userVersion) throws IOException - { - switch (field) - { - case EXECUTE_AT: - case EXECUTES_AT_LEAST: - CommandSerializers.timestamp.skip(in, userVersion); - break; - case SAVE_STATUS: - in.readShort(); - break; - case DURABILITY: - in.readByte(); - break; - case ACCEPTED: - case PROMISED: - CommandSerializers.ballot.skip(in, userVersion); - break; - case PARTICIPANTS: - // TODO (expected): skip - CommandSerializers.participants.deserialize(in, userVersion); - break; - case PARTIAL_TXN: - CommandSerializers.partialTxn.deserialize(in, userVersion); - break; - case PARTIAL_DEPS: - // TODO (expected): skip - DepsSerializers.partialDeps.deserialize(in, userVersion); - break; - case WAITING_ON: - int size = in.readInt(); - in.skipBytesFully(size); - break; - case WRITES: - // TODO (expected): skip - CommandSerializers.writes.deserialize(in, userVersion); - break; - case CLEANUP: - in.readByte(); - break; - case RESULT: - // TODO (expected): skip - result = ResultSerializers.result.deserialize(in, userVersion); - break; - } - } - - public void forceResult(Result newValue) - { - this.result = newValue; - } - - public Command construct() - { - if (!nextCalled) - return null; - - Invariants.checkState(txnId != null); - CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId); - if (partialTxn != null) - attrs.partialTxn(partialTxn); - if (durability != null) - attrs.durability(durability); - if (participants != null) - attrs.setParticipants(participants); - else - attrs.setParticipants(StoreParticipants.empty(txnId)); - if (partialDeps != null && - (saveStatus.known.deps != NoDeps && - saveStatus.known.deps != DepsErased && - saveStatus.known.deps != DepsUnknown)) - attrs.partialDeps(partialDeps); - - switch (saveStatus.known.outcome) - { - case Erased: - case WasApply: - writes = null; - result = null; - break; - } - - Command.WaitingOn waitingOn = null; - if (this.waitingOn != null) - waitingOn = this.waitingOn.provide(txnId, partialDeps); - - switch (saveStatus.status) - { - case NotDefined: - return saveStatus == SaveStatus.Uninitialised ? Command.NotDefined.uninitialised(attrs.txnId()) - : Command.NotDefined.notDefined(attrs, promised); - case PreAccepted: - return Command.PreAccepted.preAccepted(attrs, executeAt, promised); - case AcceptedInvalidate: - case Accepted: - case PreCommitted: - if (saveStatus == SaveStatus.AcceptedInvalidate) - return Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, acceptedOrCommitted); - else - return Command.Accepted.accepted(attrs, saveStatus, executeAt, promised, acceptedOrCommitted); - case Committed: - case Stable: - return Command.Committed.committed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn); - case PreApplied: - case Applied: - return Command.Executed.executed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, result); - case Truncated: - case Invalidated: - return truncated(attrs, saveStatus, executeAt, executeAtLeast, writes, result); - default: - throw new IllegalStateException(); - } - } - - private static Command.Truncated truncated(CommonAttributes.Mutable attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes writes, Result result) - { - switch (status) - { - default: - throw illegalState("Unhandled SaveStatus: " + status); - case TruncatedApplyWithOutcome: - case TruncatedApplyWithDeps: - case TruncatedApply: - if (status != TruncatedApplyWithOutcome) - result = null; - if (attrs.txnId().kind().awaitsOnlyDeps()) - return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, executesAtLeast); - return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, null); - case ErasedOrVestigial: - return Command.Truncated.erasedOrVestigial(attrs.txnId(), attrs.participants()); - case Erased: - return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.participants()); - case Invalidated: - return Command.Truncated.invalidated(attrs.txnId()); - } - } - - public String toString() - { - return "Diff {" + - "txnId=" + txnId + - ", executeAt=" + executeAt + - ", saveStatus=" + saveStatus + - ", durability=" + durability + - ", acceptedOrCommitted=" + acceptedOrCommitted + - ", promised=" + promised + - ", participants=" + participants + - ", partialTxn=" + partialTxn + - ", partialDeps=" + partialDeps + - ", waitingOn=" + waitingOn + - ", writes=" + writes + - '}'; - } - } - - public interface WaitingOnProvider - { - Command.WaitingOn provide(TxnId txnId, PartialDeps deps); - } -} \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index 6ad2f09d2a..2a99a350b5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -186,9 +186,6 @@ public class AccordLoadTest extends AccordTestBase System.out.println("compacting accord..."); cluster.forEach(i -> { i.nodetool("compact", "system_accord.journal"); - i.runOnInstance(() -> { - ((AccordService) AccordService.instance()).journal().checkAllCommands(); - }); }); } @@ -198,7 +195,6 @@ public class AccordLoadTest extends AccordTestBase System.out.println("flushing journal..."); cluster.forEach(i -> i.runOnInstance(() -> { ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty(); - ((AccordService) AccordService.instance()).journal().checkAllCommands(); })); } diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index 25f9c61d38..9e41e0add9 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -136,7 +136,7 @@ public class AccordJournalBurnTest extends BurnTestBase operations, 10 + random.nextInt(30), new RandomDelayQueue.Factory(random).get(), - (node) -> { + (node, agent) -> { try { File directory = new File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet()))); @@ -175,6 +175,4 @@ public class AccordJournalBurnTest extends BurnTestBase throw SimulationException.wrap(seed, t); } } - - } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java index 34bb215c0a..3c655133fa 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java @@ -89,7 +89,7 @@ public class AccordJournalOrderTest Runnable check = () -> { for (JournalKey key : res.keySet()) { - SavedCommand.Builder diffs = accordJournal.loadDiffs(key.commandStoreId, key.id); + AccordJournal.Builder diffs = accordJournal.load(key.commandStoreId, key.id); Assert.assertEquals(String.format("%d != %d for key %s", diffs.count(), res.get(key).intValue(), key), diffs.count(), res.get(key).intValue()); } diff --git a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java similarity index 87% rename from test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java rename to test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java index 1760286ba3..2878c5750a 100644 --- a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java @@ -26,7 +26,9 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import accord.impl.CommandChange; import accord.local.Command; +import accord.local.RedundantBefore; import accord.primitives.SaveStatus; import accord.primitives.TxnId; import accord.utils.Gen; @@ -39,17 +41,17 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.accord.SavedCommand.Fields; -import org.apache.cassandra.service.accord.SavedCommand.Load; import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.utils.AccordGenerators; import org.assertj.core.api.SoftAssertions; +import static accord.api.Journal.*; +import static accord.impl.CommandChange.*; +import static accord.impl.CommandChange.getFlags; import static accord.utils.Property.qt; import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; -import static org.apache.cassandra.service.accord.SavedCommand.getFlags; -public class SavedCommandTest +public class CommandChangeTest { private static final EnumSet<Fields> ALL = EnumSet.allOf(Fields.class); @@ -97,13 +99,14 @@ public class SavedCommandTest if (saveStatus == SaveStatus.TruncatedApplyWithDeps) continue; out.clear(); Command orig = cmdBuilder.build(saveStatus); - SavedCommand.serialize(orig, getFlags(null, orig), out, userVersion); - SavedCommand.Builder builder = new SavedCommand.Builder(orig.txnId(), Load.ALL); + + AccordJournal.Writer.make(null, orig).write(out, userVersion); + AccordJournal.Builder builder = new AccordJournal.Builder(orig.txnId(), Load.ALL); builder.deserializeNext(new DataInputBuffer(out.unsafeGetBufferAndFlip(), false), userVersion); // We are not persisting the result, so force it for strict equality builder.forceResult(orig.result()); - Command reconstructed = builder.construct(); + Command reconstructed = builder.construct(RedundantBefore.EMPTY); checks.assertThat(reconstructed) .describedAs("lhs=expected\nrhs=actual\n%s", new LazyToString(() -> ReflectionUtils.recursiveEquals(orig, reconstructed).toString())) @@ -119,10 +122,10 @@ public class SavedCommandTest SoftAssertions checks = new SoftAssertions(); for (Fields field : missing) { - checks.assertThat(SavedCommand.getFieldChanged(field, flags)) + checks.assertThat(CommandChange.getFieldChanged(field, flags)) .describedAs("field %s changed", field). isTrue(); - checks.assertThat(SavedCommand.getFieldIsNull(field, flags)) + checks.assertThat(CommandChange.getFieldIsNull(field, flags)) .describedAs("field %s not null", field) .isFalse(); } @@ -135,11 +138,11 @@ public class SavedCommandTest for (Fields field : missing) { if (field == Fields.CLEANUP) continue; - checks.assertThat(SavedCommand.getFieldChanged(field, flags)) + checks.assertThat(CommandChange.getFieldChanged(field, flags)) .describedAs("field %s changed", field) .isFalse(); // Is null flag can not be set on a field that has not changed - checks.assertThat(SavedCommand.getFieldIsNull(field, flags)) + checks.assertThat(CommandChange.getFieldIsNull(field, flags)) .describedAs("field %s not null", field) .isFalse(); } diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index c376c46fbf..65b315a7e2 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -30,6 +30,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.ToLongFunction; +import accord.api.Journal; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RemoteListeners; @@ -39,6 +40,7 @@ import accord.impl.DefaultLocalListeners; import accord.impl.DefaultTimeouts; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TestAgent; +import accord.impl.basic.InMemoryJournal; import accord.impl.basic.SimulatedFault; import accord.local.Command; import accord.local.CommandStore; @@ -68,7 +70,6 @@ import accord.primitives.Unseekables; import accord.topology.Topologies; import accord.topology.Topology; import accord.utils.Gens; -import accord.utils.PersistentField; import accord.utils.RandomSource; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; @@ -105,7 +106,7 @@ public class SimulatedAccordCommandStore implements AutoCloseable public final Node.Id nodeId; public final Topology topology; public final Topologies topologies; - public final IJournal journal; + public final Journal journal; public final ScheduledExecutorPlus unorderedScheduled; public final List<String> evictions = new ArrayList<>(); public Predicate<Throwable> ignoreExceptions = ignore -> false; @@ -185,7 +186,6 @@ public class SimulatedAccordCommandStore implements AutoCloseable } }; - this.journal = new InMemoryJournal(nodeId); TestAgent.RethrowAgent agent = new TestAgent.RethrowAgent() { @Override @@ -201,6 +201,8 @@ public class SimulatedAccordCommandStore implements AutoCloseable super.onUncaughtException(t); } }; + + this.journal = new InMemoryJournal(nodeId, agent); this.commandStore = new AccordCommandStore(0, storeService, agent, @@ -246,19 +248,6 @@ public class SimulatedAccordCommandStore implements AutoCloseable }); } - private final class InMemoryJournal extends accord.impl.basic.InMemoryJournal implements IJournal - { - public InMemoryJournal(Node.Id id) - { - super(id); - } - - public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister() - { - throw new IllegalArgumentException("Not implemented"); - } - } - private <K, V> void updateLoadFunction(AccordCache.Type<K, V, ?> i, FunctionWrapper wrapper) { i.unsafeSetLoadFunction(wrapper.wrap(i.unsafeGetLoadFunction())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org