This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 11ced982 Add Jepsen's Elle to Accord and Paxos validation (#67) 11ced982 is described below commit 11ced982a6e78810268677b4b6aeed90bc06e25b Author: dcapwell <dcapw...@apache.org> AuthorDate: Tue Nov 7 10:15:19 2023 -0800 Add Jepsen's Elle to Accord and Paxos validation (#67) patch by David Capwell, Jaroslaw Kijanowski; reviewed by Caleb Rackliffe for CASSANDRA-18874 --- accord-core/build.gradle | 10 + .../src/test/java/accord/burn/BurnTest.java | 42 ++- .../test/java/accord/verify/CompositeVerifier.java | 81 +++++ .../src/test/java/accord/verify/ElleVerifier.java | 381 +++++++++++++++++++++ .../test/java/accord/verify/ElleVerifierTest.java | 138 ++++++++ .../verify/StrictSerializabilityVerifier.java | 29 +- .../src/test/java/accord/verify/Verifier.java | 36 ++ 7 files changed, 701 insertions(+), 16 deletions(-) diff --git a/accord-core/build.gradle b/accord-core/build.gradle index df6203b9..db395aaf 100644 --- a/accord-core/build.gradle +++ b/accord-core/build.gradle @@ -36,6 +36,16 @@ dependencies { implementation 'org.agrona:agrona:1.17.1' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.24.2' + testImplementation 'org.clojure:clojure:1.11.1' + testImplementation 'elle:elle:0.1.7' + // for some reason this isn't pulled in properly? Have to be explicit + testImplementation('spootnik:unilog:0.7.31') { + exclude group: 'ch.qos.logback' + + exclude group: 'org.slf4j', module: 'slf4j-api' + exclude group: 'org.slf4j', module: 'log4j-over-slf4j' + exclude group: 'org.slf4j', module: 'jcl-over-slf4j' + } } task burn(type: JavaExec) { diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index 5fa84f75..b147efb7 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -42,6 +42,10 @@ import java.util.function.Supplier; import accord.burn.random.FrequentLargeRange; import accord.impl.MessageListener; +import accord.verify.CompositeVerifier; +import accord.verify.ElleVerifier; +import accord.verify.StrictSerializabilityVerifier; +import accord.verify.Verifier; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -73,7 +77,6 @@ import accord.primitives.Txn; import accord.utils.DefaultRandom; import accord.utils.RandomSource; import accord.utils.async.AsyncExecutor; -import accord.verify.StrictSerializabilityVerifier; import static accord.impl.IntHashKey.forHash; import static accord.utils.Utils.toArray; @@ -212,8 +215,9 @@ public class BurnTest .asLongSupplier(forked); }; - StrictSerializabilityVerifier strictSerializable = new StrictSerializabilityVerifier(keyCount); + Verifier verifier = createVerifier(keyCount); SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent); + Function<CommandStore, AsyncExecutor> executor = ignore -> globalExecutor; MessageListener listener = MessageListener.get(); @@ -280,23 +284,22 @@ public class BurnTest } acks.incrementAndGet(); - strictSerializable.begin(); - - for (int i = 0 ; i < reply.read.length ; ++i) + try (Verifier.Checker check = verifier.witness(start, end)) { - Key key = reply.responseKeys.get(i); - int k = key(key); + for (int i = 0 ; i < reply.read.length ; ++i) + { + Key key = reply.responseKeys.get(i); + int k = key(key); - int[] read = reply.read[i]; - int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1); + int[] read = reply.read[i]; + int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1); - if (read != null) - strictSerializable.witnessRead(k, read); - if (write >= 0) - strictSerializable.witnessWrite(k, write); + if (read != null) + check.read(k, read); + if (write >= 0) + check.write(k, write); + } } - - strictSerializable.apply(start, end); } catch (Throwable t) { @@ -313,6 +316,7 @@ public class BurnTest topologyFactory, initialRequests::poll, onSubmitted::set ); + verifier.close(); } catch (Throwable t) { @@ -343,6 +347,14 @@ public class BurnTest } } + private static Verifier createVerifier(int keyCount) + { + if (!ElleVerifier.Support.allowed()) + return new StrictSerializabilityVerifier(keyCount); + return CompositeVerifier.create(new StrictSerializabilityVerifier(keyCount), + new ElleVerifier()); + } + public static void main(String[] args) { int count = 1; diff --git a/accord-core/src/test/java/accord/verify/CompositeVerifier.java b/accord-core/src/test/java/accord/verify/CompositeVerifier.java new file mode 100644 index 00000000..09045ab5 --- /dev/null +++ b/accord-core/src/test/java/accord/verify/CompositeVerifier.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.verify; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CompositeVerifier implements Verifier +{ + private final List<Verifier> delegates; + + private CompositeVerifier(List<Verifier> delegates) + { + this.delegates = delegates; + } + + public static Verifier create(Verifier... verifiers) + { + return create(Arrays.asList(verifiers)); + } + + public static Verifier create(List<Verifier> verifiers) + { + switch (verifiers.size()) + { + case 0: throw new IllegalArgumentException("Unable to create Verifier from nothing"); + case 1: return verifiers.get(0); + default: return new CompositeVerifier(verifiers); + } + } + + @Override + public Checker witness(int start, int end) + { + List<Checker> sub = new ArrayList<>(delegates.size()); + delegates.forEach(v -> sub.add(v.witness(start, end))); + return new Checker() + { + @Override + public void read(int index, int[] seq) + { + sub.forEach(c -> c.read(index, seq)); + } + + @Override + public void write(int index, int value) + { + sub.forEach(c -> c.write(index, value)); + } + + @Override + public void close() + { + sub.forEach(Checker::close); + } + }; + } + + @Override + public void close() + { + delegates.forEach(Verifier::close); + } +} diff --git a/accord-core/src/test/java/accord/verify/ElleVerifier.java b/accord-core/src/test/java/accord/verify/ElleVerifier.java new file mode 100644 index 00000000..c4b228ec --- /dev/null +++ b/accord-core/src/test/java/accord/verify/ElleVerifier.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.verify; + +import clojure.java.api.Clojure; +import clojure.lang.ArraySeq; +import clojure.lang.IFn; +import clojure.lang.IMapEntry; +import clojure.lang.IPersistentCollection; +import clojure.lang.IPersistentMap; +import clojure.lang.ISeq; +import clojure.lang.IteratorSeq; +import clojure.lang.Keyword; +import clojure.lang.PersistentArrayMap; +import clojure.lang.PersistentVector; +import clojure.lang.RT; +import com.google.common.base.StandardSystemProperty; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.RandomAccess; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ElleVerifier implements Verifier +{ + public static class Support + { + public static boolean allowed() + { + // Elle only works on JDK 11 + int jdkVersion = Integer.parseInt(StandardSystemProperty.JAVA_VERSION.value().split("\\.")[0]); + return !(jdkVersion == 1 /* 1.8 */ || jdkVersion == 8); + } + } + + // In order to build the jepsen history, we need the full history... so must buffer everything + private final List<Event> events = new ArrayList<>(); + + @Override + public Checker witness(int start, int end) + { + List<Action> invoked = new ArrayList<>(); + List<Action> witnessed = new ArrayList<>(); + return new Checker() + { + @Override + public void read(int index, int[] seq) + { + invoked.add(new Read(index, null)); + witnessed.add(new Read(index, seq)); + } + + @Override + public void write(int index, int value) + { + Append e = new Append(index, value); + invoked.add(e); + witnessed.add(e); + } + + @Override + public void close() + { + // When a range read is performed, if the result was no matching keys then history isn't clear. + // Since StrictSerializabilityVerifier uses indexes and not pk values, it is not possible to find expected keys and putting empty result for them... + if (witnessed.isEmpty()) + return; + events.add(new Event(start, Event.Type.invoke, start, invoked)); + events.add(new Event(start, Event.Type.ok, end, witnessed)); + } + }; + } + + @Override + public void close() + { + if (events.isEmpty()) + throw new IllegalArgumentException("No events seen"); + // invoke and ok are mixed together in order, but there could be time gaps, so order based off time... + events.sort(Comparator.comparingLong(a -> a.time)); + + Object eventHistory = Clj.history.invoke(Event.toClojure(events)); + events.clear(); + PersistentArrayMap result = (PersistentArrayMap) Clj.check.invoke(Clj.elleListAppendOps, eventHistory); + Object isValid = result.get(Keys.valid); + if (isValid == Boolean.TRUE) + return; + if (isValid == Keys.unknown) + { + // Elle couldn't figure out if the history is bad or not... why? + Object anomalyTypes = result.get(Keys.anomalyTypes); + if (anomalyTypes != null) + { + ArraySeq seq = (ArraySeq) anomalyTypes; + if (!seq.isEmpty()) + { + for (Object type : seq) + { + if (type == Keys.emptyTransactionGraph) + continue; // nothing to see here + throw new AssertionError("Unexpected anomaly type detected: " + type); + } + return; // all good + } + } + } + throw new HistoryViolation(-1, "Violation detected: " + result); + } + + private static abstract class Action extends java.util.AbstractList<Object> implements RandomAccess + { + enum Type + { + append, r; + + final Keyword keyword; + + Type() + { + keyword = RT.keyword(null, name()); + } + } + private final Action.Type type; + private final int key; + private final Object value; + + protected Action(Action.Type type, int key, @Nullable Object value) + { + this.type = type; + this.key = key; + this.value = value; + } + + @Override + public Object get(int index) + { + switch (index) + { + case 0: + return type.keyword; + case 1: + return key; + case 2: + if (value != null) + return value; + default: + throw new IndexOutOfBoundsException(); + } + } + + @Override + public int size() + { + return value == null ? 2 : 3; + } + } + + private static class Read extends Action + { + protected Read(int key, int[] seq) + { + // TODO (optimization): rather than vector of boxed int, can we use the interfaces so we can stay primitive array? + super(Type.r, key, seq == null ? null : PersistentVector.create(IntStream.of(seq).boxed().collect(Collectors.toList()))); + } + } + + private static class Append extends Action + { + protected Append(int key, int value) + { + super(Type.append, key, value); + } + } + + private static class Event extends ObjectPersistentMap + { + enum Type + { + invoke, ok, fail; // info is left out as burn test does not have access to the original request, so can't populate an "invoke" event + + final Keyword keyword; + + Type() + { + keyword = RT.keyword(null, name()); + } + } + + private final int process; + private final Event.Type type; + private final List<Action> actions; + private final long time; + private long index = -1; + + private Event(int process, Type type, long time, List<Action> actions) + { + super(Keys.eventKeys); + this.process = process; + this.type = type; + this.actions = actions; + this.time = time; + } + + public static Object toClojure(List<Event> events) + { + return PersistentVector.create(events); + } + + @Override + public boolean containsKey(Object key) + { + if (key == Keys.index) + return index != -1; + return super.containsKey(key); + } + + @Override + public Object valAt(Object key, Object notFound) + { + if (key == Keys.process) return process; + else if (key == Keys.index) return index == -1 ? notFound : index; + else if (key == Keys.time) return time; + else if (key == Keys.type) return type.keyword; + else if (key == Keys.value) return actions; + return notFound; + } + + @Override + public IPersistentMap assoc(Object key, Object val) + { + if (key == Keys.index) + index = ((Long) val).longValue(); + else + throw new UnsupportedOperationException("Unable to update key " + key); + return this; + } + } + + private static class Keys + { + // event keys + private final static Keyword process = RT.keyword(null, "process"); + private final static Keyword index = RT.keyword(null, "index"); + private final static Keyword time = RT.keyword(null, "time"); + private final static Keyword type = RT.keyword(null, "type"); + private final static Keyword value = RT.keyword(null, "value"); + + // elle check results + private final static Keyword valid = RT.keyword(null, "valid?"); + private final static Keyword unknown = RT.keyword(null, "unknown"); + private final static Keyword anomalyTypes = RT.keyword(null, "anomaly-types"); + private final static Keyword emptyTransactionGraph = RT.keyword(null, "empty-transaction-graph"); + + private static final Set<Keyword> eventKeys = ImmutableSet.of(Keys.process, Keys.time, Keys.type, Keys.value); + } + + private static class Clj + { + static + { + IFn require = Clojure.var("clojure.core", "require"); + require.invoke(Clojure.read("elle.list-append")); + require.invoke(Clojure.read("jepsen.history")); + } + + private static final IFn check = Clojure.var("elle.list-append", "check"); + private static final IFn history = Clojure.var("jepsen.history", "history"); + private static final Object elleListAppendOps = Clojure.read("{:consistency-models [:strict-serializable]}"); + } + + private static abstract class ObjectPersistentMap implements clojure.lang.IPersistentMap + { + private Set<Keyword> keys; + + private ObjectPersistentMap(Set<Keyword> keys) + { + this.keys = keys; + } + + @Override + public boolean containsKey(Object key) + { + if (!(key instanceof Keyword)) + throw new AssertionError(String.format("Unexpected key %s; type %s", key, key == null ? null : key.getClass())); + return keys.contains(key); + } + + @Override + public IMapEntry entryAt(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public IPersistentMap assoc(Object key, Object val) + { + throw new UnsupportedOperationException(); + } + + @Override + public IPersistentMap assocEx(Object key, Object val) + { + throw new UnsupportedOperationException(); + } + + @Override + public IPersistentMap without(Object key) + { + keys = Sets.filter(keys, k -> !k.equals(key)); + return this; + } + + @Override + public Object valAt(Object key) + { + return valAt(key, null); + } + + @Override + public abstract Object valAt(Object key, Object notFound); + + @Override + public int count() + { + return keys.size(); + } + + @Override + public IPersistentCollection cons(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public IPersistentCollection empty() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equiv(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public ISeq seq() + { + return IteratorSeq.create(iterator()); + } + + @Override + public Iterator iterator() + { + return keys.iterator(); + } + } +} diff --git a/accord-core/src/test/java/accord/verify/ElleVerifierTest.java b/accord-core/src/test/java/accord/verify/ElleVerifierTest.java new file mode 100644 index 00000000..8d600a08 --- /dev/null +++ b/accord-core/src/test/java/accord/verify/ElleVerifierTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.verify; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; + +class ElleVerifierTest +{ + @Test + void simple() + { + Assumptions.assumeTrue(ElleVerifier.Support.allowed(), "Elle doesn't support JDK 8"); + + ElleVerifier verifier = new ElleVerifier(); + try (Verifier.Checker checker = verifier.witness(0, 1)) + { + checker.write(0, 1); + } + + try (Verifier.Checker checker = verifier.witness(2, 3)) + { + checker.read(0, new int[]{1}); + } + verifier.close(); + } + + @Test + void largerHistory() + { + Assumptions.assumeTrue(ElleVerifier.Support.allowed(), "Elle doesn't support JDK 8"); + + ElleVerifier verifier = new ElleVerifier(); + + int id = 0; + try (Verifier.Checker checker = verifier.witness(id++, 63)) + { + checker.read(5, new int[]{}); + checker.write(5, 1); + checker.read(6, new int[]{}); + checker.write(6, 2); + checker.read(8, new int[]{}); + } + try (Verifier.Checker checker = verifier.witness(id++, 75)) + { + checker.read(6, new int[]{2}); + } + try (Verifier.Checker checker = verifier.witness(id++, 79)) + { + checker.read(5, new int[]{1}); + } + try (Verifier.Checker checker = verifier.witness(id++, 83)) + { + checker.read(6, new int[]{2}); + checker.read(7, new int[]{}); + } + try (Verifier.Checker checker = verifier.witness(id++, 92)) + { + checker.read(4, new int[]{}); + checker.write(4, 2); + } + try (Verifier.Checker checker = verifier.witness(id++, 94)) + { + checker.read(5, new int[]{1}); + checker.read(4, new int[]{2}); + checker.write(4, 3); + checker.read(8, new int[]{}); + checker.write(8, 2); + } + try (Verifier.Checker checker = verifier.witness(id++, 94)) + { + checker.read(4, new int[]{2, 3}); + checker.read(7, new int[]{}); + checker.write(7, 2); + checker.read(8, new int[]{2}); + } + verifier.close(); + } + + @Test + void readOnly() + { + Assumptions.assumeTrue(ElleVerifier.Support.allowed(), "Elle doesn't support JDK 8"); + ElleVerifier verifier = new ElleVerifier(); + try (Verifier.Checker checker = verifier.witness(3, 63)) + { + checker.read(6, new int[]{}); + } + try (Verifier.Checker checker = verifier.witness(60, 64)) + { + checker.read(4, new int[]{}); + } + verifier.close(); + } + + @Test + void badHistory() + { + Assumptions.assumeTrue(ElleVerifier.Support.allowed(), "Elle doesn't support JDK 8"); + + ElleVerifier verifier = new ElleVerifier(); + try (Verifier.Checker checker = verifier.witness(0, 1)) + { + checker.write(0, 1); + } + + try (Verifier.Checker checker = verifier.witness(2, 3)) + { + checker.read(0, new int[] {1, 2}); + } + + try (Verifier.Checker checker = verifier.witness(4, 5)) + { + checker.write(0, 2); + } + Assertions.assertThatThrownBy(() -> verifier.close()) + .isInstanceOf(HistoryViolation.class) + .hasMessageContaining(":anomalies") + .hasMessageContaining(":G1c-realtime"); + } +} \ No newline at end of file diff --git a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java index 8ae2b895..738486e9 100644 --- a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java +++ b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java @@ -32,6 +32,7 @@ import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static java.util.stream.Collectors.joining; /** @@ -52,7 +53,7 @@ import static java.util.stream.Collectors.joining; * <p> * TODO (low priority): find and report a path when we encounter a violation */ -public class StrictSerializabilityVerifier +public class StrictSerializabilityVerifier implements Verifier { private static final Logger logger = LoggerFactory.getLogger(StrictSerializabilityVerifier.class); @@ -775,6 +776,32 @@ public class StrictSerializabilityVerifier Arrays.fill(bufUnknownSteps, null); } + @Override + public Checker witness(int start, int end) + { + begin(); + return new Checker() + { + @Override + public void read(int index, int[] seq) + { + witnessRead(index, seq); + } + + @Override + public void write(int index, int value) + { + witnessWrite(index, value); + } + + @Override + public void close() + { + apply(start, end); + } + }; + } + /** * Buffer a new read observation. * <p> diff --git a/accord-core/src/test/java/accord/verify/Verifier.java b/accord-core/src/test/java/accord/verify/Verifier.java new file mode 100644 index 00000000..522d92b3 --- /dev/null +++ b/accord-core/src/test/java/accord/verify/Verifier.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.verify; + +public interface Verifier extends AutoCloseable +{ + Checker witness(int start, int end); + + interface Checker extends AutoCloseable + { + void read(int index, int[] seq); + void write(int index, int value); + + @Override + default void close() {} + } + + @Override + default void close() {} +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org