This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 10671745a2 CEP-15 (Accord) Original and recover coordinators may hit a 
race condition with PreApply where reads and writes are interleaved, causing 
one of the coordinators to see the writes from the other
10671745a2 is described below

commit 10671745a254b0a7acf50310d7504896c9f2c584
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri Apr 7 15:39:42 2023 -0700

    CEP-15 (Accord) Original and recover coordinators may hit a race condition 
with PreApply where reads and writes are interleaved, causing one of the 
coordinators to see the writes from the other
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18422
---
 modules/accord                                     |   2 +-
 .../service/accord/AccordCommandStore.java         |   8 +
 .../simulator/paxos/HistoryValidatorTest.java      | 225 +++++++++++++++------
 .../service/accord/async/AsyncOperationTest.java   |  13 +-
 4 files changed, 181 insertions(+), 67 deletions(-)

diff --git a/modules/accord b/modules/accord
index bc81f81c75..08aaab6e33 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit bc81f81c75f93c73989a30bbc51b5c241a893c1a
+Subproject commit 08aaab6e33d43406e0649146144e4df67648602a
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 5c9f3e4e9d..70962298f4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord;
 
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import accord.primitives.RoutableKey;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.service.accord.async.AsyncOperation;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
@@ -187,6 +189,12 @@ public class AccordCommandStore implements CommandStore
         return AsyncOperation.create(this, loadCtx, function);
     }
 
+    @Override
+    public <T> AsyncChain<T> submit(Callable<T> task)
+    {
+        return AsyncChains.ofCallable(executor, task);
+    }
+
     public DataStore dataStore()
     {
         return dataStore;
diff --git a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
index 6c773fcca8..c9cff2891f 100644
--- a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
+++ b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
@@ -43,6 +43,7 @@ import com.carrotsearch.hppc.IntHashSet;
 import com.carrotsearch.hppc.IntIntHashMap;
 import com.carrotsearch.hppc.IntIntMap;
 import com.carrotsearch.hppc.IntSet;
+import com.carrotsearch.hppc.cursors.IntCursor;
 import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.utils.Clock;
 import org.assertj.core.api.AbstractThrowableAssert;
@@ -281,6 +282,32 @@ public class HistoryValidatorTest
         );
     }
 
+    private static String trim(String log, int... keys)
+    {
+        // this is dead code, but exists to help when new validation errors are detected
+        // the logic will shrink the history to only contain transactions that contain the set of keys
+        IntSet set = new IntHashSet();
+        IntStream.of(keys).forEach(set::add);
+        Parsed parsed = parse(log);
+        StringBuilder sb = new StringBuilder();
+        for (Witness w : parsed.witnesses)
+        {
+            boolean match = false;
+            for (IntCursor pk : w.pks())
+            {
+                if (set.contains(pk.value))
+                {
+                    match = true;
+                    break;
+                }
+            }
+            if (!match) continue;
+            sb.append(w).append("\n");
+        }
+        return sb.toString();
+    }
+
+
     private void requiresMultiKeySupport()
     {
         Assume.assumeTrue("Validator " + factory.getClass() + " does not 
support multi-key", factory instanceof StrictSerializabilityValidator.Factory);
@@ -356,79 +383,146 @@ public class HistoryValidatorTest
         return new Event(EnumSet.of(Event.Type.WRITE), pk, null);
     }
 
-    private void fromLog(String log)
+    private interface Operation
     {
-        IntSet pks = new IntHashSet();
-        class Read
+        int pk();
+        void check(HistoryValidator.Checker check);
+        void appendString(StringBuilder sb);
+    }
+
+    private static class Read implements Operation
+    {
+        final int pk, id, count;
+        final int[] seq;
+
+        Read(int pk, int id, int count, int[] seq)
         {
-            final int pk, id, count;
-            final int[] seq;
+            this.pk = pk;
+            this.id = id;
+            this.count = count;
+            this.seq = seq;
+        }
 
-            Read(int pk, int id, int count, int[] seq)
-            {
-                this.pk = pk;
-                this.id = id;
-                this.count = count;
-                this.seq = seq;
-            }
+        @Override
+        public int pk()
+        {
+            return pk;
         }
-        class Write
+
+        @Override
+        public void check(HistoryValidator.Checker check)
         {
-            final int pk, id;
-            final boolean success;
+            check.read(pk, id, count, seq);
+        }
 
-            Write(int pk, int id, boolean success)
-            {
-                this.pk = pk;
-                this.id = id;
-                this.success = success;
-            }
+        @Override
+        public void appendString(StringBuilder sb)
+        {
+            sb.append("read(pk=").append(pk).append(", 
id=").append(id).append(", count=").append(count).append(", 
seq=").append(Arrays.toString(seq)).append(")\n");
         }
-        class Witness
+    }
+
+    private static class Write implements Operation
+    {
+        final int pk, id;
+        final boolean success;
+
+        Write(int pk, int id, boolean success)
         {
-            final int start, end;
-            final List<Object> actions = new ArrayList<>();
+            this.pk = pk;
+            this.id = id;
+            this.success = success;
+        }
 
-            Witness(int start, int end)
-            {
-                this.start = start;
-                this.end = end;
-            }
+        @Override
+        public int pk()
+        {
+            return pk;
+        }
 
-            void read(int pk, int id, int count, int[] seq)
-            {
-                actions.add(new Read(pk, id, count, seq));
-            }
+        @Override
+        public void check(HistoryValidator.Checker check)
+        {
+            check.write(pk, id, success);
+        }
 
-            void write(int pk, int id, boolean success)
-            {
-                actions.add(new Write(pk, id, success));
-            }
+        @Override
+        public void appendString(StringBuilder sb)
+        {
+            sb.append("write(pk=").append(pk).append(", 
id=").append(id).append(", success=").append(success).append(")\n");
+        }
+    }
+
+    private static class Witness
+    {
+        final int start, end;
+        final List<Operation> actions = new ArrayList<>();
+
+        Witness(int start, int end)
+        {
+            this.start = start;
+            this.end = end;
+        }
+
+        void read(int pk, int id, int count, int[] seq)
+        {
+            actions.add(new Read(pk, id, count, seq));
+        }
 
-            void process(HistoryValidator validator)
+        void write(int pk, int id, boolean success)
+        {
+            actions.add(new Write(pk, id, success));
+        }
+
+        void process(HistoryValidator validator)
+        {
+            try (HistoryValidator.Checker check = validator.witness(start, 
end))
             {
-                try (HistoryValidator.Checker check = validator.witness(start, 
end))
-                {
-                    for (Object a : actions)
-                    {
-                        if (a instanceof Read)
-                        {
-                            Read read = (Read) a;
-                            check.read(read.pk, read.id, read.count, read.seq);
-                        }
-                        else
-                        {
-                            Write write = (Write) a;
-                            check.write(write.pk, write.id, write.success);
-                        }
-                    }
-                }
+                for (Operation a : actions)
+                    a.check(check);
             }
         }
+
+        IntSet pks()
+        {
+            IntSet pks = new IntHashSet();
+            for (Operation action : actions)
+                pks.add(action.pk());
+            return pks;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Witness(start=").append(start).append(", 
end=").append(end).append(")\n");
+            for (Operation a : actions)
+                a.appendString(sb.append('\t'));
+            return sb.toString();
+        }
+    }
+
+    private static class Parsed
+    {
+        private final int[] keys;
+        private final List<Witness> witnesses;
+
+        private Parsed(int[] keys, List<Witness> witnesses)
+        {
+            this.keys = keys;
+            this.witnesses = witnesses;
+        }
+    }
+
+    private static Parsed parse(String log)
+    {
+        IntSet pks = new IntHashSet();
         List<Witness> witnesses = new ArrayList<>();
         Witness current = null;
         for (String line : log.split("\n"))
         {
+            if (line.trim().isEmpty())
+                continue;
             if (line.startsWith("Witness"))
             {
                 if (current != null)
@@ -468,9 +562,26 @@ public class HistoryValidatorTest
             witnesses.add(current);
         int[] keys = pks.toArray();
         Arrays.sort(keys);
-        HistoryValidator validator = factory.create(keys);
-        for (Witness w : witnesses)
-            w.process(validator);
+        return new Parsed(keys, witnesses);
+    }
+
+    private void fromLog(String log)
+    {
+        Parsed parsed = parse(log);
+        HistoryValidator validator = factory.create(parsed.keys);
+        for (Witness w : parsed.witnesses)
+        {
+            try
+            {
+                w.process(validator);
+            }
+            catch (HistoryViolation e)
+            {
+                HistoryViolation hv = new HistoryViolation(e.primaryKey, 
"Violation detected for witnessed action " + w + "; " + e.getMessage() + ";\n" 
+ log);
+                hv.setStackTrace(e.getStackTrace());
+                throw hv;
+            }
+        }
     }
 
     private static class Event
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index b76793aaf7..638b9cbede 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.RoutingKey;
 import accord.impl.SafeCommandsForKey;
+import accord.local.CheckedCommands;
 import accord.local.Command;
-import accord.local.Commands;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
@@ -194,14 +194,9 @@ public class AsyncOperationTest
         try
         {
             return 
getUninterruptibly(commandStore.submit(PreLoadContext.contextFor(Collections.singleton(txnId),
 partialTxn.keys()), safe -> {
-                Commands.AcceptOutcome result = Commands.preaccept(safe, 
txnId, partialTxn, route, null);
-                if (result != Commands.AcceptOutcome.Success) throw new 
IllegalStateException("Command mutation rejected: " + result);
-
-                result = Commands.accept(safe, txnId, Ballot.ZERO, 
partialRoute, partialTxn.keys(), null, executeAt, deps);
-                if (result != Commands.AcceptOutcome.Success) throw new 
IllegalStateException("Command mutation rejected: " + result);
-
-                Commands.CommitOutcome commit = Commands.commit(safe, txnId, 
route, null, partialTxn, executeAt, deps);
-                if (commit != Commands.CommitOutcome.Success) throw new 
IllegalStateException("Command mutation rejected: " + result);
+                CheckedCommands.preaccept(safe, txnId, partialTxn, route, 
null);
+                CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, 
partialTxn.keys(), null, executeAt, deps);
+                CheckedCommands.commit(safe, txnId, route, null, partialTxn, 
executeAt, deps);
 
                 // clear cache
                 long cacheSize = commandStore.getCacheSize();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to