This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 694ae39e CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked (#100) 694ae39e is described below commit 694ae39e2e00075bdabd47632dced0db12a9981d Author: dcapwell <dcapw...@apache.org> AuthorDate: Mon Jul 1 13:11:40 2024 -0700 CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked (#100) patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19719 --- .../main/java/accord/api/ConfigurationService.java | 3 + .../accord/impl/AbstractConfigurationService.java | 10 ++ accord-core/src/main/java/accord/local/Node.java | 6 + .../main/java/accord/topology/TopologyManager.java | 29 ++++ .../src/main/java/accord/utils/RandomSource.java | 11 +- .../java/accord/topology/TopologyManagerTest.java | 4 +- accord-core/src/test/java/accord/utils/Gens.java | 24 +++- .../java/accord/utils/LoggingRandomSource.java | 160 +++++++++++++++++++++ 8 files changed, 236 insertions(+), 11 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index 49dd8d63..5d62c709 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -18,6 +18,7 @@ package accord.api; +import java.util.Collection; import javax.annotation.Nullable; import accord.local.Node; @@ -143,6 +144,8 @@ public interface ConfigurationService * in whatever epoch they execute in. Once the whole range is covered this epoch is redundant, and may be cleaned up. */ void onEpochRedundant(Ranges ranges, long epoch); + + default void onRemoveNodes(long epoch, Collection<Node.Id> removed) {} } void registerListener(Listener listener); diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index 2f93a990..6eb5daf1 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -110,6 +110,11 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return epochs.size(); } + public boolean isEmpty() + { + return lastReceived == 0; + } + EpochState getOrCreate(long epoch) { Invariants.checkArgument(epoch > 0, "Epoch must be positive but given %d", epoch); @@ -210,6 +215,11 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo listeners.add(listener); } + public synchronized boolean isEmpty() + { + return epochs.isEmpty(); + } + @Override public synchronized Topology currentTopology() { diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 26ffd317..74446cee 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -261,6 +261,12 @@ public class Node implements ConfigurationService.Listener, NodeTimeService topology.onEpochSyncComplete(node, epoch); } + @Override + public void onRemoveNodes(long epoch, Collection<Id> removed) + { + topology.onRemoveNodes(epoch, removed); + } + @Override public void truncateTopologyUntil(long epoch) { diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index bd4b25af..d2d09afb 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -20,6 +20,7 @@ package accord.topology; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -112,6 +113,11 @@ public class TopologyManager return true; } + public boolean hasReachedQuorum() + { + return syncTracker == null ? true : syncTracker.hasReachedQuorum(); + } + public boolean recordSyncComplete(Id node) { if (syncTracker == null) @@ -420,6 +426,12 @@ public class TopologyManager return current == null || result.isDone() ? result : result.withExecutor(current); } + public synchronized boolean hasReachedQuorum(long epoch) + { + EpochState state = epochs.get(epoch); + return state != null && state.hasReachedQuorum(); + } + @VisibleForTesting public EpochReady epochReady(long epoch) { @@ -439,6 +451,23 @@ public class TopologyManager epochs.syncComplete(node, epoch); } + public synchronized void onRemoveNodes(long removedIn, Collection<Id> removed) + { + for (long epoch = removedIn, min = minEpoch(); epoch >= min; epoch--) + { + EpochState state = epochs.get(epoch); + if (state == null || state.hasReachedQuorum()) continue; + for (Id node : removed) + epochs.syncComplete(node, epoch); + } + } + + @VisibleForTesting + public Ranges syncComplete(long epoch) + { + return epochs.get(epoch).syncComplete; + } + public synchronized void truncateTopologyUntil(long epoch) { Epochs current = epochs; diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java index ed971aac..98579de3 100644 --- a/accord-core/src/main/java/accord/utils/RandomSource.java +++ b/accord-core/src/main/java/accord/utils/RandomSource.java @@ -21,9 +21,9 @@ package accord.utils; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.NavigableSet; import java.util.Random; import java.util.Set; +import java.util.SortedSet; import java.util.function.BooleanSupplier; import java.util.function.IntSupplier; import java.util.function.LongSupplier; @@ -288,19 +288,20 @@ public interface RandomSource return array[nextInt(offset, offset + length)]; } - default <T> T pick(NavigableSet<T> set) + default <T> T pickOrderedSet(SortedSet<T> set) { int offset = nextInt(0, set.size()); return Iterables.get(set, offset); } - default <T extends Comparable<? super T>> T pick(Set<T> set) + default <T extends Comparable<? super T>> T pickUnorderedSet(Set<T> set) { + if (set instanceof SortedSet) + return pickOrderedSet((SortedSet<T>) set); List<T> values = new ArrayList<>(set); // Non-ordered sets may have different iteration order on different environments, which would make a seed produce different histories! // To avoid such a problem, make sure to apply a deterministic function (sort). - if (!(set instanceof NavigableSet)) - values.sort(Comparator.naturalOrder()); + values.sort(Comparator.naturalOrder()); return pick(values); } diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index d0907723..e018a26a 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -562,9 +562,9 @@ public class TopologyManagerTest postTopologyUpdate(id, t); break; case OnEpochSyncComplete: - long epoch = rs.pick(pendingSyncComplete.keySet()); + long epoch = rs.pickUnorderedSet(pendingSyncComplete.keySet()); Set<Node.Id> pendingNodes = pendingSyncComplete.get(epoch); - Node.Id node = rs.pick(pendingNodes); + Node.Id node = rs.pickUnorderedSet(pendingNodes); pendingNodes.remove(node); if (pendingNodes.isEmpty()) pendingSyncComplete.remove(epoch); diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index fe44a81b..936e484b 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -27,6 +27,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.EnumMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -35,7 +37,6 @@ import java.util.Set; import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -112,8 +113,18 @@ public class Gens { { if (values == null || values.isEmpty()) throw new IllegalArgumentException("values is empty"); + // if 2 values have the same weight we need some way to tie-break, but that isn't always possible... + // this method relies on the map having some order and will reject any map that doesn't define a deterministic order + if (!(values instanceof EnumMap || values instanceof LinkedHashMap)) + throw new IllegalArgumentException("pick(Map) requires a map with deterministic iteration; given " + values.getClass()); double totalWeight = values.values().stream().mapToDouble(Integer::intValue).sum(); - List<Weight<T>> list = values.entrySet().stream().map(e -> new Weight<>(e.getKey(), e.getValue())).collect(Collectors.toList()); + List<Weight<T>> list = new ArrayList<>(values.size()); + Iterator<Map.Entry<T, Integer>> it = values.entrySet().iterator(); + for (int i = 0; it.hasNext(); i++) + { + Map.Entry<T, Integer> e = it.next(); + list.add(new Weight<>(e.getKey(), e.getValue(), i)); + } Collections.sort(list); return rs -> { double value = rs.nextDouble() * totalWeight; @@ -953,15 +964,20 @@ public class Gens { { private final T value; private final double weight; + private final int index; - private Weight(T value, double weight) { + private Weight(T value, double weight, int index) { this.value = value; this.weight = weight; + this.index = index; } @Override public int compareTo(Weight<T> o) { - return Double.compare(weight, o.weight); + int rc = Double.compare(weight, o.weight); + if (rc == 0) + rc = Integer.compare(index, o.index); + return rc; } } } diff --git a/accord-core/src/test/java/accord/utils/LoggingRandomSource.java b/accord-core/src/test/java/accord/utils/LoggingRandomSource.java new file mode 100644 index 00000000..fa1bf057 --- /dev/null +++ b/accord-core/src/test/java/accord/utils/LoggingRandomSource.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class is helpful for finding non-deterministic behavior as it tracks every call to {@link RandomSource} so that + * different runs can be compared to find where they differ. The most common way to use this is to replace the tests + * {@link RandomSource} with this class and run it against 2 or more environments (that are experiencing a different outcome), + * then compare the logs to find where the first difference is (and the previous access is likely where the non-deterministic fault is). + */ +@SuppressWarnings("unused") +public class LoggingRandomSource implements RandomSource +{ + private final RandomSource delegate; + private final Writer writer; + + public LoggingRandomSource(File out, RandomSource delegate) + { + this.delegate = delegate; + try + { + this.writer = new BufferedWriter(new FileWriter(out, StandardCharsets.UTF_8)); + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + } + + private LoggingRandomSource(Writer writer, RandomSource delegate) + { + this.delegate = delegate; + this.writer = writer; + } + + @Override + public void nextBytes(byte[] bytes) + { + delegate.nextBytes(bytes); + append("nextBytes(length=" + bytes.length + ") -> " + Base64.getEncoder().encodeToString(bytes)); + } + + @Override + public boolean nextBoolean() + { + boolean result = delegate.nextBoolean(); + append("nextBoolean -> " + result); + return result; + } + + @Override + public int nextInt() + { + int result = delegate.nextInt(); + append("nextInt -> " + result); + return result; + } + + @Override + public long nextLong() + { + long result = delegate.nextLong(); + append("nextLong -> " + result); + return result; + } + + @Override + public float nextFloat() + { + float result = delegate.nextFloat(); + append("nextFloat -> " + result); + return result; + } + + @Override + public double nextDouble() + { + double result = delegate.nextDouble(); + append("nextDouble -> " + result); + return result; + } + + @Override + public double nextGaussian() + { + double result = delegate.nextGaussian(); + append("nextGaussian -> " + result); + return result; + } + + @Override + public void setSeed(long seed) + { + append("setSeed(" + seed + ')'); + delegate.setSeed(seed); + } + + @Override + public RandomSource fork() + { + append("fork"); + return new LoggingRandomSource(writer, delegate.fork()); + } + + private void append(String line) + { + Thread thread = Thread.currentThread(); + StackTraceElement[] stack = normalize(thread.getStackTrace()); + try + { + writer.append("[thread=").append(thread.getName()).append("] "); + writer.append(line).append(String.valueOf('\n')); + for (int i = 0; i < stack.length; i++) + writer.append("\t").append(stack[i].getClassName()).append("#").append(stack[i].getMethodName()).append(":").append(String.valueOf(stack[i].getLineNumber())).append("\n"); + writer.flush(); + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + } + + private static StackTraceElement[] normalize(StackTraceElement[] stackTrace) + { + return Stream.of(stackTrace) + .filter(e -> !(e.getClassName().startsWith("com.intellij") + || e.getClassName().startsWith("org.junit") + || e.getClassName().startsWith("jdk.internal.reflect") + || e.getClassName().startsWith("java.lang.reflect"))) + .collect(Collectors.toList()) + .toArray(StackTraceElement[]::new); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org