This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 694ae39e CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked (#100)
694ae39e is described below

commit 694ae39e2e00075bdabd47632dced0db12a9981d
Author: dcapwell <dcapw...@apache.org>
AuthorDate: Mon Jul 1 13:11:40 2024 -0700

    CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked (#100)
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19719
---
 .../main/java/accord/api/ConfigurationService.java |   3 +
 .../accord/impl/AbstractConfigurationService.java  |  10 ++
 accord-core/src/main/java/accord/local/Node.java   |   6 +
 .../main/java/accord/topology/TopologyManager.java |  29 ++++
 .../src/main/java/accord/utils/RandomSource.java   |  11 +-
 .../java/accord/topology/TopologyManagerTest.java  |   4 +-
 accord-core/src/test/java/accord/utils/Gens.java   |  24 +++-
 .../java/accord/utils/LoggingRandomSource.java     | 160 +++++++++++++++++++++
 8 files changed, 236 insertions(+), 11 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java 
b/accord-core/src/main/java/accord/api/ConfigurationService.java
index 49dd8d63..5d62c709 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -18,6 +18,7 @@
 
 package accord.api;
 
+import java.util.Collection;
 import javax.annotation.Nullable;
 
 import accord.local.Node;
@@ -143,6 +144,8 @@ public interface ConfigurationService
          * in whatever epoch they execute in. Once the whole range is covered 
this epoch is redundant, and may be cleaned up.
          */
         void onEpochRedundant(Ranges ranges, long epoch);
+
+        default void onRemoveNodes(long epoch, Collection<Node.Id> removed) {}
     }
 
     void registerListener(Listener listener);
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 2f93a990..6eb5daf1 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -110,6 +110,11 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return epochs.size();
         }
 
+        public boolean isEmpty()
+        {
+            return lastReceived == 0;
+        }
+
         EpochState getOrCreate(long epoch)
         {
             Invariants.checkArgument(epoch > 0, "Epoch must be positive but 
given %d", epoch);
@@ -210,6 +215,11 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         listeners.add(listener);
     }
 
+    public synchronized boolean isEmpty()
+    {
+        return epochs.isEmpty();
+    }
+
     @Override
     public synchronized Topology currentTopology()
     {
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 26ffd317..74446cee 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -261,6 +261,12 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         topology.onEpochSyncComplete(node, epoch);
     }
 
+    @Override
+    public void onRemoveNodes(long epoch, Collection<Id> removed)
+    {
+        topology.onRemoveNodes(epoch, removed);
+    }
+
     @Override
     public void truncateTopologyUntil(long epoch)
     {
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index bd4b25af..d2d09afb 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,6 +20,7 @@ package accord.topology;
 
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -112,6 +113,11 @@ public class TopologyManager
             return true;
         }
 
+        public boolean hasReachedQuorum()
+        {
+            return syncTracker == null ? true : syncTracker.hasReachedQuorum();
+        }
+
         public boolean recordSyncComplete(Id node)
         {
             if (syncTracker == null)
@@ -420,6 +426,12 @@ public class TopologyManager
         return current == null || result.isDone() ? result : 
result.withExecutor(current);
     }
 
+    public synchronized boolean hasReachedQuorum(long epoch)
+    {
+        EpochState state = epochs.get(epoch);
+        return state != null && state.hasReachedQuorum();
+    }
+
     @VisibleForTesting
     public EpochReady epochReady(long epoch)
     {
@@ -439,6 +451,23 @@ public class TopologyManager
         epochs.syncComplete(node, epoch);
     }
 
+    public synchronized void onRemoveNodes(long removedIn, Collection<Id> 
removed)
+    {
+        for (long epoch = removedIn, min = minEpoch(); epoch >= min; epoch--)
+        {
+            EpochState state = epochs.get(epoch);
+            if (state == null || state.hasReachedQuorum()) continue;
+            for (Id node : removed)
+                epochs.syncComplete(node, epoch);
+        }
+    }
+
+    @VisibleForTesting
+    public Ranges syncComplete(long epoch)
+    {
+        return epochs.get(epoch).syncComplete;
+    }
+
     public synchronized void truncateTopologyUntil(long epoch)
     {
         Epochs current = epochs;
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java 
b/accord-core/src/main/java/accord/utils/RandomSource.java
index ed971aac..98579de3 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -21,9 +21,9 @@ package accord.utils;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.function.BooleanSupplier;
 import java.util.function.IntSupplier;
 import java.util.function.LongSupplier;
@@ -288,19 +288,20 @@ public interface RandomSource
         return array[nextInt(offset, offset + length)];
     }
 
-    default <T> T pick(NavigableSet<T> set)
+    default <T> T pickOrderedSet(SortedSet<T> set)
     {
         int offset = nextInt(0, set.size());
         return Iterables.get(set, offset);
     }
 
-    default <T extends Comparable<? super T>> T pick(Set<T> set)
+    default <T extends Comparable<? super T>> T pickUnorderedSet(Set<T> set)
     {
+        if (set instanceof SortedSet)
+            return pickOrderedSet((SortedSet<T>) set);
         List<T> values = new ArrayList<>(set);
         // Non-ordered sets may have different iteration order on different 
environments, which would make a seed produce different histories!
         // To avoid such a problem, make sure to apply a deterministic 
function (sort).
-        if (!(set instanceof NavigableSet))
-            values.sort(Comparator.naturalOrder());
+        values.sort(Comparator.naturalOrder());
         return pick(values);
     }
 
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index d0907723..e018a26a 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -562,9 +562,9 @@ public class TopologyManagerTest
                     postTopologyUpdate(id, t);
                     break;
                 case OnEpochSyncComplete:
-                    long epoch = rs.pick(pendingSyncComplete.keySet());
+                    long epoch = 
rs.pickUnorderedSet(pendingSyncComplete.keySet());
                     Set<Node.Id> pendingNodes = pendingSyncComplete.get(epoch);
-                    Node.Id node = rs.pick(pendingNodes);
+                    Node.Id node = rs.pickUnorderedSet(pendingNodes);
                     pendingNodes.remove(node);
                     if (pendingNodes.isEmpty())
                         pendingSyncComplete.remove(epoch);
diff --git a/accord-core/src/test/java/accord/utils/Gens.java 
b/accord-core/src/test/java/accord/utils/Gens.java
index fe44a81b..936e484b 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -27,6 +27,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -35,7 +37,6 @@ import java.util.Set;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
@@ -112,8 +113,18 @@ public class Gens {
     {
         if (values == null || values.isEmpty())
             throw new IllegalArgumentException("values is empty");
+        // if 2 values have the same weight we need some way to tie-break, but 
that isn't always possible...
+        // this method relies on the map having some order and will reject any 
map that doesn't define a deterministic order
+        if (!(values instanceof EnumMap || values instanceof LinkedHashMap))
+            throw new IllegalArgumentException("pick(Map) requires a map with 
deterministic iteration; given " + values.getClass());
         double totalWeight = 
values.values().stream().mapToDouble(Integer::intValue).sum();
-        List<Weight<T>> list = values.entrySet().stream().map(e -> new 
Weight<>(e.getKey(), e.getValue())).collect(Collectors.toList());
+        List<Weight<T>> list = new ArrayList<>(values.size());
+        Iterator<Map.Entry<T, Integer>> it = values.entrySet().iterator();
+        for (int i = 0; it.hasNext(); i++)
+        {
+            Map.Entry<T, Integer> e = it.next();
+            list.add(new Weight<>(e.getKey(), e.getValue(), i));
+        }
         Collections.sort(list);
         return rs -> {
             double value = rs.nextDouble() * totalWeight;
@@ -953,15 +964,20 @@ public class Gens {
     {
         private final T value;
         private final double weight;
+        private final int index;
 
-        private Weight(T value, double weight) {
+        private Weight(T value, double weight, int index) {
             this.value = value;
             this.weight = weight;
+            this.index = index;
         }
 
         @Override
         public int compareTo(Weight<T> o) {
-            return Double.compare(weight, o.weight);
+            int rc = Double.compare(weight, o.weight);
+            if (rc == 0)
+                rc = Integer.compare(index, o.index);
+            return rc;
         }
     }
 }
diff --git a/accord-core/src/test/java/accord/utils/LoggingRandomSource.java 
b/accord-core/src/test/java/accord/utils/LoggingRandomSource.java
new file mode 100644
index 00000000..fa1bf057
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/LoggingRandomSource.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class is helpful for finding non-deterministic behavior as it tracks 
every call to {@link RandomSource} so that
+ * different runs can be compared to find where they differ.  The most common 
way to use this is to replace the tests
+ * {@link RandomSource} with this class and run it against 2 or more 
environments (that are experiencing a different outcome),
+ * then compare the logs to find where the first difference is (and the 
previous access is likely where the non-deterministic fault is).
+ */
+@SuppressWarnings("unused")
+public class LoggingRandomSource implements RandomSource
+{
+    private final RandomSource delegate;
+    private final Writer writer;
+
+    public LoggingRandomSource(File out, RandomSource delegate)
+    {
+        this.delegate = delegate;
+        try
+        {
+            this.writer = new BufferedWriter(new FileWriter(out, 
StandardCharsets.UTF_8));
+        }
+        catch (IOException e)
+        {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private LoggingRandomSource(Writer writer, RandomSource delegate)
+    {
+        this.delegate = delegate;
+        this.writer = writer;
+    }
+
+    @Override
+    public void nextBytes(byte[] bytes)
+    {
+        delegate.nextBytes(bytes);
+        append("nextBytes(length=" + bytes.length + ") -> " + 
Base64.getEncoder().encodeToString(bytes));
+    }
+
+    @Override
+    public boolean nextBoolean()
+    {
+        boolean result = delegate.nextBoolean();
+        append("nextBoolean -> " + result);
+        return result;
+    }
+
+    @Override
+    public int nextInt()
+    {
+        int result = delegate.nextInt();
+        append("nextInt -> " + result);
+        return result;
+    }
+
+    @Override
+    public long nextLong()
+    {
+        long result = delegate.nextLong();
+        append("nextLong -> " + result);
+        return result;
+    }
+
+    @Override
+    public float nextFloat()
+    {
+        float result = delegate.nextFloat();
+        append("nextFloat -> " + result);
+        return result;
+    }
+
+    @Override
+    public double nextDouble()
+    {
+        double result = delegate.nextDouble();
+        append("nextDouble -> " + result);
+        return result;
+    }
+
+    @Override
+    public double nextGaussian()
+    {
+        double result = delegate.nextGaussian();
+        append("nextGaussian -> " + result);
+        return result;
+    }
+
+    @Override
+    public void setSeed(long seed)
+    {
+        append("setSeed(" + seed + ')');
+        delegate.setSeed(seed);
+    }
+
+    @Override
+    public RandomSource fork()
+    {
+        append("fork");
+        return new LoggingRandomSource(writer, delegate.fork());
+    }
+
+    private void append(String line)
+    {
+        Thread thread = Thread.currentThread();
+        StackTraceElement[] stack = normalize(thread.getStackTrace());
+        try
+        {
+            writer.append("[thread=").append(thread.getName()).append("] ");
+            writer.append(line).append(String.valueOf('\n'));
+            for (int i = 0; i < stack.length; i++)
+                
writer.append("\t").append(stack[i].getClassName()).append("#").append(stack[i].getMethodName()).append(":").append(String.valueOf(stack[i].getLineNumber())).append("\n");
+            writer.flush();
+        }
+        catch (IOException e)
+        {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private static StackTraceElement[] normalize(StackTraceElement[] 
stackTrace)
+    {
+        return Stream.of(stackTrace)
+                     .filter(e -> !(e.getClassName().startsWith("com.intellij")
+                                    || e.getClassName().startsWith("org.junit")
+                                    || 
e.getClassName().startsWith("jdk.internal.reflect")
+                                    || 
e.getClassName().startsWith("java.lang.reflect")))
+                     .collect(Collectors.toList())
+                     .toArray(StackTraceElement[]::new);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to