This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 527094c6 Move burn test read timestamp validation from replica to coordination 527094c6 is described below commit 527094c69ad23319b058917b1c8974d01c5d86e6 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Fri Jun 14 10:24:43 2024 -0700 Move burn test read timestamp validation from replica to coordination Patch by Blake Eggleston; reviewed by David Capwell for CASSANDRA-19288 --- .../src/main/java/accord/messages/ReadData.java | 10 +++-- accord-core/src/test/java/accord/Utils.java | 2 +- .../src/test/java/accord/burn/BurnTest.java | 4 +- .../src/test/java/accord/impl/list/ListAgent.java | 2 +- .../accord/impl/list/ListFetchCoordinator.java | 2 +- .../src/test/java/accord/impl/list/ListQuery.java | 10 ++++- .../src/test/java/accord/impl/list/ListRead.java | 50 ++++++++++------------ .../src/test/java/accord/impl/list/ListWrite.java | 1 + 8 files changed, 44 insertions(+), 37 deletions(-) diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 37558ec6..2746b6d0 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -359,11 +359,13 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.CommitOrRea if (safeCommand != null) safeCommand.removeListener(this); waitingOn.clear(safeStore.commandStore().id()); } + // TODO (expected): efficient unsubscribe mechanism - node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> { - SafeCommand safeCommand = in.ifInitialised(txnId); - if (safeCommand != null) safeCommand.removeListener(this); - }, node.agent())); + if (waitingOn != null) + node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> { + SafeCommand safeCommand = in.ifInitialised(txnId); + if (safeCommand != null) safeCommand.removeListener(this); + }, node.agent())); state = State.OBSOLETE; waitingOn = null; reading = null; diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index 680b47d1..3cc082be 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -127,7 +127,7 @@ public class Utils for (Key k : keys) update.put(k, 1); ListRead read = new ListRead(Function.identity(), false, keys, keys); - ListQuery query = new ListQuery(client, keys.size()); + ListQuery query = new ListQuery(client, keys.size(), false); return new Txn.InMemory(keys, read, query, update); } diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index 447b80b0..70e6110e 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -149,7 +149,7 @@ public class BurnTest requestRanges.add(nextRange.apply(prefixes)); Ranges ranges = Ranges.of(requestRanges.toArray(EMPTY_RANGES)); ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, false, ranges, ranges); - ListQuery query = new ListQuery(client, finalCount); + ListQuery query = new ListQuery(client, finalCount, false); return new Txn.InMemory(ranges, read, query); }; } @@ -183,7 +183,7 @@ public class BurnTest if (isWrite) requestKeys.addAll(update.keySet()); ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, kind == EphemeralRead, readKeys, new Keys(requestKeys)); - ListQuery query = new ListQuery(client, finalCount); + ListQuery query = new ListQuery(client, finalCount, kind == EphemeralRead); return new Txn.InMemory(kind, new Keys(requestKeys), read, query, update); }; } diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 31cb5155..343be285 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -105,7 +105,7 @@ public class ListAgent implements Agent @Override public Txn emptyTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges) { - return new Txn.InMemory(kind, keysOrRanges, new ListRead(identity(), false, Keys.EMPTY, Keys.EMPTY), new ListQuery(NONE, Integer.MIN_VALUE), null); + return new Txn.InMemory(kind, keysOrRanges, new ListRead(identity(), false, Keys.EMPTY, Keys.EMPTY), new ListQuery(NONE, Integer.MIN_VALUE, false), null); } public boolean collectMaxApplied() diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java index 4e8b3e9a..9f7cc28b 100644 --- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -53,7 +53,7 @@ public class ListFetchCoordinator extends AbstractFetchCoordinator @Override protected PartialTxn rangeReadTxn(Ranges ranges) { - return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(Function.identity(), false, ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE), null); + return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(Function.identity(), false, ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE, false), null); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/test/java/accord/impl/list/ListQuery.java index 24488101..de886b99 100644 --- a/accord-core/src/test/java/accord/impl/list/ListQuery.java +++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java @@ -32,6 +32,7 @@ import accord.primitives.Keys; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.utils.Invariants; import accord.utils.Timestamped; import javax.annotation.Nonnull; @@ -39,11 +40,13 @@ public class ListQuery implements Query { final Id client; final long requestId; + final boolean isEphemeralRead; - public ListQuery(Id client, long requestId) + public ListQuery(Id client, long requestId, boolean isEphemeralRead) { this.client = client; this.requestId = requestId; + this.isEphemeralRead = isEphemeralRead; } @Override @@ -59,7 +62,12 @@ public class ListQuery implements Query { int i = responseKeys.indexOf(e.getKey()); if (i >= 0) + { + Timestamp timestamp = e.getValue().timestamp; + Invariants.checkState(isEphemeralRead || timestamp.compareTo(executeAt) < 0, + "Data timestamp %s >= execute at %s", timestamp, executeAt); values[i] = e.getValue().data; + } } return new ListResult(ListResult.Status.Applied, client, requestId, txnId, read.userReadKeys, responseKeys, values, (ListUpdate) update); } diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java index d6ce5f22..c9215e41 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRead.java +++ b/accord-core/src/test/java/accord/impl/list/ListRead.java @@ -25,7 +25,6 @@ import java.util.function.Function; import accord.local.SafeCommandStore; import accord.primitives.Ranges; import accord.primitives.Timestamp; -import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.Timestamped; import org.slf4j.Logger; @@ -67,33 +66,30 @@ public class ListRead implements Read @Override public AsyncChain<Data> read(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store) { - // read synchronously, logically taking a snapshot, so we can impose our invariant of not reading the future ListStore s = (ListStore)store; - Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt); - // TODO (now, correctness): move the read into the executor thread to match real impl - // There is a bug (link jira) where the stale read handle logic no longer detects and fails with the new assert below - // There is a comment early about running synchronously, but this isn't easy for different implementations so should likely - // be an optimization impl take rather than a foundational requirement... - ListData result = new ListData(); - switch (key.domain()) - { - default: throw new AssertionError(); - case Key: - if (!keys.contains((Key)key)) - throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys); - Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key); - logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data); - Invariants.checkState(isEphemeralRead || data.timestamp.compareTo(executeAt) < 0, - "Data timestamp %s >= execute at %s", data.timestamp, executeAt); - result.put((Key)key, data); - break; - case Range: - if (!keys.containsAll(Ranges.single((Range)key))) - throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys); - for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key)) - result.put(e.getKey(), e.getValue()); - } - return executor.apply(safeStore.commandStore()).submit(() -> result); + logger.trace("submitting READ on {} at {} key:{}", s.node, executeAt, key); + return executor.apply(safeStore.commandStore()).submit(() -> { + Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt); + ListData result = new ListData(); + switch (key.domain()) + { + default: throw new AssertionError(); + case Key: + if (!keys.contains((Key)key)) + throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys); + Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key); + logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data); + result.put((Key)key, data); + break; + case Range: + if (!keys.containsAll(Ranges.single((Range)key))) + throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys); + for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key)) + result.put(e.getKey(), e.getValue()); + } + return result; + }); + } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java index 1ff9e7ee..2edfa9fb 100644 --- a/accord-core/src/test/java/accord/impl/list/ListWrite.java +++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java @@ -58,6 +58,7 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write return Writes.SUCCESS; TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, ?>) safeStore, (Key) key, executeAt, true); + logger.trace("submitting WRITE on {} at {} key:{}", s.node, executeAt, key); return executor.apply(safeStore.commandStore()).submit(() -> { int[] data = get(key); s.data.merge((Key)key, new Timestamped<>(executeAt, data, Arrays::toString), ListStore::merge); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org