This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new b1befa3c Add support for repair coordinator to retry messages that timeout (#68) b1befa3c is described below commit b1befa3cc0a8496451bb48ec3bb1c0f56b8c7653 Author: dcapwell <dcapw...@apache.org> AuthorDate: Fri Sep 29 15:16:00 2023 -0700 Add support for repair coordinator to retry messages that timeout (#68) patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18816 --- .../src/main/java/accord/utils/DefaultRandom.java | 11 +-- .../src/main/java/accord/utils/RandomSource.java | 5 ++ .../accord/burn/random/FrequentLargeRange.java | 21 ++++- ...mRangeTest.java => FrequentLargeRangeTest.java} | 6 +- .../src/test/java/accord/impl/basic/NodeSink.java | 4 +- .../src/test/java/accord/utils/GenTest.java | 2 +- accord-core/src/test/java/accord/utils/Gens.java | 8 +- .../src/test/java/accord/utils/Property.java | 98 +++++++++++++++++++++- 8 files changed, 131 insertions(+), 24 deletions(-) diff --git a/accord-core/src/main/java/accord/utils/DefaultRandom.java b/accord-core/src/main/java/accord/utils/DefaultRandom.java index 8efff223..5d00da38 100644 --- a/accord-core/src/main/java/accord/utils/DefaultRandom.java +++ b/accord-core/src/main/java/accord/utils/DefaultRandom.java @@ -20,25 +20,20 @@ package accord.utils; import java.util.Random; -public class DefaultRandom extends Random implements RandomSource +public class DefaultRandom extends WrappedRandomSource { public DefaultRandom() { + super(new Random()); } public DefaultRandom(long seed) { - super(seed); + super(new Random(seed)); } @Override public DefaultRandom fork() { return new DefaultRandom(nextLong()); } - - @Override - public Random asJdkRandom() - { - return this; - } } diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java index da5033a5..3d4861e5 100644 --- a/accord-core/src/main/java/accord/utils/RandomSource.java +++ b/accord-core/src/main/java/accord/utils/RandomSource.java @@ -248,6 +248,11 @@ public interface RandomSource } } + default <T> T pick(T[] array) + { + return array[nextInt(array.length)]; + } + default <T> T pick(List<T> values) { return pick(values, 0, values.size()); diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java index 41cae465..7c77b972 100644 --- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java +++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java @@ -31,11 +31,11 @@ public class FrequentLargeRange implements LongGen private final LongGen small, large; private final Gen<Boolean> runs; - public FrequentLargeRange(LongGen small, LongGen large, double ratio) + public FrequentLargeRange(LongGen small, LongGen large, double ratio, int maxRuns) { this.small = small; this.large = large; - this.runs = Gens.bools().biasedRepeatingRuns(ratio); + this.runs = Gens.bools().biasedRepeatingRuns(ratio, maxRuns); } @Override @@ -54,6 +54,7 @@ public class FrequentLargeRange implements LongGen { private final RandomSource random; private Double ratio; + private Integer maxRuns; private LongGen small, large; public Builder(RandomSource random) @@ -73,6 +74,18 @@ public class FrequentLargeRange implements LongGen return this; } + public Builder maxRuns(int maxRuns) + { + this.maxRuns = maxRuns; + return this; + } + + public Builder maxRuns(int min, int max) + { + this.maxRuns = random.nextInt(min, max); + return this; + } + public Builder small(Duration min, Duration max) { small = create(min, max); @@ -132,7 +145,9 @@ public class FrequentLargeRange implements LongGen throw new IllegalStateException("Large range undefined"); if (ratio == null) ratio(1, 11); - return new FrequentLargeRange(small, large, ratio); + if (maxRuns == null) + maxRuns(3, 15); + return new FrequentLargeRange(small, large, ratio, maxRuns); } } } diff --git a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java similarity index 97% rename from accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java rename to accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java index b16cdecd..ed838bb6 100644 --- a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java +++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java @@ -30,7 +30,7 @@ import java.util.stream.LongStream; import static accord.utils.Property.qt; -class SegmentedRandomRangeTest +class FrequentLargeRangeTest { enum Type { @@ -107,13 +107,13 @@ class SegmentedRandomRangeTest int largeRatio = ratio.next(rs); return new TestCase(0, ints[0], ints[1], ints[2], largeRatio, typeGen.next(rs)); }; - qt().forAll(Gens.random(), test).check(SegmentedRandomRangeTest::test); + qt().forAll(Gens.random(), test).check(FrequentLargeRangeTest::test); } private static void test(RandomSource rs, TestCase tc) { double ratio = tc.ratio(); - FrequentLargeRange period = new FrequentLargeRange(tc.min(rs), tc.max(rs), ratio); + FrequentLargeRange period = new FrequentLargeRange(tc.min(rs), tc.max(rs), ratio, tc.largeRatio); int numSamples = 1000; int maxResamples = 1000; diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java index a19d6964..ccec87f9 100644 --- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java +++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java @@ -179,8 +179,8 @@ public class NodeSink implements MessageSink private Supplier<Action> actions() { - Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01); - Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01); + Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15)); + Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15)); Gen<Action> actionGen = rs -> { if (drops.next(rs)) return Action.DROP; diff --git a/accord-core/src/test/java/accord/utils/GenTest.java b/accord-core/src/test/java/accord/utils/GenTest.java index 7a046694..8d4ebd72 100644 --- a/accord-core/src/test/java/accord/utils/GenTest.java +++ b/accord-core/src/test/java/accord/utils/GenTest.java @@ -141,7 +141,7 @@ public class GenTest { { double ratio = 0.0625; int samples = 1000; - Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio)).ofSize(samples).map(Runs::new); + Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio, 15)).ofSize(samples).map(Runs::new); qt().forAll(gen).check(runs -> { assertThat(IntStream.of(runs.runs).filter(i -> i > 5).toArray()).isNotEmpty(); assertThat(runs.counts.get(true) / 1000.0).isBetween(ratio * .5, 0.1); diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index 244cd645..741efd18 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -34,7 +34,6 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; - public class Gens { private Gens() { } @@ -170,10 +169,9 @@ public class Gens { return RandomSource::nextBoolean; } - public Gen<Boolean> biasedRepeatingRuns(double ratio) + public Gen<Boolean> biasedRepeatingRuns(double ratio, int maxRuns) { Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to be larger than 0 and <= 1", ratio); - int steps = (int) (1 / ratio); double lower = ratio * .8; double upper = ratio * 1.2; return new Gen<Boolean>() { @@ -204,7 +202,7 @@ public class Gens { } if (rs.decide(ratio)) { - run = rs.nextInt(steps); + run = rs.nextInt(maxRuns); run--; trueCount++; return true; @@ -281,7 +279,7 @@ public class Gens { return pick(values); } } - + public static class StringDSL { public Gen<String> of(Gen.IntGen sizes, char[] domain) diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 01735686..9c813758 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -18,9 +18,18 @@ package accord.utils; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import java.time.Duration; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; public class Property { @@ -30,6 +39,8 @@ public class Property protected int examples = 1000; protected boolean pure = true; + @Nullable + protected Duration timeout = null; protected Common() { } @@ -38,6 +49,7 @@ public class Property this.seed = other.seed; this.examples = other.examples; this.pure = other.pure; + this.timeout = other.timeout; } public T withSeed(long seed) @@ -59,6 +71,51 @@ public class Property this.pure = pure; return (T) this; } + + public T withTimeout(Duration timeout) + { + this.timeout = timeout; + return (T) this; + } + + protected void checkWithTimeout(Runnable fn) + { + AsyncResult.Settable<?> promise = AsyncResults.settable(); + Thread t = new Thread(() -> { + try + { + fn.run(); + promise.setSuccess(null); + } + catch (Throwable e) + { + promise.setFailure(e); + } + }); + t.setName("property with timeout"); + t.setDaemon(true); + try + { + t.start(); + AsyncChains.getBlocking(promise, timeout.toNanos(), TimeUnit.NANOSECONDS); + } + catch (ExecutionException e) + { + throw new PropertyError(propertyError(this, e.getCause())); + } + catch (InterruptedException e) + { + t.interrupt(); + throw new PropertyError(propertyError(this, e)); + } + catch (TimeoutException e) + { + t.interrupt(); + TimeoutException override = new TimeoutException("property test did not complete within " + this.timeout); + override.setStackTrace(new StackTraceElement[0]); + throw new PropertyError(propertyError(this, override)); + } + } } public static class ForBuilder extends Common<ForBuilder> @@ -161,6 +218,16 @@ public class Property } public void check(FailingConsumer<T> fn) + { + if (timeout != null) + { + checkWithTimeout(() -> checkInternal(fn)); + return; + } + checkInternal(fn); + } + + private void checkInternal(FailingConsumer<T> fn) { RandomSource random = new DefaultRandom(seed); for (int i = 0; i < examples; i++) @@ -201,6 +268,16 @@ public class Property } public void check(FailingBiConsumer<A, B> fn) + { + if (timeout != null) + { + checkWithTimeout(() -> checkInternal(fn)); + return; + } + checkInternal(fn); + } + + private void checkInternal(FailingBiConsumer<A, B> fn) { RandomSource random = new DefaultRandom(seed); for (int i = 0; i < examples; i++) @@ -245,6 +322,16 @@ public class Property } public void check(FailingTriConsumer<A, B, C> fn) + { + if (timeout != null) + { + checkWithTimeout(() -> checkInternal(fn)); + return; + } + checkInternal(fn); + } + + private void checkInternal(FailingTriConsumer<A, B, C> fn) { RandomSource random = new DefaultRandom(seed); for (int i = 0; i < examples; i++) @@ -270,16 +357,23 @@ public class Property } } - private static void checkInterrupted() throws InterruptedException { + private static void checkInterrupted() throws InterruptedException + { if (Thread.currentThread().isInterrupted()) throw new InterruptedException(); } public static class PropertyError extends AssertionError { - public PropertyError(String message, Throwable cause) { + public PropertyError(String message, Throwable cause) + { super(message, cause); } + + public PropertyError(String message) + { + super(message); + } } public static ForBuilder qt() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org