This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new df492dfd - add shareable APPLIED and INVALIDATED implementations of Result - API changes to support splicing in complete update fragments from PartialTxn as mutations are finally being applied df492dfd is described below commit df492dfd2ffe993c33761d0531ac5b979b80f080 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Thu Mar 23 14:47:54 2023 -0500 - add shareable APPLIED and INVALIDATED implementations of Result - API changes to support splicing in complete update fragments from PartialTxn as mutations are finally being applied patch by Caleb Rackliffe; reviewed by David Capwell, Benedict Elliot Smith, and Ariel Weisberg for CASSANDRA-18355 --- accord-core/src/main/java/accord/api/Result.java | 10 ++++++++-- accord-core/src/main/java/accord/api/Write.java | 3 ++- accord-core/src/main/java/accord/local/Command.java | 2 +- accord-core/src/main/java/accord/local/Commands.java | 2 +- accord-core/src/main/java/accord/primitives/Writes.java | 4 ++-- accord-core/src/test/java/accord/impl/list/ListWrite.java | 3 ++- accord-core/src/test/java/accord/impl/mock/MockStore.java | 2 +- accord-core/src/test/java/accord/messages/ReadDataTest.java | 2 +- .../src/main/java/accord/maelstrom/MaelstromWrite.java | 3 ++- 9 files changed, 20 insertions(+), 11 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Result.java b/accord-core/src/main/java/accord/api/Result.java index d8b8fd6d..0509c476 100644 --- a/accord-core/src/main/java/accord/api/Result.java +++ b/accord-core/src/main/java/accord/api/Result.java @@ -23,11 +23,17 @@ import accord.primitives.ProgressToken; /** * A result to be returned to a client, or be stored in a node's command state. - * - * TODO (expected, efficiency): support minimizing the result for storage in a node's command state (e.g. 
to only retain success/failure) */ public interface Result extends Outcome { + Result APPLIED = new Result() { }; + + Result INVALIDATED = new Result() + { + @Override + public ProgressToken asProgressToken() { return ProgressToken.INVALIDATED; } + }; + @Override default ProgressToken asProgressToken() { return ProgressToken.APPLIED; } } diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java index ebe25903..62635379 100644 --- a/accord-core/src/main/java/accord/api/Write.java +++ b/accord-core/src/main/java/accord/api/Write.java @@ -19,6 +19,7 @@ package accord.api; import accord.local.SafeCommandStore; +import accord.primitives.PartialTxn; import accord.primitives.Seekable; import accord.primitives.Timestamp; import accord.utils.async.AsyncChain; @@ -30,5 +31,5 @@ import accord.utils.async.AsyncChain; */ public interface Write { - AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store); + AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store, PartialTxn txn); } diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index 47f9caba..194fe2da 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -650,7 +650,7 @@ public abstract class Command implements CommonAttributes public static Truncated invalidated(TxnId txnId, Listeners.Immutable durableListeners) { - return new Truncated(txnId, SaveStatus.Invalidated, DurableOrInvalidated, null, Timestamp.NONE, durableListeners, null, null); + return new Truncated(txnId, SaveStatus.Invalidated, DurableOrInvalidated, null, Timestamp.NONE, durableListeners, null, Result.INVALIDATED); } @Override diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 38d3f18b..52c37a5f 100644 
--- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -582,7 +582,7 @@ public class Commands // that was pre-bootstrap for some range (so redundant and we may have gone ahead of), but had to be executed locally // for another range CommandStore unsafeStore = safeStore.commandStore(); - return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt())) + return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn()) .flatMap(unused -> unsafeStore.submit(context, ss -> { postApply(ss, txnId); return null; diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java index 6cc08f8f..fb908abe 100644 --- a/accord-core/src/main/java/accord/primitives/Writes.java +++ b/accord-core/src/main/java/accord/primitives/Writes.java @@ -63,7 +63,7 @@ public class Writes return Objects.hash(executeAt, keys, write); } - public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges) + public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges, PartialTxn txn) { if (write == null) return SUCCESS; @@ -72,7 +72,7 @@ public class Writes return SUCCESS; List<AsyncChain<Void>> futures = Routables.foldl(keys, ranges, (key, accumulate, index) -> { - accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore())); + accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore(), txn)); return accumulate; }, new ArrayList<>()); return AsyncChains.reduce(futures, (l, r) -> null); diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java index 44e0a963..e206aec5 100644 --- a/accord-core/src/test/java/accord/impl/list/ListWrite.java +++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java @@ -23,6 +23,7 @@ import java.util.TreeMap; import java.util.function.Function; import 
java.util.stream.Collectors; +import accord.primitives.PartialTxn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write } @Override - public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) + public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store, PartialTxn txn) { ListStore s = (ListStore) store; if (!containsKey(key)) diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java index db8b0219..8fa17d7e 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockStore.java +++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java @@ -44,7 +44,7 @@ public class MockStore implements DataStore public static final Result RESULT = new Result() {}; public static final Query QUERY = (txnId, executeAt, data, read, update) -> RESULT; - public static final Write WRITE = (key, commandStore, executeAt, store) -> Writes.SUCCESS; + public static final Write WRITE = (key, commandStore, executeAt, store, command) -> Writes.SUCCESS; public static Read read(Seekables<?, ?> keys) { diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java index fdb40fc6..27c3e9bf 100644 --- a/accord-core/src/test/java/accord/messages/ReadDataTest.java +++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java @@ -286,7 +286,7 @@ class ReadDataTest { AsyncResults.SettableResult<Void> writeResult = new AsyncResults.SettableResult<>(); Write write = Mockito.mock(Write.class); - Mockito.when(write.apply(any(), any(), any(), any())).thenReturn(writeResult); + Mockito.when(write.apply(any(), any(), any(), any(), any())).thenReturn(writeResult); Writes writes = new Writes(txnId, executeAt, keys, write); forEach(store -> 
check(store.execute(PreLoadContext.contextFor(txnId, keys), safe -> { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java index 55a14f21..eeeeb926 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java @@ -22,6 +22,7 @@ import accord.api.Key; import accord.api.DataStore; import accord.api.Write; import accord.local.SafeCommandStore; +import accord.primitives.PartialTxn; import accord.primitives.Seekable; import accord.primitives.Timestamp; import accord.primitives.Writes; @@ -33,7 +34,7 @@ import java.util.TreeMap; public class MaelstromWrite extends TreeMap<Key, Value> implements Write { @Override - public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) + public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store, PartialTxn txn) { MaelstromStore s = (MaelstromStore) store; if (containsKey(key)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org