This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 10671745a2 CEP-15 (Accord) Original and recover coordinators may hit a race condition with PreApply where reads and writes are interleaved, causing one of the coordinators to see the writes from the other 10671745a2 is described below commit 10671745a254b0a7acf50310d7504896c9f2c584 Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri Apr 7 15:39:42 2023 -0700 CEP-15 (Accord) Original and recover coordinators may hit a race condition with PreApply where reads and writes are interleaved, causing one of the coordinators to see the writes from the other patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18422 --- modules/accord | 2 +- .../service/accord/AccordCommandStore.java | 8 + .../simulator/paxos/HistoryValidatorTest.java | 225 +++++++++++++++------ .../service/accord/async/AsyncOperationTest.java | 13 +- 4 files changed, 181 insertions(+), 67 deletions(-) diff --git a/modules/accord b/modules/accord index bc81f81c75..08aaab6e33 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit bc81f81c75f93c73989a30bbc51b5c241a893c1a +Subproject commit 08aaab6e33d43406e0649146144e4df67648602a diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 5c9f3e4e9d..70962298f4 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -42,6 +43,7 @@ import accord.primitives.RoutableKey; import accord.primitives.TxnId; import accord.utils.Invariants; import accord.utils.async.AsyncChain; +import 
accord.utils.async.AsyncChains; import org.apache.cassandra.service.accord.async.AsyncOperation; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; @@ -187,6 +189,12 @@ public class AccordCommandStore implements CommandStore return AsyncOperation.create(this, loadCtx, function); } + @Override + public <T> AsyncChain<T> submit(Callable<T> task) + { + return AsyncChains.ofCallable(executor, task); + } + public DataStore dataStore() { return dataStore; diff --git a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java index 6c773fcca8..c9cff2891f 100644 --- a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java +++ b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java @@ -43,6 +43,7 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntIntHashMap; import com.carrotsearch.hppc.IntIntMap; import com.carrotsearch.hppc.IntSet; +import com.carrotsearch.hppc.cursors.IntCursor; import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.utils.Clock; import org.assertj.core.api.AbstractThrowableAssert; @@ -281,6 +282,32 @@ public class HistoryValidatorTest ); } + private static String trim(String log, int... 
keys) + { + // this is dead code, but exists to help when new validation errors are detected + // the logic will shrink the history to only contain transactions that contain the set of keys + IntSet set = new IntHashSet(); + IntStream.of(keys).forEach(set::add); + Parsed parsed = parse(log); + StringBuilder sb = new StringBuilder(); + for (Witness w : parsed.witnesses) + { + boolean match = false; + for (IntCursor pk : w.pks()) + { + if (set.contains(pk.value)) + { + match = true; + break; + } + } + if (!match) continue; + sb.append(w).append("\n"); + } + return sb.toString(); + } + + private void requiresMultiKeySupport() { Assume.assumeTrue("Validator " + factory.getClass() + " does not support multi-key", factory instanceof StrictSerializabilityValidator.Factory); @@ -356,79 +383,146 @@ public class HistoryValidatorTest return new Event(EnumSet.of(Event.Type.WRITE), pk, null); } - private void fromLog(String log) + private interface Operation { - IntSet pks = new IntHashSet(); - class Read + int pk(); + void check(HistoryValidator.Checker check); + void appendString(StringBuilder sb); + } + + private static class Read implements Operation + { + final int pk, id, count; + final int[] seq; + + Read(int pk, int id, int count, int[] seq) { - final int pk, id, count; - final int[] seq; + this.pk = pk; + this.id = id; + this.count = count; + this.seq = seq; + } - Read(int pk, int id, int count, int[] seq) - { - this.pk = pk; - this.id = id; - this.count = count; - this.seq = seq; - } + @Override + public int pk() + { + return pk; } - class Write + + @Override + public void check(HistoryValidator.Checker check) { - final int pk, id; - final boolean success; + check.read(pk, id, count, seq); + } - Write(int pk, int id, boolean success) - { - this.pk = pk; - this.id = id; - this.success = success; - } + @Override + public void appendString(StringBuilder sb) + { + sb.append("read(pk=").append(pk).append(", id=").append(id).append(", count=").append(count).append(", 
seq=").append(Arrays.toString(seq)).append(")\n"); } - class Witness + } + + private static class Write implements Operation + { + final int pk, id; + final boolean success; + + Write(int pk, int id, boolean success) { - final int start, end; - final List<Object> actions = new ArrayList<>(); + this.pk = pk; + this.id = id; + this.success = success; + } - Witness(int start, int end) - { - this.start = start; - this.end = end; - } + @Override + public int pk() + { + return pk; + } - void read(int pk, int id, int count, int[] seq) - { - actions.add(new Read(pk, id, count, seq)); - } + @Override + public void check(HistoryValidator.Checker check) + { + check.write(pk, id, success); + } - void write(int pk, int id, boolean success) - { - actions.add(new Write(pk, id, success)); - } + @Override + public void appendString(StringBuilder sb) + { + sb.append("write(pk=").append(pk).append(", id=").append(id).append(", success=").append(success).append(")\n"); + } + } + + private static class Witness + { + final int start, end; + final List<Operation> actions = new ArrayList<>(); + + Witness(int start, int end) + { + this.start = start; + this.end = end; + } + + void read(int pk, int id, int count, int[] seq) + { + actions.add(new Read(pk, id, count, seq)); + } - void process(HistoryValidator validator) + void write(int pk, int id, boolean success) + { + actions.add(new Write(pk, id, success)); + } + + void process(HistoryValidator validator) + { + try (HistoryValidator.Checker check = validator.witness(start, end)) { - try (HistoryValidator.Checker check = validator.witness(start, end)) - { - for (Object a : actions) - { - if (a instanceof Read) - { - Read read = (Read) a; - check.read(read.pk, read.id, read.count, read.seq); - } - else - { - Write write = (Write) a; - check.write(write.pk, write.id, write.success); - } - } - } + for (Operation a : actions) + a.check(check); } } + + IntSet pks() + { + IntSet pks = new IntHashSet(); + for (Operation action : actions) + 
pks.add(action.pk()); + return pks; + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("Witness(start=").append(start).append(", end=").append(end).append(")\n"); + for (Operation a : actions) + a.appendString(sb.append('\t')); + return sb.toString(); + } + } + + private static class Parsed + { + private final int[] keys; + private final List<Witness> witnesses; + + private Parsed(int[] keys, List<Witness> witnesses) + { + this.keys = keys; + this.witnesses = witnesses; + } + } + + private static Parsed parse(String log) + { + IntSet pks = new IntHashSet(); List<Witness> witnesses = new ArrayList<>(); Witness current = null; for (String line : log.split("\n")) { + if (line.trim().isEmpty()) + continue; if (line.startsWith("Witness")) { if (current != null) @@ -468,9 +562,26 @@ public class HistoryValidatorTest witnesses.add(current); int[] keys = pks.toArray(); Arrays.sort(keys); - HistoryValidator validator = factory.create(keys); - for (Witness w : witnesses) - w.process(validator); + return new Parsed(keys, witnesses); + } + + private void fromLog(String log) + { + Parsed parsed = parse(log); + HistoryValidator validator = factory.create(parsed.keys); + for (Witness w : parsed.witnesses) + { + try + { + w.process(validator); + } + catch (HistoryViolation e) + { + HistoryViolation hv = new HistoryViolation(e.primaryKey, "Violation detected for witnessed action " + w + "; " + e.getMessage() + ";\n" + log); + hv.setStackTrace(e.getStackTrace()); + throw hv; + } + } } private static class Event diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java index b76793aaf7..638b9cbede 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java @@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory; import 
accord.api.RoutingKey; import accord.impl.SafeCommandsForKey; +import accord.local.CheckedCommands; import accord.local.Command; -import accord.local.Commands; import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.SafeCommandStore; @@ -194,14 +194,9 @@ public class AsyncOperationTest try { return getUninterruptibly(commandStore.submit(PreLoadContext.contextFor(Collections.singleton(txnId), partialTxn.keys()), safe -> { - Commands.AcceptOutcome result = Commands.preaccept(safe, txnId, partialTxn, route, null); - if (result != Commands.AcceptOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + result); - - result = Commands.accept(safe, txnId, Ballot.ZERO, partialRoute, partialTxn.keys(), null, executeAt, deps); - if (result != Commands.AcceptOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + result); - - Commands.CommitOutcome commit = Commands.commit(safe, txnId, route, null, partialTxn, executeAt, deps); - if (commit != Commands.CommitOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + result); + CheckedCommands.preaccept(safe, txnId, partialTxn, route, null); + CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, partialTxn.keys(), null, executeAt, deps); + CheckedCommands.commit(safe, txnId, route, null, partialTxn, executeAt, deps); // clear cache long cacheSize = commandStore.getCacheSize(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org