This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 527094c6 Move burn test read timestamp validation from replica to coordination
527094c6 is described below

commit 527094c69ad23319b058917b1c8974d01c5d86e6
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Fri Jun 14 10:24:43 2024 -0700

    Move burn test read timestamp validation from replica to coordination
    
    Patch by Blake Eggleston; reviewed by David Capwell for CASSANDRA-19288
---
 .../src/main/java/accord/messages/ReadData.java    | 10 +++--
 accord-core/src/test/java/accord/Utils.java        |  2 +-
 .../src/test/java/accord/burn/BurnTest.java        |  4 +-
 .../src/test/java/accord/impl/list/ListAgent.java  |  2 +-
 .../accord/impl/list/ListFetchCoordinator.java     |  2 +-
 .../src/test/java/accord/impl/list/ListQuery.java  | 10 ++++-
 .../src/test/java/accord/impl/list/ListRead.java   | 50 ++++++++++------------
 .../src/test/java/accord/impl/list/ListWrite.java  |  1 +
 8 files changed, 44 insertions(+), 37 deletions(-)

diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index 37558ec6..2746b6d0 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -359,11 +359,13 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
             if (safeCommand != null) safeCommand.removeListener(this);
             waitingOn.clear(safeStore.commandStore().id());
         }
+
         // TODO (expected): efficient unsubscribe mechanism
-        node.commandStores().mapReduceConsume(this, waitingOn.stream(), 
forEach(in -> {
-            SafeCommand safeCommand = in.ifInitialised(txnId);
-            if (safeCommand != null) safeCommand.removeListener(this);
-        }, node.agent()));
+        if (waitingOn != null)
+            node.commandStores().mapReduceConsume(this, waitingOn.stream(), 
forEach(in -> {
+                SafeCommand safeCommand = in.ifInitialised(txnId);
+                if (safeCommand != null) safeCommand.removeListener(this);
+            }, node.agent()));
         state = State.OBSOLETE;
         waitingOn = null;
         reading = null;
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 680b47d1..3cc082be 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -127,7 +127,7 @@ public class Utils
         for (Key k : keys)
             update.put(k, 1);
         ListRead read = new ListRead(Function.identity(), false, keys, keys);
-        ListQuery query = new ListQuery(client, keys.size());
+        ListQuery query = new ListQuery(client, keys.size(), false);
         return new Txn.InMemory(keys, read, query, update);
     }
 
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 447b80b0..70e6110e 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -149,7 +149,7 @@ public class BurnTest
                         requestRanges.add(nextRange.apply(prefixes));
                     Ranges ranges = 
Ranges.of(requestRanges.toArray(EMPTY_RANGES));
                     ListRead read = new 
ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, 
false, ranges, ranges);
-                    ListQuery query = new ListQuery(client, finalCount);
+                    ListQuery query = new ListQuery(client, finalCount, false);
                     return new Txn.InMemory(ranges, read, query);
                 };
             }
@@ -183,7 +183,7 @@ public class BurnTest
                     if (isWrite)
                         requestKeys.addAll(update.keySet());
                     ListRead read = new 
ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, 
kind == EphemeralRead, readKeys, new Keys(requestKeys));
-                    ListQuery query = new ListQuery(client, finalCount);
+                    ListQuery query = new ListQuery(client, finalCount, kind 
== EphemeralRead);
                     return new Txn.InMemory(kind, new Keys(requestKeys), read, 
query, update);
                 };
             }
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java 
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 31cb5155..343be285 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -105,7 +105,7 @@ public class ListAgent implements Agent
     @Override
     public Txn emptyTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges)
     {
-        return new Txn.InMemory(kind, keysOrRanges, new ListRead(identity(), 
false, Keys.EMPTY, Keys.EMPTY), new ListQuery(NONE, Integer.MIN_VALUE), null);
+        return new Txn.InMemory(kind, keysOrRanges, new ListRead(identity(), 
false, Keys.EMPTY, Keys.EMPTY), new ListQuery(NONE, Integer.MIN_VALUE, false), 
null);
     }
 
     public boolean collectMaxApplied()
diff --git 
a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java 
b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index 4e8b3e9a..9f7cc28b 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -53,7 +53,7 @@ public class ListFetchCoordinator extends 
AbstractFetchCoordinator
     @Override
     protected PartialTxn rangeReadTxn(Ranges ranges)
     {
-        return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new 
ListRead(Function.identity(), false, ranges, ranges), new 
ListQuery(Node.Id.NONE, Long.MIN_VALUE), null);
+        return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new 
ListRead(Function.identity(), false, ranges, ranges), new 
ListQuery(Node.Id.NONE, Long.MIN_VALUE, false), null);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java 
b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index 24488101..de886b99 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -32,6 +32,7 @@ import accord.primitives.Keys;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 import accord.utils.Timestamped;
 import javax.annotation.Nonnull;
 
@@ -39,11 +40,13 @@ public class ListQuery implements Query
 {
     final Id client;
     final long requestId;
+    final boolean isEphemeralRead;
 
-    public ListQuery(Id client, long requestId)
+    public ListQuery(Id client, long requestId, boolean isEphemeralRead)
     {
         this.client = client;
         this.requestId = requestId;
+        this.isEphemeralRead = isEphemeralRead;
     }
 
     @Override
@@ -59,7 +62,12 @@ public class ListQuery implements Query
         {
             int i = responseKeys.indexOf(e.getKey());
             if (i >= 0)
+            {
+                Timestamp timestamp = e.getValue().timestamp;
+                Invariants.checkState(isEphemeralRead || 
timestamp.compareTo(executeAt) < 0,
+                                      "Data timestamp %s >= execute at %s", 
timestamp, executeAt);
                 values[i] = e.getValue().data;
+            }
         }
         return new ListResult(ListResult.Status.Applied, client, requestId, 
txnId, read.userReadKeys, responseKeys, values, (ListUpdate) update);
     }
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java 
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index d6ce5f22..c9215e41 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -25,7 +25,6 @@ import java.util.function.Function;
 import accord.local.SafeCommandStore;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
-import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.Timestamped;
 import org.slf4j.Logger;
@@ -67,33 +66,30 @@ public class ListRead implements Read
     @Override
     public AsyncChain<Data> read(Seekable key, SafeCommandStore safeStore, 
Timestamp executeAt, DataStore store)
     {
-        // read synchronously, logically taking a snapshot, so we can impose 
our invariant of not reading the future
         ListStore s = (ListStore)store;
-        Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
-        // TODO (now, correctness): move the read into the executor thread to 
match real impl
-        // There is a bug (link jira) where the stale read handle logic no 
longer detects and fails with the new assert below
-        // There is a comment early about running synchronously, but this 
isn't easy for different implementations so should likely
-        // be an optimization impl take rather than a foundational 
requirement...
-        ListData result = new ListData();
-        switch (key.domain())
-        {
-            default: throw new AssertionError();
-            case Key:
-                if (!keys.contains((Key)key))
-                    throw new IllegalArgumentException("Attempted to read key 
" + key + " which is outside of the expected range " + keys);
-                Timestamped<int[]> data = s.get(unavailable, executeAt, 
(Key)key);
-                logger.trace("READ on {} at {} key:{} -> {}", s.node, 
executeAt, key, data);
-                Invariants.checkState(isEphemeralRead || 
data.timestamp.compareTo(executeAt) < 0,
-                                      "Data timestamp %s >= execute at %s", 
data.timestamp, executeAt);
-                result.put((Key)key, data);
-                break;
-            case Range:
-                if (!keys.containsAll(Ranges.single((Range)key)))
-                    throw new IllegalArgumentException("Attempted to read 
range " + key + " which is outside of the expected range " + keys);
-                for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, 
executeAt, (Range)key))
-                    result.put(e.getKey(), e.getValue());
-        }
-        return executor.apply(safeStore.commandStore()).submit(() -> result);
+        logger.trace("submitting READ on {} at {} key:{}", s.node, executeAt, 
key);
+        return executor.apply(safeStore.commandStore()).submit(() -> {
+            Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
+            ListData result = new ListData();
+            switch (key.domain())
+            {
+                default: throw new AssertionError();
+                case Key:
+                    if (!keys.contains((Key)key))
+                        throw new IllegalArgumentException("Attempted to read 
key " + key + " which is outside of the expected range " + keys);
+                    Timestamped<int[]> data = s.get(unavailable, executeAt, 
(Key)key);
+                    logger.trace("READ on {} at {} key:{} -> {}", s.node, 
executeAt, key, data);
+                    result.put((Key)key, data);
+                    break;
+                case Range:
+                    if (!keys.containsAll(Ranges.single((Range)key)))
+                        throw new IllegalArgumentException("Attempted to read 
range " + key + " which is outside of the expected range " + keys);
+                    for (Map.Entry<Key, Timestamped<int[]>> e : 
s.get(unavailable, executeAt, (Range)key))
+                        result.put(e.getKey(), e.getValue());
+            }
+            return result;
+        });
+
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java 
b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 1ff9e7ee..2edfa9fb 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -58,6 +58,7 @@ public class ListWrite extends TreeMap<Key, int[]> implements 
Write
             return Writes.SUCCESS;
         
TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, 
?>) safeStore, (Key) key, executeAt, true);
 
+        logger.trace("submitting WRITE on {} at {} key:{}", s.node, executeAt, 
key);
         return executor.apply(safeStore.commandStore()).submit(() -> {
             int[] data = get(key);
             s.data.merge((Key)key, new Timestamped<>(executeAt, data, 
Arrays::toString), ListStore::merge);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to