This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch retry-new-system in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 892f19d7714bc7128d84b7e635acc6621ce476c4 Author: Ariel Weisberg <aweisb...@apple.com> AuthorDate: Mon May 6 18:12:30 2024 -0400 retry new system checkpoint --- accord-core/src/main/java/accord/local/Node.java | 18 ++-- .../main/java/accord/topology/TopologyManager.java | 120 +++++++++++++++++---- accord-core/src/test/java/accord/Utils.java | 8 ++ .../java/accord/topology/TopologyManagerTest.java | 61 +++++------ 4 files changed, 149 insertions(+), 58 deletions(-) diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 1605f43..62331d0 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -34,14 +34,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.function.ToLongFunction; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import accord.coordinate.CoordinateEphemeralRead; -import accord.coordinate.CoordinationAdapter; -import accord.coordinate.CoordinationAdapter.Factory.Step; -import accord.utils.DeterministicSet; -import accord.utils.Invariants; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +51,10 @@ import accord.api.RoutingKey; import accord.api.Scheduler; import accord.api.TopologySorter; import accord.config.LocalConfig; +import accord.coordinate.CoordinateEphemeralRead; import accord.coordinate.CoordinateTransaction; +import accord.coordinate.CoordinationAdapter; +import accord.coordinate.CoordinationAdapter.Factory.Step; import accord.coordinate.MaybeRecover; import accord.coordinate.Outcome; import accord.coordinate.RecoverWithRoute; @@ -85,12 +81,16 @@ import accord.primitives.Unseekables; import accord.topology.Shard; import accord.topology.Topology; import accord.topology.TopologyManager; +import accord.utils.DeterministicSet; +import 
accord.utils.Invariants; import accord.utils.MapReduceConsume; import accord.utils.RandomSource; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncExecutor; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import net.nicoulaj.compilecommand.annotations.Inline; import static accord.utils.Invariants.illegalState; @@ -179,7 +179,8 @@ public class Node implements ConfigurationService.Listener, NodeTimeService this.localRequestHandler = localRequestHandler; this.configService = configService; this.coordinationAdapters = coordinationAdapters; - this.topology = new TopologyManager(topologySorter, id); + this.topology = new TopologyManager(topologySorter, id, scheduler, nowTimeUnit); + topology.scheduleTopologyUpdateWatchdog(); this.nowSupplier = nowSupplier; this.nowTimeUnit = nowTimeUnit; this.now = new AtomicReference<>(Timestamp.fromValues(topology.epoch(), nowSupplier.getAsLong(), id)); @@ -330,6 +331,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService public void shutdown() { commandStores.shutdown(); + topology.shutdown(); } public Timestamp uniqueNow() diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index bd4b25a..d2f609e 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -23,14 +23,18 @@ import java.util.BitSet; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.ToLongFunction; import com.google.common.annotations.VisibleForTesting; import accord.api.ConfigurationService.EpochReady; import accord.api.RoutingKey; +import accord.api.Scheduler; import accord.api.TopologySorter; 
+import accord.coordinate.Timeout; import accord.coordinate.TopologyMismatch; import accord.coordinate.tracking.QuorumTracker; import accord.local.CommandStore; @@ -44,13 +48,14 @@ import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; - import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import static accord.coordinate.tracking.RequestStatus.Success; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; import static accord.primitives.Routables.Slice.Minimal; import static accord.utils.Invariants.checkArgument; +import static accord.utils.Invariants.checkState; import static accord.utils.Invariants.illegalState; import static accord.utils.Invariants.nonNull; @@ -71,6 +76,11 @@ public class TopologyManager { private static final AsyncResult<Void> SUCCESS = AsyncResults.success(null); + // How long before we start notifying waiters on an epoch of timeout, + private static final long EPOCH_INITIAL_TIMEOUT_MILLIS = 10_000; + // How often we check for timeout, and once an epoch has timed out, how often we timeout new waiters + private static final long WATCHDOG_INTERVAL_MILLIS = 2_000; + static class EpochState { final Id self; @@ -106,7 +116,7 @@ public class TopologyManager newPrevSyncComplete = newPrevSyncComplete.union(MERGE_ADJACENT, addedRanges).subtract(removedRanges); if (prevSyncComplete.containsAll(newPrevSyncComplete)) return false; - Invariants.checkState(newPrevSyncComplete.containsAll(prevSyncComplete), "Expected %s to contain all ranges in %s; but did not", newPrevSyncComplete, prevSyncComplete); + checkState(newPrevSyncComplete.containsAll(prevSyncComplete), "Expected %s to contain all ranges in %s; but did not", newPrevSyncComplete, prevSyncComplete); prevSyncComplete = newPrevSyncComplete; syncComplete = curSyncComplete.slice(newPrevSyncComplete, Minimal).union(MERGE_ADJACENT, addedRanges); return true; @@ 
-215,13 +225,13 @@ public class TopologyManager // list of promises to be completed as newer epochs become active. This is to support processes that // are waiting on future epochs to begin (ie: txn requests from futures epochs). Index 0 is for // currentEpoch + 1 - private final List<AsyncResult.Settable<Void>> futureEpochFutures; + private final List<FutureEpoch> futureEpochs; - private Epochs(EpochState[] epochs, List<Notifications> pending, List<AsyncResult.Settable<Void>> futureEpochFutures) + private Epochs(EpochState[] epochs, List<Notifications> pending, List<FutureEpoch> futureEpochs) { this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; this.pending = pending; - this.futureEpochFutures = futureEpochFutures; + this.futureEpochs = futureEpochs; for (int i=1; i<epochs.length; i++) checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1); this.epochs = epochs; @@ -232,16 +242,18 @@ public class TopologyManager this(epochs, new ArrayList<>(), new ArrayList<>()); } - public AsyncResult<Void> awaitEpoch(long epoch) + public AsyncResult<Void> awaitEpoch(long epoch, ToLongFunction<TimeUnit> nowTimeUnit) { if (epoch <= currentEpoch) return SUCCESS; + long now = nowTimeUnit.applyAsLong(TimeUnit.MILLISECONDS); + long deadline = now + EPOCH_INITIAL_TIMEOUT_MILLIS; int diff = (int) (epoch - currentEpoch); - while (futureEpochFutures.size() < diff) - futureEpochFutures.add(AsyncResults.settable()); + while (futureEpochs.size() < diff) + futureEpochs.add(new FutureEpoch(AsyncResults.settable(), deadline)); - return futureEpochFutures.get(diff - 1); + return futureEpochs.get(diff - 1).future; } public long nextEpoch() @@ -365,17 +377,84 @@ public class TopologyManager } } + private static class FutureEpoch + { + private final long deadlineMillis; + private volatile AsyncResult.Settable<Void> future; + + public FutureEpoch(AsyncResult.Settable<Void> future, long deadlineMillis) + { + nonNull(future); + this.future = future; + this.deadlineMillis = 
deadlineMillis; + } + + /* + * Notify any listeners that are waiting for the epoch that it has been a long time since + * we started waiting for the epoch. We may still eventually get the epoch so also create + * a new future so subsequent operations may have a chance at seeing the epoch if it ever appears. + * + * Subsequent waiters may get a timeout notification far sooner (WATCHDOG_INTERVAL_MILLIS) + * instead of EPOCH_INITIAL_TIMEOUT_MILLIS + */ + @GuardedBy("TopologyManager.this") + public void timeOutCurrentListeners() + { + checkState(future != null); + AsyncResult.Settable<Void> oldFuture = future; + checkState(oldFuture != null); + future = AsyncResults.settable(); + oldFuture.tryFailure(new Timeout(null, null)); + } + } + private final TopologySorter.Supplier sorter; private final Id node; + private final Scheduler scheduler; + private final ToLongFunction<TimeUnit> nowTimeUnit; private volatile Epochs epochs; + private Scheduler.Scheduled topologyUpdateWatchdog; - public TopologyManager(TopologySorter.Supplier sorter, Id node) + public TopologyManager(TopologySorter.Supplier sorter, Id node, Scheduler scheduler, ToLongFunction<TimeUnit> nowTimeUnit) { this.sorter = sorter; this.node = node; + this.scheduler = scheduler; + this.nowTimeUnit = nowTimeUnit; this.epochs = Epochs.EMPTY; } + public void shutdown() + { + topologyUpdateWatchdog.cancel(); + } + + public void scheduleTopologyUpdateWatchdog() + { + topologyUpdateWatchdog = scheduler.recurring(() -> { + synchronized (TopologyManager.this) + { + Epochs current = epochs; + if (current.futureEpochs.isEmpty()) + return; + + long now = nowTimeUnit.applyAsLong(TimeUnit.MILLISECONDS); + if (now > current.futureEpochs.get(0).deadlineMillis) + { + List<FutureEpoch> futureEpochs = null; + for (int i = 0; i < current.futureEpochs.size(); i++) + { + FutureEpoch futureEpoch = current.futureEpochs.get(i); + if (now <= futureEpoch.deadlineMillis) + break; + else + futureEpoch.timeOutCurrentListeners(); + } + } + } + 
}, WATCHDOG_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + } + public synchronized EpochReady onTopologyUpdate(Topology topology, Supplier<EpochReady> bootstrap) { Epochs current = epochs; @@ -400,11 +479,12 @@ public class TopologyManager nextEpochs[0].recordClosed(notifications.closed); nextEpochs[0].recordComplete(notifications.complete); - List<AsyncResult.Settable<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures); - AsyncResult.Settable<Void> toComplete = !futureEpochFutures.isEmpty() ? futureEpochFutures.remove(0) : null; - epochs = new Epochs(nextEpochs, pending, futureEpochFutures); + List<FutureEpoch> futureEpochs = new ArrayList<>(current.futureEpochs); + FutureEpoch toComplete = !futureEpochs.isEmpty() ? futureEpochs.remove(0) : null; + epochs = new Epochs(nextEpochs, pending, futureEpochs); + epochs = new Epochs(nextEpochs, pending, futureEpochs); if (toComplete != null) - toComplete.trySuccess(null); + toComplete.future.trySuccess(null); return nextEpochs[0].ready = bootstrap.get(); } @@ -414,7 +494,7 @@ public class TopologyManager AsyncResult<Void> result; synchronized (this) { - result = epochs.awaitEpoch(epoch); + result = epochs.awaitEpoch(epoch, nowTimeUnit); } CommandStore current = CommandStore.maybeCurrent(); return current == null || result.isDone() ? 
result : result.withExecutor(current); @@ -448,11 +528,11 @@ public class TopologyManager return; int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); - Invariants.checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch()); + checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch()); EpochState[] nextEpochs = new EpochState[newLen]; System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); - epochs = new Epochs(nextEpochs, current.pending, current.futureEpochFutures); + epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs); } public synchronized void onEpochClosed(Ranges ranges, long epoch) @@ -527,7 +607,7 @@ public class TopologyManager throw tm; if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch; - else Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current epoch %d < max %d", snapshot.currentEpoch, maxEpoch); + else checkState(snapshot.currentEpoch >= maxEpoch, "current epoch %d < max %d", snapshot.currentEpoch, maxEpoch); EpochState maxEpochState = nonNull(snapshot.get(maxEpoch)); if (minEpoch == maxEpoch && isSufficientFor.apply(maxEpochState).containsAll(select)) @@ -591,7 +671,7 @@ public class TopologyManager Epochs snapshot = epochs; EpochState maxState = snapshot.get(maxEpoch); - Invariants.checkState(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch); + checkState(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch); TopologyMismatch tm = TopologyMismatch.checkForMismatch(maxState.global(), select); if (tm != null) throw tm; @@ -607,7 +687,7 @@ public class TopologyManager topologies.add(epochState.global.forSelection(select)); select = select.subtract(epochState.addedRanges); } - 
Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select); + checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select); return topologies.build(sorter); } diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index 78b1298..437a31e 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -30,6 +30,7 @@ import com.google.common.collect.Sets; import accord.api.Key; import accord.api.MessageSink; import accord.api.Scheduler; +import accord.api.TopologySorter; import accord.config.LocalConfig; import accord.config.MutableLocalConfig; import accord.coordinate.CoordinationAdapter; @@ -45,6 +46,7 @@ import accord.impl.mock.MockCluster; import accord.impl.mock.MockConfigurationService; import accord.impl.mock.MockStore; import accord.local.Node; +import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; import accord.messages.LocalRequest; @@ -55,6 +57,7 @@ import accord.primitives.Txn; import accord.topology.Shard; import accord.topology.Topologies; import accord.topology.Topology; +import accord.topology.TopologyManager; import accord.utils.DefaultRandom; import accord.utils.EpochFunction; import accord.utils.Invariants; @@ -194,4 +197,9 @@ public class Utils .ignoreExceptions() .untilAsserted(runnable); } + + public static TopologyManager testTopologyManager(TopologySorter.Supplier sorter, Id node) + { + return new TopologyManager(sorter, node, Scheduler.NEVER_RUN_SCHEDULED, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, () -> 0)); + } } diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index d090772..031e6a4 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -18,42 +18,43 @@ package accord.topology; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + import accord.burn.TopologyUpdates; import accord.impl.PrefixedIntHashKey; import accord.impl.TestAgent; import accord.local.AgentExecutor; +import accord.local.Node; +import accord.primitives.Range; import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; import accord.primitives.Unseekables; import accord.utils.AccordGens; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.RandomSource; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterators; import org.agrona.collections.Long2ObjectHashMap; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import accord.local.Node; -import accord.primitives.Range; -import accord.primitives.RoutingKeys; import org.mockito.Mockito; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; - import static accord.Utils.id; import static accord.Utils.idList; import static accord.Utils.idSet; import static accord.Utils.shard; +import static accord.Utils.testTopologyManager; import static accord.Utils.topologies; import static accord.Utils.topology; import static accord.impl.IntKey.keys; @@ -79,7 +80,7 @@ public class TopologyManagerTest shard(range(100, 200), idList(2, 
3, 5), idSet(2, 3, 5))); int[] unmoved = { 1, 3, 5 }; int[] moved = { 2, 4 }; - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); service.onTopologyUpdate(t1, () -> null); service.onTopologyUpdate(t2, () -> null); @@ -105,7 +106,7 @@ public class TopologyManagerTest Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))); Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); Assertions.assertSame(Topology.EMPTY, service.current()); service.onTopologyUpdate(topology1, () -> null); @@ -130,7 +131,7 @@ public class TopologyManagerTest shard(range(100, 200), idList(1, 2, 3), idSet(3, 4)), shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); service.onTopologyUpdate(topology1, () -> null); service.onTopologyUpdate(topology2, () -> null); @@ -161,7 +162,7 @@ public class TopologyManagerTest Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))); Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(1, 2))); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); service.onTopologyUpdate(topology1, () -> null); service.onTopologyUpdate(topology2, () -> null); service.onTopologyUpdate(topology3, () -> null); @@ -198,7 +199,7 @@ public class TopologyManagerTest Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))); // Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(3, 4))); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); service.onTopologyUpdate(topology1, () -> null); // sync epoch 
2 @@ -220,7 +221,7 @@ public class TopologyManagerTest Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(1, 2))); Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(2, 3))); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); Assertions.assertSame(Topology.EMPTY, service.current()); service.onTopologyUpdate(topology1, () -> null); @@ -250,7 +251,7 @@ public class TopologyManagerTest shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), shard(range(200, 300), idList(4, 5, 6), idSet(5, 6))); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); service.onTopologyUpdate(topology5, () -> null); service.onTopologyUpdate(topology6, () -> null); @@ -278,7 +279,7 @@ public class TopologyManagerTest void truncateTopologyHistory() { Range range = range(100, 200); - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = testTopologyManager(SUPPLIER, ID); addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)))); addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)))); addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), idSet(1, 2)))); @@ -319,7 +320,7 @@ public class TopologyManagerTest shard(PrefixedIntHashKey.range(1, 0, 100), idList(1, 2, 3), idSet(1, 2)))); topologies.add(topology(epochCounter++, shard(PrefixedIntHashKey.range(1, 0, 100), idList(1, 2, 3), idSet(1, 2))));; - History history = new History(new TopologyManager(SUPPLIER, ID), topologies.iterator()) { + History history = new History(testTopologyManager(SUPPLIER, ID), topologies.iterator()) { @Override protected void postTopologyUpdate(int id, Topology t) @@ -365,7 +366,7 @@ public class TopologyManagerTest @Test void aba() { - TopologyManager service = new TopologyManager(SUPPLIER, ID); + TopologyManager service = 
testTopologyManager(SUPPLIER, ID); List<Node.Id> dc1Nodes = idList(1, 2, 3); Set<Node.Id> dc1Fp = idSet(1, 2); List<Node.Id> dc2Nodes = idList(4, 5, 6); @@ -416,7 +417,7 @@ public class TopologyManagerTest return t == null ? endOfData() : t; } }, 42); - History history = new History(new TopologyManager(SUPPLIER, ID), next) { + History history = new History(testTopologyManager(SUPPLIER, ID), next) { @Override protected void postTopologyUpdate(int id, Topology t) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org