belliottsmith commented on code in PR #256:
URL: https://github.com/apache/cassandra-accord/pull/256#discussion_r2492069344
##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -18,1625 +18,445 @@
package accord.topology;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.function.Consumer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
-import java.util.function.LongConsumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Agent;
import accord.api.AsyncExecutor;
-import accord.api.ConfigurationService;
-import accord.api.ConfigurationService.EpochReady;
-import accord.api.ProtocolModifiers.QuorumEpochIntersections.Include;
+import accord.api.TopologyListener;
import accord.api.Timeouts;
-import accord.api.Timeouts.RegisteredTimeout;
+import accord.api.TopologyService;
import accord.api.TopologySorter;
import accord.api.VisibleForImplementation;
-import accord.coordinate.EpochTimeout;
-import accord.coordinate.tracking.QuorumTracker;
+import accord.local.Node;
import accord.local.Node.Id;
import accord.local.TimeService;
-import accord.primitives.EpochSupplier;
-import accord.primitives.Participants;
import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
-import accord.primitives.Routables;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId.FastPath;
-import accord.primitives.Unseekables;
import accord.topology.Topologies.SelectNodeOwnership;
-import accord.topology.Topologies.Single;
-import accord.utils.IndexedBiFunction;
+import accord.topology.TopologyCollector.BestFastPath;
+import accord.topology.TopologyCollector.Simple;
+import accord.topology.TopologyCollector.SupportsPrivilegedFastPath;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
+import accord.utils.async.NestedAsyncResult;
-import static
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned;
-import static
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced;
-import static accord.coordinate.tracking.RequestStatus.Success;
-import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
-import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
-import static
accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithoutDeps;
-import static accord.primitives.TxnId.FastPath.Unoptimised;
-import static accord.utils.Invariants.illegalState;
-import static accord.utils.Invariants.nonNull;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
-import static java.util.stream.Collectors.joining;
-
-/**
- * Manages topology state changes and update bookkeeping
- *
- * Each time the topology changes we need to:
- * * confirm previous owners of ranges we replicate are aware of the new config
- * * learn of any outstanding operations for ranges we replicate
- * * clean up obsolete data
- *
- * Assumes a topology service that won't report epoch n without having n-1 etc
also available
- *
- * TODO (desired, efficiency/clarity): make TopologyManager a Topologies and
copy-on-write update to it,
- * so we can always just take a reference for transactions instead of copying
every time (and index into it by the txnId.epoch)
- */
-public class TopologyManager
-{
- private static final Logger logger =
LoggerFactory.getLogger(TopologyManager.class);
- private static final FutureEpoch SUCCESS;
-
- static
- {
- SUCCESS = new FutureEpoch(-1L, null);
- SUCCESS.setDone();
- }
-
- static class EpochState
- {
- final Id self;
- private final Topology global;
- private final Topology local;
- private final QuorumTracker syncTracker;
- private final BitSet curShardSyncComplete;
- private final Ranges addedRanges, removedRanges;
- @GuardedBy("TopologyManager.this")
- private EpochReady ready;
- @GuardedBy("TopologyManager.this")
- private Ranges synced;
- @GuardedBy("TopologyManager.this")
- Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
-
- private volatile boolean allRetired;
-
- public boolean allRetired()
- {
- if (allRetired)
- return true;
-
- if (!retired.containsAll(global.ranges))
- return false;
-
- Invariants.require(closed.containsAll(global.ranges));
- allRetired = true;
- return true;
- }
-
- EpochState(Id node, Topology global, TopologySorter sorter, Ranges
prevRanges)
- {
- this.self = node;
- this.global = Invariants.requireArgument(global,
!global.isSubset());
- this.local = global.forNode(node).trim();
- this.curShardSyncComplete = new BitSet(global.shards.length);
- if (!global().isEmpty())
- this.syncTracker = new QuorumTracker(new Single(sorter,
global()));
- else
- this.syncTracker = null;
-
- this.addedRanges =
global.ranges.without(prevRanges).mergeTouching();
- this.removedRanges =
prevRanges.mergeTouching().without(global.ranges);
- this.synced = addedRanges;
- }
-
- EpochState(Id node, Topology global, BitSet curShardSyncComplete,
QuorumTracker syncTracker, Ranges addedRanges, Ranges removedRanges, EpochReady
ready, Ranges synced, Ranges closed, Ranges retired)
- {
- this.self = node;
- this.global = Invariants.requireArgument(global,
!global.isSubset());
- this.local = global.forNode(node).trim();
- this.curShardSyncComplete = curShardSyncComplete;
- this.syncTracker = syncTracker;
- this.addedRanges = addedRanges;
- this.removedRanges = removedRanges;
- this.ready = ready;
- this.synced = synced;
- this.closed = closed;
- this.retired = retired;
- }
-
- public boolean hasReachedQuorum()
- {
- return syncTracker == null || syncTracker.hasReachedQuorum();
- }
-
- private boolean recordSyncCompleteFromFuture()
- {
- if (syncTracker == null || syncComplete())
- return false;
- synced = global.ranges.mergeTouching();
- return true;
- }
-
- enum NodeSyncStatus { Untracked, Complete, ShardUpdate, NoUpdate }
-
- NodeSyncStatus recordSyncComplete(Id node)
- {
- if (syncTracker == null)
- return NodeSyncStatus.Untracked;
-
- if (syncTracker.recordSuccess(node) == Success)
- {
- synced = global.ranges.mergeTouching();
- return NodeSyncStatus.Complete;
- }
- else
- {
- boolean updated = false;
- // loop over each current shard, and test if its ranges are
complete
- for (int i = 0 ; i < global.shards.length ; ++i)
- {
- if (syncTracker.get(i).hasReachedQuorum() &&
!curShardSyncComplete.get(i))
- {
- synced = synced.union(MERGE_ADJACENT,
Ranges.of(global.shards[i].range));
- curShardSyncComplete.set(i);
- updated = true;
- }
- }
- return updated ? NodeSyncStatus.ShardUpdate :
NodeSyncStatus.NoUpdate;
- }
- }
-
- // returns those ranges that weren't already closed, so that they can
be propagated to lower epochs
- Ranges recordClosed(Ranges ranges)
- {
- ranges = ranges.without(closed);
- if (ranges.isEmpty())
- return ranges;
- closed = closed.union(MERGE_ADJACENT, ranges);
- Invariants.require(closed.mergeTouching() == closed);
- return ranges.without(addedRanges);
- }
-
- // returns those ranges that weren't already retired, so that they can
be propagated to lower epochs
- Ranges recordRetired(Ranges ranges)
- {
- ranges = ranges.without(retired);
- if (ranges.isEmpty())
- return ranges;
- synced = synced.union(MERGE_ADJACENT, ranges);
- closed = closed.union(MERGE_ADJACENT, ranges);
- retired = retired.union(MERGE_ADJACENT, ranges);
- Invariants.require(closed.mergeTouching() == closed);
- Invariants.require(retired.mergeTouching() == retired);
- return ranges.without(addedRanges);
- }
-
- Topology global()
- {
- return global;
- }
-
- Topology local()
- {
- return local;
- }
-
- long epoch()
- {
- return global().epoch;
- }
-
- boolean syncComplete()
- {
- return synced.containsAll(global.ranges);
- }
-
- /**
- * determine if sync has completed for all shards intersecting with
the given keys
- */
- boolean syncCompleteFor(Unseekables<?> intersect)
- {
- return synced.containsAll(intersect);
- }
-
- @Override
- public String toString()
- {
- return "EpochState{" +
- "epoch=" + global.epoch() +
- '}';
- }
- }
-
- private static class Epochs
- {
- static class Notifications
- {
- final Set<Id> syncComplete = new TreeSet<>();
- Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
- }
-
- private final long currentEpoch;
- private final long firstNonEmptyEpoch;
- // Epochs are sorted in _descending_ order
- private final EpochState[] epochs;
- // nodes we've received sync complete notifications from, for epochs
we do not yet have topologies for.
- // Pending sync notifications are indexed by epoch, with the current
epoch as index[0], and future epochs
- // as index[epoch - currentEpoch]. Sync complete notifications for the
current epoch are marked pending
- // until the superseding epoch has been applied
- private final List<Notifications> pending;
-
- // list of promises to be completed as newer epochs become active.
This is to support processes that
- // are waiting on future epochs to begin (ie: txn requests from
futures epochs). Index 0 is for
- // currentEpoch + 1
- // NOTE: this is NOT copy-on-write. This is mutated in place!
- private final List<FutureEpoch> futureEpochs;
-
- private Epochs(EpochState[] epochs, List<Notifications> pending,
List<FutureEpoch> futureEpochs, long prevFirstNonEmptyEpoch)
- {
- this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
- if (prevFirstNonEmptyEpoch != -1)
- this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
- else if (epochs.length > 0 && !epochs[0].global().isEmpty())
- this.firstNonEmptyEpoch = currentEpoch;
- else
- this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
-
- this.pending = pending;
- this.futureEpochs = futureEpochs;
- if (!futureEpochs.isEmpty())
- Invariants.require(futureEpochs.get(0).epoch == currentEpoch +
1);
-
- for (int i = 1; i < futureEpochs.size(); i++)
- Invariants.requireArgument(futureEpochs.get(i).epoch ==
futureEpochs.get(i - 1).epoch + 1);
- for (int i = 1; i < epochs.length; i++)
- Invariants.requireArgument(epochs[i].epoch() == epochs[i -
1].epoch() - 1);
- int truncateFrom = -1;
- // > 0 because we do not want to be left without epochs in case
they're all empty
- for (int i = epochs.length - 1; i > 0; i--)
- {
- EpochState epochState = epochs[i];
- if (epochState.allRetired() && (truncateFrom == -1 ||
truncateFrom == i + 1))
- truncateFrom = i;
- }
-
- if (truncateFrom == -1)
- {
- this.epochs = epochs;
- }
- else
- {
- this.epochs = Arrays.copyOf(epochs, truncateFrom);
- for (int i = truncateFrom; i < epochs.length; i++)
- {
- EpochState state = epochs[i];
- Invariants.require(epochs[i].syncComplete());
- logger.info("Retired epoch {} with added/removed ranges
{}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges,
state.removedRanges, state.global.ranges, state.closed);
- }
- if (logger.isTraceEnabled())
- {
- for (int i = 0; i < truncateFrom; i++)
- {
- EpochState state = epochs[i];
- Invariants.require(state.syncComplete());
- logger.trace("Leaving epoch {} with added/removed
ranges {}/{}", state.epoch(), state.addedRanges, state.removedRanges);
- }
- }
- }
- }
-
- private FutureEpoch awaitEpoch(long epoch, TopologyManager manager)
- {
- if (epoch <= currentEpoch)
- return SUCCESS;
-
- int expectedIndex = (int) (epoch - (1 + currentEpoch));
- while (futureEpochs.size() <= expectedIndex)
- {
- long addEpoch = currentEpoch + futureEpochs.size() + 1;
- FutureEpoch futureEpoch = new FutureEpoch(addEpoch, manager);
- futureEpochs.add(futureEpoch);
- }
-
- return futureEpochs.get(expectedIndex);
- }
-
- public long nextEpoch()
- {
- return current().epoch + 1;
- }
-
- public long minEpoch()
- {
- if (currentEpoch == 0)
- return 0;
- return currentEpoch - epochs.length + 1;
- }
-
- public long epoch()
- {
- return currentEpoch;
- }
-
- public Topology current()
- {
- return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY;
- }
-
- public Topology currentLocal()
- {
- return epochs.length > 0 ? epochs[0].local() : Topology.EMPTY;
- }
-
- /**
- * Mark sync complete for the given node/epoch, and if this epoch
- * is now synced, update the prevSynced flag on superseding epochs
- */
- public void syncComplete(Id node, long epoch)
- {
- Invariants.requireArgument(epoch > 0);
- int i = indexOf(epoch);
- if (epoch > currentEpoch)
- {
- pending(epoch).syncComplete.add(node);
- if (epochs.length == 0)
- return;
- i = 0;
- }
- else if (i < 0)
- {
- Invariants.require(epoch < minEpoch(), "Could not find epoch
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
- return;
- }
-
- recordSyncComplete(epochs, i, node);
- }
-
- /**
- * Mark the epoch as "closed" for the provided ranges; this means that
no new transactions
- * that intersect with this range may be proposed in the epoch (they
will be rejected).
- */
- public Ranges epochClosed(Ranges ranges, long epoch)
- {
- Invariants.requireArgument(epoch > 0);
- int i = indexOf(epoch);
- if (epoch > currentEpoch)
- {
- pending(epoch).closed.union(MERGE_ADJACENT, ranges);
- if (epochs.length == 0)
- return ranges;
- i = 0;
- }
- else if (i < 0)
- {
- Invariants.require(epoch < minEpoch(), "Could not find epoch
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
- return Ranges.EMPTY; // notification came for an already
truncated epoch
- }
-
- Ranges report = ranges = epochs[i++].recordClosed(ranges);
- while (!ranges.isEmpty() && i < epochs.length)
- ranges = epochs[i++].recordClosed(ranges);
- return report;
- }
-
- /**
- * Mark the epoch as "retired" for the provided ranges; this means
that all transactions that can be
- * proposed for this epoch have now been executed globally.
- */
- public Ranges epochRetired(Ranges ranges, long epoch)
- {
- int i = indexOf(epoch);
- if (epoch > currentEpoch)
- {
- pending(epoch).retired.union(MERGE_ADJACENT, ranges);
- if (epochs.length == 0)
- return ranges;
- i = 0;
- }
- else if (i < 0)
- {
- Invariants.require(epoch < minEpoch(), "Could not find epoch
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
- return Ranges.EMPTY; // notification came for an already
truncated epoch
- }
-
- Ranges report = ranges = epochs[i++].recordRetired(ranges);
- while (!ranges.isEmpty() && i < epochs.length)
- ranges = epochs[i++].recordRetired(ranges);
- return report;
- }
-
- private Notifications pending(long epoch)
- {
- Invariants.requireArgument(epoch > currentEpoch);
- int idx = (int) (epoch - (1 + currentEpoch));
- for (int i = pending.size(); i <= idx; i++)
- pending.add(new Notifications());
-
- return pending.get(idx);
- }
-
- @Nullable
- private EpochState get(long epoch)
- {
- int index = indexOf(epoch);
- if (index < 0)
- return null;
-
- return epochs[index];
- }
-
- private int indexOf(long epoch)
- {
- if (epoch > currentEpoch || epoch <= currentEpoch - epochs.length)
- return -1;
-
- return (int) (currentEpoch - epoch);
- }
- }
-
- private static void recordSyncComplete(EpochState[] epochs, int i, Id node)
- {
- EpochState.NodeSyncStatus status = epochs[i].recordSyncComplete(node);
- switch (status)
- {
- case Complete:
- i++;
- for (; i < epochs.length &&
epochs[i].recordSyncCompleteFromFuture(); i++) {}
- break;
- case Untracked:
- // don't have access to TopologyManager.this.node to check if
the nodes match... this state should not happen unless it is the same node
- case NoUpdate:
- case ShardUpdate:
- break;
- default:
- throw new UnsupportedOperationException("Unknown status " +
status);
- }
- }
-
- static class WaitingForEpoch extends AsyncResults.SettableResult<Void>
- {
- final long deadlineMicros;
- WaitingForEpoch(long deadlineMicros)
- {
- this.deadlineMicros = deadlineMicros;
- }
- }
-
- private static class FutureEpoch implements Timeouts.Timeout
- {
- private final long epoch;
- private final TopologyManager manager;
- private boolean isDone;
- private final ArrayDeque<WaitingForEpoch> waiting = new ArrayDeque<>();
- private RegisteredTimeout timeout;
-
- public FutureEpoch(long epoch, TopologyManager manager)
- {
- this.epoch = epoch;
- this.manager = manager;
- }
-
- // TODO (expected): pass through request deadline
- AsyncResult<Void> waiting()
- {
- WaitingForEpoch result, last;
- synchronized (this)
- {
- if (isDone)
- return AsyncResults.success(null);
-
- long timeoutMicros =
manager.agent.expireEpochWait(MICROSECONDS);
- long deadlineMicros = manager.time.elapsed(MICROSECONDS) +
timeoutMicros;
- result = last = waiting.peekLast();
- if (last == null || last.deadlineMicros < deadlineMicros)
- waiting.add(result = new WaitingForEpoch(deadlineMicros +
(timeoutMicros / 10)));
- }
- if (last == null)
- timeout = manager.timeouts.registerAt(this,
result.deadlineMicros, MICROSECONDS);
- return result;
- }
-
- private void setDone()
- {
- synchronized (this)
- {
- isDone = true;
- WaitingForEpoch next;
- while (null != (next = waiting.poll()))
- next.trySuccess(null);
- }
- RegisteredTimeout cancel = timeout;
- if (cancel != null)
- cancel.cancel();
- }
-
- @Override
- public void timeout()
- {
- long nextDeadlineMicros = 0;
- synchronized (this)
- {
- if (isDone)
- return;
-
- long nowMicros = manager.time.elapsed(MICROSECONDS);
- WaitingForEpoch next;
- while (null != (next = waiting.peek()) && (nextDeadlineMicros
= next.deadlineMicros) <= nowMicros)
- waiting.poll().tryFailure(EpochTimeout.timeout(epoch,
manager.agent));
- }
- if (nextDeadlineMicros > 0)
- timeout = manager.timeouts.registerAt(this,
nextDeadlineMicros, MICROSECONDS);
- }
-
- @Override
- public int stripe()
- {
- return (int) epoch;
- }
- }
-
- // this class could be just the list, but left it here in case we wish to
expose "futureEpochs" and "pending" as well
- public static class EpochsSnapshot implements
Iterable<EpochsSnapshot.Epoch>
- {
- public final ImmutableList<Epoch> epochs;
-
- public EpochsSnapshot(ImmutableList<Epoch> epochs)
- {
- this.epochs = epochs;
- }
-
- @Override
- public Iterator<Epoch> iterator()
- {
- return epochs.iterator();
- }
-
- public enum ResultStatus
- {
- PENDING("pending"),
- SUCCESS("success"),
- FAILURE("failure");
-
- public final String value;
-
- ResultStatus(String value)
- {
- this.value = value;
- }
-
- private static ResultStatus of(AsyncResult<?> result)
- {
- if (result == null || !result.isDone())
- return PENDING;
-
- return result.isSuccess() ? SUCCESS : FAILURE;
- }
- }
-
- public static class EpochReady
- {
- public final ResultStatus metadata, coordinate, data, reads;
-
- public EpochReady(ResultStatus metadata, ResultStatus coordinate,
ResultStatus data, ResultStatus reads)
- {
- this.metadata = metadata;
- this.coordinate = coordinate;
- this.data = data;
- this.reads = reads;
- }
-
- private static EpochReady of(ConfigurationService.EpochReady ready)
- {
- return new EpochReady(ResultStatus.of(ready.metadata),
- ResultStatus.of(ready.coordinate),
- ResultStatus.of(ready.data),
- ResultStatus.of(ready.reads));
- }
- }
-
- public static class Epoch
- {
- public final long epoch;
- public final EpochReady ready;
- public final Ranges global, addedRanges, removedRanges, synced,
closed, retired;
-
- public Epoch(long epoch, EpochReady ready, Ranges global, Ranges
addedRanges, Ranges removedRanges, Ranges synced, Ranges closed, Ranges retired)
- {
- this.epoch = epoch;
- this.ready = ready;
- this.global = global;
- this.addedRanges = addedRanges;
- this.removedRanges = removedRanges;
- this.synced = synced;
- this.closed = closed;
- this.retired = retired;
- }
- }
- }
-
- private final TopologySorter.Supplier sorter;
- private final TopologiesCollectors collector;
- private final BestFastPath bestFastPath;
- private final SupportsPrivilegedFastPath supportsPrivilegedFastPath;
- private final Agent agent;
- private final Id self;
- private final TimeService time;
- private final Timeouts timeouts;
- private volatile Epochs epochs;
-
- public TopologyManager(TopologySorter.Supplier sorter, Agent agent, Id
self, TimeService time, Timeouts timeouts)
- {
- this.sorter = sorter;
- this.collector = new TopologiesCollectors(sorter,
SelectNodeOwnership.SHARE);
- this.bestFastPath = new BestFastPath(self);
- this.supportsPrivilegedFastPath = new SupportsPrivilegedFastPath(self);
- this.agent = agent;
- this.self = self;
- this.time = time;
- this.timeouts = timeouts;
- this.epochs = new Epochs(new EpochState[0], new ArrayList<>(), new
ArrayList<>(), -1);;
- }
-
- public EpochsSnapshot epochsSnapshot()
- {
- // Write to this volatile variable is done via synchronized, so this
is single-writer multi-consumer; safe to read without locks
- Epochs epochs = this.epochs;
- ImmutableList.Builder<EpochsSnapshot.Epoch> builder =
ImmutableList.builderWithExpectedSize(epochs.epochs.length);
- for (int i = 0; i < epochs.epochs.length; i++)
- {
- // This class's state is mutable with regaurd to: ready, synced,
closed, retired
- EpochState epoch = epochs.epochs[i];
- // Even though this field is populated with the same lock epochs
is, it is done before publishing to epochs!
- // For this reason the field maybe null, in which case we need to
use the lock to wait for the field.
- EpochReady ready;
- Ranges global, addedRanges, removedRanges, synced, closed, retired;
- global = epoch.global.ranges.mergeTouching();
- addedRanges = epoch.addedRanges;
- removedRanges = epoch.removedRanges;
- // ready, synced, closed, and retired all rely on TM's object lock
- synchronized (this)
- {
- ready = epoch.ready;
- synced = epoch.synced;
- closed = epoch.closed;
- retired = epoch.retired;
- }
- builder.add(new EpochsSnapshot.Epoch(epoch.epoch(),
EpochsSnapshot.EpochReady.of(ready), global, addedRanges, removedRanges,
synced, closed, retired));
- }
- return new EpochsSnapshot(builder.build());
- }
-
- public List<Topology> topologySnapshot()
- {
- // Write to this volatile variable is done via synchronized, so this
is single-writer multi-consumer; safe to read without locks
- Epochs epochs = this.epochs;
- ImmutableList.Builder<Topology> builder =
ImmutableList.builderWithExpectedSize(epochs.epochs.length);
- for (int i = 0; i < epochs.epochs.length; i++)
- {
- // This class's state is mutable with regaurd to: ready, synced,
closed, retired
- EpochState epoch = epochs.epochs[i];
- builder.add(epoch.global);
- }
- return builder.build();
- }
-
- public EpochReady onTopologyUpdate(Topology topology, Supplier<EpochReady>
bootstrap, LongConsumer truncate)
- {
- FutureEpoch notifyDone;
- EpochReady ready;
- Epochs prev;
- Epochs next;
- synchronized (this)
- {
- prev = epochs;
- Invariants.requireArgument(topology.epoch == prev.nextEpoch() ||
epochs.epochs.length == 0,
- "Expected topology update %d to be %d",
topology.epoch, prev.nextEpoch());
-
- Ranges prevAll = prev.epochs.length == 0 ? Ranges.EMPTY :
prev.epochs[0].global.ranges;
- List<Epochs.Notifications> pending = prev.pending.size() <= 1 ?
new ArrayList<>() : new ArrayList<>(prev.pending.subList(1,
prev.pending.size()));
-
- EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1];
- System.arraycopy(prev.epochs, 0, nextEpochs, 1,
prev.epochs.length);
- nextEpochs[0] = new EpochState(self, topology,
sorter.get(topology), prevAll);
- if (!prev.pending.isEmpty())
- {
- // TODO (expected): we should invoke the same code as we do
when receiving normally, to prevent divergence
- prev.pending.get(0).syncComplete.forEach(id ->
recordSyncComplete(nextEpochs, 0, id));
- for (Epochs.Notifications notifications : prev.pending)
- {
- nextEpochs[0].recordClosed(notifications.closed);
- nextEpochs[0].recordRetired(notifications.retired);
- }
- }
- if (prev.epochs.length > 0 &&
!prev.epochs[0].global.hardRemoved.containsAll(topology.hardRemoved))
- {
- IdentityHashMap<Shard, Shard> cache = new IdentityHashMap<>();
- for (int i = nextEpochs.length - 1 ; i >= 0 ; --i)
- {
- EpochState cur = nextEpochs[i];
- Topology newGlobal =
nextEpochs[i].global.withHardRemoved(topology.hardRemoved, cache);
- if (newGlobal != cur.global)
- nextEpochs[i] = new EpochState(self, newGlobal,
cur.curShardSyncComplete, cur.syncTracker, cur.addedRanges, cur.removedRanges,
cur.ready, cur.synced, cur.closed, cur.retired);
- }
- }
-
- List<FutureEpoch> futureEpochs = new
ArrayList<>(prev.futureEpochs);
- notifyDone = !futureEpochs.isEmpty() ? futureEpochs.remove(0) :
null;
- next = new Epochs(nextEpochs, pending, futureEpochs,
prev.firstNonEmptyEpoch);
- epochs = next;
- ready = nextEpochs[0].ready = bootstrap.get();
- }
-
- if (next.minEpoch() != prev.minEpoch())
- truncate.accept(epochs.minEpoch());
-
- if (notifyDone != null)
- notifyDone.setDone();
-
- return ready;
- }
-
- public AsyncChain<Void> awaitEpoch(long epoch, @Nullable AsyncExecutor
ifAsync)
- {
- FutureEpoch futureEpoch;
- synchronized (this)
- {
- futureEpoch = epochs.awaitEpoch(epoch, this);
- }
- return futureEpoch.waiting().chainImmediatelyElse(ifAsync);
- }
-
- public synchronized boolean hasReachedQuorum(long epoch)
- {
- EpochState state = epochs.get(epoch);
- return state != null && state.hasReachedQuorum();
- }
-
- @VisibleForTesting
- public EpochReady epochReady(long epoch)
- {
- Epochs epochs = this.epochs;
-
- if (epoch < epochs.minEpoch())
- return EpochReady.done(epoch);
-
- if (epoch > epochs.currentEpoch)
- throw new IllegalArgumentException(String.format("Epoch %d is
larger than current epoch %d", epoch, epochs.currentEpoch));
-
- return epochs.get(epoch).ready;
- }
-
- public synchronized void onEpochSyncComplete(Id node, long epoch)
- {
- epochs.syncComplete(node, epoch);
- }
-
- @VisibleForTesting
- public Ranges syncComplete(long epoch)
- {
- return epochs.get(epoch).synced;
- }
-
- public synchronized void truncateTopologiesUntil(long epoch)
- {
- Epochs current = epochs;
- Invariants.requireArgument(current.epoch() >= epoch, "Unable to
truncate; epoch %d is > current epoch %d", epoch , current.epoch());
-
- if (current.minEpoch() >= epoch)
- return;
-
- int newLen = current.epochs.length - (int) (epoch -
current.minEpoch());
- Invariants.require(current.epochs[newLen - 1].syncComplete(), "Epoch
%d's sync is not complete", current.epochs[newLen - 1].epoch());
-
- EpochState[] nextEpochs = new EpochState[newLen];
- System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
- epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs,
current.firstNonEmptyEpoch);
- }
-
- public synchronized void onEpochClosed(Ranges ranges, long epoch)
- {
- epochs.epochClosed(ranges, epoch);
- }
-
- /**
- * If ranges were added in epoch X, and are _not_ present in the current
epoch, they
- * are purged and durability scheduling for them should be cancelled.
- */
- public synchronized boolean isFullyRetired(Ranges ranges)
- {
- Epochs epochs = this.epochs;
- EpochState current = epochs.get(epochs.currentEpoch);
- if (!current.addedRanges.containsAll(ranges))
- return false;
-
- long minEpoch = epochs.minEpoch();
- for (long i = minEpoch; i < epochs.currentEpoch; i++)
- {
- EpochState retiredIn = epochs.get(i);
- if (retiredIn.allRetired() &&
retiredIn.addedRanges.containsAll(ranges))
- return true;
- }
-
- return false;
- }
-
- public synchronized void onEpochRetired(Ranges ranges, long epoch)
- {
- epochs.epochRetired(ranges, epoch);
- }
-
- public TopologySorter.Supplier sorter()
- {
- return sorter;
- }
-
- public Topology current()
- {
- return epochs.current();
- }
-
- public Topology currentLocal()
- {
- return epochs.currentLocal();
- }
-
- public boolean isEmpty()
- {
- return epochs.epochs.length == 0;
- }
-
- public long epoch()
- {
- return current().epoch;
- }
-
- // TODO (desired): add tests for epoch GC and tracking
- @VisibleForImplementation
- public long firstNonEmpty()
- {
- return epochs.firstNonEmptyEpoch;
- }
+/**
+ * Manages topology state changes and update bookkeeping
+ *
+ * Each time the topology changes we need to:
+ * * confirm previous owners of ranges we replicate are aware of the new config
+ * * learn of any outstanding operations for ranges we replicate
+ * * clean up obsolete data
+ *
+ * Assumes a topology service that won't report epoch n without having n-1 etc
also available
+ *
+ * TODO (desired, efficiency/clarity): make TopologyManager a Topologies and
copy-on-write update to it,
+ * so we can always just take a reference for transactions instead of copying
every time (and index into it by the txnId.epoch)
+ */
+public class TopologyManager
+{
+ private static final Logger logger =
LoggerFactory.getLogger(TopologyManager.class);
+ private static final PendingEpoch SUCCESS;
- public long minEpoch()
+ static
{
- Epochs epochs = this.epochs;
- return epochs.minEpoch();
+ SUCCESS = new PendingEpoch(-1L, null);
+ SUCCESS.setActive();
}
- @VisibleForTesting
- EpochState getEpochStateUnsafe(long epoch)
- {
- return epochs.get(epoch);
- }
+ final TopologySorter.Supplier sorter;
+ final Simple collector;
+ final BestFastPath bestFastPath;
+ final SupportsPrivilegedFastPath supportsPrivilegedFastPath;
+ final Node node;
+ final TopologyService topologyService;
+ final TimeService time;
+ final Timeouts timeouts;
+ private volatile ActiveEpochs active;
+ private final PendingEpochs pending;
+ private final CopyOnWriteArrayList<TopologyListener> listeners = new
CopyOnWriteArrayList<>();
- /**
- * Fetch topologies between {@param minEpoch} (inclusive), and {@param
maxEpoch} (inclusive).
- */
- public TopologyRange between(long minEpoch, long maxEpoch)
+ public TopologyManager(TopologySorter.Supplier sorter, Node node,
TopologyService topologyService, TimeService time, Timeouts timeouts)
{
- Epochs epochs = this.epochs;
- // No epochs known to Accord
- if (epochs.firstNonEmptyEpoch == -1 || minEpoch > epochs.currentEpoch)
- return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch,
epochs.firstNonEmptyEpoch, Collections.emptyList());
-
- minEpoch = Math.max(minEpoch, epochs.minEpoch());
- int diff = Math.toIntExact(epochs.currentEpoch - minEpoch + 1);
- List<Topology> topologies = new ArrayList<>(diff);
- for (int i = 0; epochs.minEpoch() + i <= maxEpoch && i < diff; i++)
- topologies.add(epochs.get(minEpoch + i).global);
-
- return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch,
epochs.firstNonEmptyEpoch, topologies);
+ this.sorter = sorter;
+ this.collector = new Simple(sorter, SelectNodeOwnership.SHARE);
+ this.bestFastPath = new BestFastPath(node.id());
+ this.supportsPrivilegedFastPath = new
SupportsPrivilegedFastPath(node.id());
+ this.node = node;
+ this.time = time;
+ this.timeouts = timeouts;
+ this.topologyService = topologyService;
+ this.active = new ActiveEpochs(this, new ActiveEpoch[0], -1);
+ this.pending = new PendingEpochs(this);
}
- public static class TopologyRange
+ public void onReadyToCoordinate(Id node, long epoch)
{
- public final long min;
- public final long current;
- public final long firstNonEmpty;
- public final List<Topology> topologies;
-
- public TopologyRange(long min, long current, long firstNonEmpty,
List<Topology> topologies)
+ synchronized (this)
{
- this.min = min;
- this.current = current;
- this.topologies = topologies;
- this.firstNonEmpty = firstNonEmpty;
+ if (epoch >= active.minEpoch())
+ active.onReadyToCoordinate(node, epoch);
+ if (epoch > active.currentEpoch)
+ pending.remoteReadyToCoordinate(node, epoch);
}
+ for (TopologyListener listener : listeners)
+ listener.onRemoteReadyToCoordinate(node, epoch);
+ }
- public void forEach(Consumer<Topology> forEach, long minEpoch, int
count)
+ public void onEpochClosed(Ranges ranges, long epoch)
+ {
+ Topology topology;
+ synchronized (this)
{
- if (minEpoch == 0) // Bootstrap
- minEpoch = this.min;
-
- long emptyUpTo = firstNonEmpty == -1 ? current : firstNonEmpty - 1;
- // Report empty epochs
- for (long epoch = minEpoch; epoch <= emptyUpTo && count > 0;
epoch++, count--)
- forEach.accept(new Topology(epoch));
-
- // Report known non-empty epochs
- for (int i = 0; i < topologies.size() && count > 0; i++, count--)
- {
- Topology topology = topologies.get(i);
- forEach.accept(topology);
- }
+ topology = active.maybeGlobalForEpoch(epoch);
+ if (epoch > active.currentEpoch)
+ ranges = pending.closed(ranges, epoch);
+ ranges = active.closed(ranges, epoch);
}
-
- @Override
- public boolean equals(Object o)
+ if (!ranges.isEmpty())
{
- if (o == null || getClass() != o.getClass()) return false;
- TopologyRange that = (TopologyRange) o;
- return min == that.min && current == that.current && firstNonEmpty
== that.firstNonEmpty && Objects.equals(topologies, that.topologies);
+ for (TopologyListener listener : listeners)
+ listener.onEpochClosed(ranges, epoch, topology);
}
+ }
- @Override
- public int hashCode()
+ public void onEpochRetired(Ranges ranges, long epoch)
+ {
+ Topology topology;
+ synchronized (this)
{
- return Objects.hash(min, current, firstNonEmpty, topologies);
+ topology = active.maybeGlobalForEpoch(epoch);
+ if (epoch > active.currentEpoch)
+ ranges = pending.retired(ranges, epoch);
+ ranges = active.retired(ranges, epoch);
}
-
- @Override
- public String toString()
+ if (!ranges.isEmpty())
{
- return String.format("TopologyRange{min=%d, current=%d,
firstNonEmpty=%d, topologies=[%s]}",
- min,
- current,
- firstNonEmpty,
- topologies.stream().map(t ->
Long.toString(t.epoch())).collect(joining(",")));
+ for (TopologyListener listener : listeners)
+ listener.onEpochRetired(ranges, epoch, topology);
}
}
- public Topologies preciseEpochs(long epoch)
+ public synchronized void truncateTopologiesUntil(long epoch)
{
- return new Single(sorter, epochs.get(epoch).global);
- }
+ ActiveEpochs current = active;
+ Invariants.requireArgument(current.epoch() >= epoch, "Unable to
truncate; epoch %d is > current epoch %d", epoch , current.epoch());
- // TODO (testing): test all of these methods when asking for epochs that
have been cleaned up (and other code paths)
+ if (current.minEpoch() >= epoch)
+ return;
- /**
- * Returns topologies containing epochs where specified ranges haven't
completed synchronization between min/max epochs.
- * Can be used during coordination operations to ensure they contact all
relevant nodes across topology changes,
- * particularly when some ranges are still syncing after cluster
membership changes.
- */
- public Topologies withUnsyncedEpochs(Unseekables<?> select, Timestamp min,
Timestamp max)
- {
- return withUnsyncedEpochs(select, min.epoch(), max.epoch());
- }
+ int newLen = current.epochs.length - (int) (epoch -
current.minEpoch());
+ Invariants.require(current.epochs[newLen - 1].isQuorumReady(), "Epoch
%d is not ready to coordinate", current.epochs[newLen - 1].epoch());
- public Topologies select(Unseekables<?> select, Timestamp min, Timestamp
max, SelectNodeOwnership selectNodeOwnership, Include include)
- {
- return select(select, min.epoch(), max.epoch(), selectNodeOwnership,
include);
+ ActiveEpoch[] nextEpochs = new ActiveEpoch[newLen];
+ System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
+ active = new ActiveEpochs(this, nextEpochs,
current.firstNonEmptyEpoch);
}
- public Topologies select(Unseekables<?> select, long minEpoch, long
maxEpoch, SelectNodeOwnership selectNodeOwnership, Include include)
+ public TopologySorter.Supplier sorter()
{
- switch (include)
- {
- default: throw new AssertionError("Unhandled Include: " +include);
- case Unsynced: return withUnsyncedEpochs(select, minEpoch,
maxEpoch);
- case Owned: return preciseEpochs(select, minEpoch, maxEpoch,
selectNodeOwnership);
- }
+ return sorter;
}
- public Topologies reselect(@Nullable Topologies prev, @Nullable Include
prevIncluded, Unseekables<?> select, Timestamp min, Timestamp max,
SelectNodeOwnership selectNodeOwnership, Include include)
+ public Topology current()
{
- return reselect(prev, prevIncluded, select, min.epoch(), max.epoch(),
selectNodeOwnership, include);
+ return active.current();
}
- // prevIncluded may be null even when prev is not null, in cases where we
do not know what prev was produced with
- public Topologies reselect(@Nullable Topologies prev, @Nullable Include
prevIncluded, Unseekables<?> select, long minEpoch, long maxEpoch,
SelectNodeOwnership selectNodeOwnership, Include include)
+ public Topology currentLocal()
{
- if (include == Owned)
- {
- if (prev != null && prev.currentEpoch() >= maxEpoch &&
prev.oldestEpoch() <= minEpoch)
- return prev.forEpochs(minEpoch, maxEpoch);
- else
- return preciseEpochs(select, minEpoch, maxEpoch,
selectNodeOwnership);
- }
- else
- {
- if (prevIncluded == Unsynced && prev != null &&
prev.currentEpoch() == maxEpoch && prev.oldestEpoch() == minEpoch)
- return prev;
- else // TODO (desired): can we avoid recalculating when only
minEpoch advances?
- return withUnsyncedEpochs(select, minEpoch, maxEpoch);
- }
-
+ return active.currentLocal();
}
- public <U extends Participants<?>> @Nullable U unsyncedOnly(U select, long
beforeEpoch)
+ public boolean isEmpty()
{
- return extra(select, 0, beforeEpoch, cur -> cur.synced,
(UnsyncedSelector<U>)UnsyncedSelector.INSTANCE);
+ return active.isEmpty() && pending.isEmpty();
}
- public Topologies withUnsyncedEpochs(Unseekables<?> select, long minEpoch,
long maxEpoch)
+ public long epoch()
{
- Invariants.requireArgument(minEpoch <= maxEpoch, "min epoch %d > max
%d", minEpoch, maxEpoch);
- return withSufficientEpochsAtLeast(select, minEpoch, maxEpoch,
epochState -> epochState.synced);
+ return current().epoch;
}
- public FastPath selectFastPath(Routables<?> select, long epoch)
+ // TODO (desired): add tests for epoch GC and tracking
+ @VisibleForImplementation
+ public long firstNonEmpty()
{
- return atLeast(select, epoch, epoch, epochState -> epochState.synced,
bestFastPath);
+ return active.firstNonEmptyEpoch;
}
- public boolean supportsPrivilegedFastPath(Routables<?> select, long epoch)
+ public long minEpoch()
{
- return atLeast(select, epoch, epoch, epochState -> epochState.synced,
supportsPrivilegedFastPath);
+ ActiveEpochs epochs = this.active;
+ return epochs.minEpoch();
}
- public Topologies withOpenEpochs(Routables<?> select, @Nullable
EpochSupplier min, @Nullable EpochSupplier max)
- {
- return withSufficientEpochsAtMost(select,
- min == null ? Long.MIN_VALUE :
min.epoch(),
- max == null ? Long.MAX_VALUE :
max.epoch(),
- prev -> prev.closed);
- }
+ // TODO (testing): test all of these methods when asking for epochs that
have been cleaned up (and other code paths)
- public Topologies withUncompletedEpochs(Unseekables<?> select, @Nullable
EpochSupplier min, EpochSupplier max)
+ public ActiveEpochs active()
{
- return withSufficientEpochsAtLeast(select,
- min == null ? Long.MIN_VALUE :
min.epoch(),
- max == null ? Long.MAX_VALUE :
max.epoch(),
- prev -> prev.retired);
+ return active;
}
- private Topologies withSufficientEpochsAtLeast(Unseekables<?> select, long
minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor)
+ public void addListener(TopologyListener listener)
{
- return atLeast(select, minEpoch, maxEpoch, isSufficientFor, collector);
+ listeners.add(listener);
}
- private <C, K extends Routables<?>, T> T atLeast(K select, long minEpoch,
long maxEpoch, Function<EpochState, Ranges> isSufficientFor,
- Collectors<C, K, T>
collectors) throws IllegalArgumentException
+ public void removeListener(TopologyListener listener)
{
- Invariants.requireArgument(minEpoch <= maxEpoch);
- Epochs snapshot = epochs;
- if (maxEpoch < snapshot.minEpoch())
- throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch());
-
- if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch;
- else Invariants.require(snapshot.currentEpoch >= maxEpoch, "current
epoch %d < max %d", snapshot.currentEpoch, maxEpoch);
-
- EpochState maxEpochState = nonNull(snapshot.get(maxEpoch));
- if (minEpoch == maxEpoch &&
isSufficientFor.apply(maxEpochState).containsAll(select))
- return collectors.one(maxEpochState, select, false);
-
- int i = (int)(snapshot.currentEpoch - maxEpoch);
- int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch,
snapshot.epochs.length));
- C collector = collectors.allocate(maxi - i);
-
- // Previous logic would exclude synced ranges, but this was removed as
that makes min epoch selection harder.
- // An issue was found where a range was removed from a replica and min
selection picked the epoch before that,
- // which caused a node to get included in the txn that actually lost
the range
- // See CASSANDRA-18804
- while (i < maxi)
- {
- EpochState epochState = snapshot.epochs[i++];
- collector = collectors.update(collector, epochState, select,
false);
- select = (K)select.without(epochState.addedRanges);
- }
-
- if (select.isEmpty())
- return collectors.multi(collector);
-
- if (i == snapshot.epochs.length)
- {
- // now we GC epochs, we cannot rely on addedRanges to remove all
ranges, so we also remove the ranges found in the earliest epoch we have
- select = (K)select.without(snapshot.epochs[snapshot.epochs.length
- 1].global.ranges);
- if (!select.isEmpty())
- throw Invariants.illegalArgument("Ranges %s could not be
found", select);
- return collectors.multi(collector);
- }
-
- // remaining is updated based off isSufficientFor, but select is not
- Routables<?> remaining = select;
-
- // include any additional epochs to reach sufficiency
- EpochState prev = snapshot.epochs[maxi - 1];
- do
- {
- remaining = remaining.without(isSufficientFor.apply(prev));
- Routables<?> prevSelect = select;
- select = (K)select.without(prev.addedRanges);
- if (prevSelect != select) // perf optimization; if select wasn't
changed (it does not intersect addedRanges), then remaining won't
- remaining = remaining.without(prev.addedRanges);
- if (remaining.isEmpty())
- return collectors.multi(collector);
-
- EpochState next = snapshot.epochs[i++];
- collector = collectors.update(collector, next, select, false);
- prev = next;
- } while (i < snapshot.epochs.length);
- // need to remove sufficient / added else remaining may not be empty
when the final matches are the last epoch
- remaining = remaining.without(isSufficientFor.apply(prev));
- remaining = remaining.without(prev.addedRanges);
- // TODO (desired): propagate addedRanges to the earliest epoch we
retain for consistency
- // now we GC epochs, we cannot rely on addedRanges to remove all
ranges, so we also remove the ranges found in the earliest epoch we have
- remaining = remaining.without(snapshot.epochs[snapshot.epochs.length -
1].global.ranges);
- if (!remaining.isEmpty())
- Invariants.illegalArgument("Ranges %s could not be found",
remaining);
-
- return collectors.multi(collector);
+ listeners.remove(listener);
}
- private Topologies withSufficientEpochsAtMost(Routables<?> select, long
minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor)
+ protected Executor executor()
{
- return atMost(select, minEpoch, maxEpoch, isSufficientFor, collector);
+ return Runnable::run;
}
- private <C, K extends Routables<?>, T> T atMost(K select, long minEpoch,
long maxEpoch, Function<EpochState, Ranges> isSufficientFor,
- Collectors<C, K, T>
collectors)
+ public void reportTopology(Topology topology)
{
- Invariants.requireArgument(minEpoch <= maxEpoch);
- Epochs snapshot = epochs;
-
- minEpoch = Math.max(snapshot.minEpoch(), minEpoch);
- maxEpoch = validateMax(maxEpoch, snapshot);
-
- EpochState cur = nonNull(snapshot.get(maxEpoch));
- if (minEpoch == maxEpoch)
- {
- // TODO (required): why are we testing isSufficientFor here?
minEpoch == maxEpoch, we should always return.
- if (isSufficientFor.apply(cur).containsAll(select))
- return collectors.one(cur, select, true);
- }
-
- int i = (int)(snapshot.currentEpoch - maxEpoch);
- int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch,
snapshot.epochs.length));
- C collector = collectors.allocate(maxi - i);
-
- while (!select.isEmpty())
+ PendingEpoch e;
+ synchronized (this)
{
- collector = collectors.update(collector, cur, select, true);
- select = (K)select.without(cur.addedRanges)
- .without(isSufficientFor.apply(cur));
-
- if (++i == maxi)
- break;
+ long epoch = topology.epoch;
+ if (epoch <= active.currentEpoch)
+ {
+ logger.info("Ignoring topology for epoch {} which is behind
our latest epoch {}", epoch, active.currentEpoch);
+ return;
+ }
- cur = snapshot.epochs[i];
+ e = pending.getOrCreate(epoch);
+ e.setTopology(topology);
}
- return collectors.multi(collector);
+ logger.debug("Epoch {} received", topology.epoch());
+ for (TopologyListener listener : listeners)
+ listener.onReceived(topology);
+
+ updateActive();
}
- private <C, K extends Routables<?>, T> T extra(K select, long minEpoch,
long maxEpoch, Function<EpochState, Ranges> remove,
- Collectors<C, K, T>
collectors)
+ private final AtomicBoolean updatingActive = new AtomicBoolean();
+ private void updateActive()
{
- Invariants.requireArgument(minEpoch <= maxEpoch);
- Epochs snapshot = epochs;
-
- minEpoch = Math.max(snapshot.minEpoch(), minEpoch);
- maxEpoch = Math.min(maxEpoch, snapshot.currentEpoch);
- if (maxEpoch <= minEpoch)
- return collectors.none();
-
- EpochState cur = nonNull(snapshot.get(maxEpoch));
- select = (K) select.without(remove.apply(cur));
- if (select.isEmpty())
- return collectors.none();
-
- int i = (int)(snapshot.currentEpoch - maxEpoch);
- int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch,
snapshot.epochs.length));
- C collector = collectors.allocate(maxi - i);
+ if (!updatingActive.compareAndSet(false, true))
+ return;
- while (!select.isEmpty())
+ try
{
- collector = collectors.update(collector, cur, select, true);
- select = (K)select.without(cur.addedRanges);
-
- if (++i == maxi)
- break;
-
- cur = snapshot.epochs[i];
- select = (K)select.without(remove.apply(cur));
- }
+ while (true)
+ {
+ Topology topology;
+ PendingEpoch pending;
+ synchronized (this)
+ {
+ if (this.pending.isEmpty() || (!this.active.isEmpty() &&
this.pending.atIndex(0).epoch > 1 + current().epoch()))
+ return;
- return collectors.multi(collector);
- }
+ pending = this.pending.atIndex(0);
+ topology = pending.topology();
+ if (topology == null)
+ return;
+ }
- private static long validateMax(long maxEpoch, Epochs snapshot)
- {
- if (maxEpoch == Long.MAX_VALUE)
- return snapshot.currentEpoch;
+ Supplier<EpochReady> bootstrap =
node.commandStores().updateTopology(node, topology);
+ AsyncResult.Settable<EpochReady> whenSetup = new
AsyncResults.SettableWithDescription<>("Publishing Active Epoch");
+ EpochReady epochReady = new EpochReady(topology.epoch,
+
NestedAsyncResult.flatMap(whenSetup, ignore -> AsyncResults.success(null)),
+
NestedAsyncResult.flatMap(whenSetup, EpochReady::coordinate),
+
NestedAsyncResult.flatMap(whenSetup, EpochReady::data),
+
NestedAsyncResult.flatMap(whenSetup, EpochReady::reads));
- Invariants.require(snapshot.currentEpoch >= maxEpoch, "current epoch
%d < provided max %d", snapshot.currentEpoch, maxEpoch);
- if (maxEpoch < snapshot.minEpoch())
- throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch());
- return maxEpoch;
- }
+ if (!this.active.isEmpty())
+ {
+ ActiveEpoch prev = this.active.epochs[0];
+ Invariants.require(prev.epoch() == topology.epoch - 1);
+ epochReady = orderReporting(prev.epochReady(), epochReady);
+ }
- public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long
maxEpoch, SelectNodeOwnership selectNodeOwnership)
- {
- return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership,
Topology::select);
- }
+ ActiveEpoch active = new ActiveEpoch(node.id(), topology,
epochReady, sorter.get(topology), this.active.current().ranges);
- public Topologies preciseEpochsIfExists(Unseekables<?> select, long
minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership)
- {
- return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership,
Topology::selectIfExists);
- }
+ synchronized (this)
+ {
+ active.recordClosed(pending.closed);
+ active.recordRetired(pending.retired);
+ pending.ready.forEach(active::onReadyToCoordinate);
- public interface SelectFunction
- {
- Topology apply(Topology topology, Unseekables<?> select,
SelectNodeOwnership selectNodeOwnership);
- }
+ ActiveEpochs prev = this.active;
+ ActiveEpoch[] next = new ActiveEpoch[prev.epochs.length +
1];
+ System.arraycopy(prev.epochs, 0, next, 1,
prev.epochs.length);
+ next[0] = active;
- public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long
maxEpoch, SelectNodeOwnership selectNodeOwnership, SelectFunction
selectFunction)
- {
- Epochs snapshot = epochs;
+ if (!prev.isEmpty() &&
!prev.epochs[0].global.hardRemoved.containsAll(topology.hardRemoved))
+ {
+ IdentityHashMap<Shard, Shard> cache = new
IdentityHashMap<>();
+ for (int i = next.length - 1 ; i >= 0 ; --i)
+ {
+ ActiveEpoch e = next[i];
+ Topology newGlobal =
next[i].global.withHardRemoved(topology.hardRemoved, cache);
+ if (newGlobal != e.global)
+ {
+ next[i] = new ActiveEpoch(node.id(),
newGlobal, e.shardQuorumReady, e.receivedNodeReady, e.quorumReadyTracker,
+ e.addedRanges,
e.removedRanges, e.epochReady(), e.quorumReady(), e.closed(), e.retired());
+ }
+ }
+ }
- // TODO (expected): we should disambiguate minEpoch we can bump (i.e.
historical epochs) and those we cannot (i.e. txnId.epoch())
- minEpoch = Math.max(snapshot.minEpoch(), minEpoch);
- maxEpoch = validateMax(maxEpoch, snapshot);
- EpochState maxState = snapshot.get(maxEpoch);
+ this.active = new ActiveEpochs(this, next,
prev.firstNonEmptyEpoch);
+ this.pending.removeFirst(topology.epoch);
+ }
- if (maxState == null)
- throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch());
+ EpochReady innerReady = bootstrap(bootstrap);
+ whenSetup.setSuccess(innerReady);
- if (minEpoch == maxEpoch)
- return new Single(sorter,
selectFunction.apply(snapshot.get(minEpoch).global, select,
selectNodeOwnership));
+ pending.setActive();
+ listeners.forEach(listener -> listener.onActive(active));
- int count = (int)(1 + maxEpoch - minEpoch);
- Topologies.Builder topologies = new Topologies.Builder(count);
- for (int i = count - 1 ; i >= 0 ; --i)
- {
- EpochState epochState = snapshot.get(minEpoch + i);
- topologies.add(selectFunction.apply(epochState.global, select,
selectNodeOwnership));
- select = select.without(epochState.addedRanges);
+ long epoch = topology.epoch;
+ Node.Id self = node.id();
+ innerReady.coordinate.invokeIfSuccess(() -> {
+ listeners.forEach(listener ->
listener.onReadyToCoordinate(topology));
+ onReadyToCoordinate(self, epoch);
+ });
+ }
}
- Invariants.require(!topologies.isEmpty(), "Unable to find an epoch
that contained %s", select);
-
- return topologies.build(sorter);
- }
-
- public Topologies forEpoch(Unseekables<?> select, long epoch,
SelectNodeOwnership selectNodeOwnership)
- {
- Epochs snapshot = epochs;
- EpochState state = snapshot.get(epoch);
- if (state == null)
- throw new TopologyRetiredException(epoch, snapshot.minEpoch());
- return new Single(sorter, state.global.select(select,
selectNodeOwnership));
- }
-
- public boolean hasReplicationMaybeChanged(Unseekables<?> select, long
sinceEpoch)
- {
- Epochs snapshot = epochs;
- if (snapshot.minEpoch() > sinceEpoch)
- return true;
-
- return atLeast(select, sinceEpoch, Long.MAX_VALUE, ignore ->
Ranges.EMPTY, HasChangedReplication.INSTANCE);
- }
-
- public Topologies forEpochAtLeast(Unseekables<?> select, long epoch,
SelectNodeOwnership selectNodeOwnership)
- {
- Epochs snapshot = this.epochs;
- EpochState state = snapshot.get(epoch);
- if (state == null)
+ finally
{
- Invariants.require(snapshot.currentEpoch >= epoch, "current epoch
%d < provided max %d", snapshot.currentEpoch, epoch);
- state = snapshot.get(snapshot.minEpoch());
+ updatingActive.set(false);
}
- return new Single(sorter, state.global.select(select,
selectNodeOwnership));
- }
-
- public Shard forEpochIfKnown(RoutableKey key, long epoch)
- {
- EpochState epochState = epochs.get(epoch);
- if (epochState == null)
- return null;
- return epochState.global().forKey(key);
- }
-
- public Shard forEpoch(RoutableKey key, long epoch)
- {
- Shard ifKnown = forEpochIfKnown(key, epoch);
- if (ifKnown == null)
- throw new IndexOutOfBoundsException();
- return ifKnown;
- }
-
- public boolean hasEpoch(long epoch)
- {
- return epochs.get(epoch) != null;
- }
-
- public boolean hasAtLeastEpoch(long epoch)
- {
- return epochs.currentEpoch >= epoch;
- }
-
- public Topology localForEpoch(long epoch)
- {
- if (epoch < minEpoch())
- throw new TopologyRetiredException(epoch, minEpoch());
- EpochState epochState = epochs.get(epoch);
- if (epochState == null)
- throw illegalState("Unknown epoch " + epoch);
- return epochState.local();
- }
-
- public Ranges localRangesForEpoch(long epoch)
- {
- if (epoch < minEpoch())
- throw new TopologyRetiredException(epoch, minEpoch());
- return epochs.get(epoch).local().rangesForNode(self);
}
- public Ranges localRangesForEpochs(long start, long end)
+ @VisibleForTesting
+ protected EpochReady bootstrap(Supplier<EpochReady> bootstrap)
{
- if (end < start) throw new IllegalArgumentException();
- Ranges ranges = localRangesForEpoch(start);
- for (long i = start + 1; i <= end ; ++i)
- ranges = ranges.with(localRangesForEpoch(i));
- return ranges;
+ return bootstrap.get();
}
- public Topology globalForEpoch(long epoch)
+ private static EpochReady orderReporting(EpochReady previous, EpochReady
next)
{
- EpochState epochState = epochs.get(epoch);
- if (epochState == null)
- throw new IllegalArgumentException("Unknown epoch: " + epoch);
- return epochState.global();
- }
+ if (previous.epoch + 1 != next.epoch)
+ throw new IllegalArgumentException("Attempted to order epochs but
they are not next to each other... previous=" + previous.epoch + ", next=" +
next.epoch);
+ if (previous.coordinate.isDone() && previous.data.isDone() &&
previous.reads.isDone())
+ return next;
- public Topology maybeGlobalForEpoch(long epoch)
- {
- EpochState epochState = epochs.get(epoch);
- if (epochState == null)
- return null;
- return epochState.global();
+ return new EpochReady(next.epoch,
+ next.active,
+ NestedAsyncResult.flatMap(previous.coordinate,
ignore -> next.coordinate),
+ NestedAsyncResult.flatMap(previous.data, ignore
-> next.data),
+ NestedAsyncResult.flatMap(previous.reads, ignore
-> next.reads)
+ );
}
- static class TopologiesCollectors implements
Collectors<Topologies.Builder, Routables<?>, Topologies>
+ public AsyncChain<Void> await(long epoch, @Nullable AsyncExecutor ifAsync)
{
- final TopologySorter.Supplier sorter;
- final SelectNodeOwnership selectNodeOwnership;
-
- TopologiesCollectors(TopologySorter.Supplier sorter,
SelectNodeOwnership selectNodeOwnership)
- {
- this.sorter = sorter;
- this.selectNodeOwnership = selectNodeOwnership;
- }
-
- @Override
- public Topologies.Builder update(Topologies.Builder collector,
EpochState epoch, Routables<?> select, boolean permitMissing)
+ PendingEpoch pendingEpoch;
+ boolean fetch;
+ synchronized (this)
{
- collector.add(epoch.global.select(select, permitMissing,
selectNodeOwnership));
- return collector;
- }
+ if (epoch <= active.currentEpoch)
+ return AsyncChains.success(null);
- @Override
- public Topologies one(EpochState epoch, Routables<?> unseekables,
boolean permitMissing)
- {
- return new Topologies.Single(sorter,
epoch.global.select(unseekables, permitMissing, selectNodeOwnership));
+ pendingEpoch = pending.getOrCreate(epoch);
+ fetch = pendingEpoch.fetching == null;
}
- @Override
- public Topologies multi(Topologies.Builder builder)
+ AsyncChain<Void> result =
pendingEpoch.whenActive().chainImmediatelyElse(ifAsync);
+ if (fetch)
{
- return builder.build(sorter);
- }
+ while (true)
+ {
+ fetch(pendingEpoch);
+ --epoch;
+ synchronized (this)
+ {
+ if (epoch <= active.currentEpoch)
+ break;
- @Override
- public Topologies.Builder allocate(int count)
- {
- return new Topologies.Builder(count);
+ pendingEpoch = pending.getOrCreate(epoch);
+ if (pendingEpoch.fetching != null)
+ break;
+ }
+ }
}
+ return result;
}
- static class BestFastPath implements Collectors<FastPath, Routables<?>,
FastPath>, IndexedBiFunction<Shard, Boolean, Boolean>
+ private void fetch(PendingEpoch pending)
{
- final Id self;
-
- BestFastPath(Id self)
- {
- this.self = self;
- }
-
- @Override
- public FastPath update(FastPath collector, EpochState epoch,
Routables<?> select, boolean permitMissing)
- {
- return merge(collector, one(epoch, select, permitMissing));
- }
-
- @Override
- public FastPath one(EpochState epoch, Routables<?> routables, boolean
permitMissing)
- {
- if (!epoch.local.ranges.containsAll(routables) ||
!epoch.local.foldl(routables, this, true))
- return Unoptimised;
-
- return epoch.local.foldl(routables, (s, v, i) -> merge(v,
s.bestFastPath()), null);
- }
-
- @Override
- public FastPath multi(FastPath result)
- {
- return result;
- }
-
- @Override
- public FastPath allocate(int count)
+ synchronized (this)
{
- return null;
- }
+ if (pending.topology() != null || pending.epoch <
active.currentEpoch)
+ return;
- private static FastPath merge(FastPath a, FastPath b)
- {
- if (a == null) return b;
- if (a == Unoptimised || b == Unoptimised) return Unoptimised;
- if (a == PrivilegedCoordinatorWithDeps || b ==
PrivilegedCoordinatorWithDeps) return PrivilegedCoordinatorWithDeps;
- return PrivilegedCoordinatorWithoutDeps;
+ Invariants.require(pending.fetching == null ||
pending.fetching.isDone());
+ pending.fetching =
topologyService.fetchTopologyForEpoch(pending.epoch);
}
- @Override
- public Boolean apply(Shard shard, Boolean prev, int index)
- {
- return prev && shard.isInFastPath(self);
- }
+ pending.fetching.invoke((success, fail) -> {
+ if (fail == null) reportTopology(success);
+ else if (active.currentEpoch < pending.epoch && pending.topology()
== null)
+ {
+ // TODO (expected): special casing of TopologyRetiredException?
+ logger.warn("Failed to fetch epoch {}. Retrying.",
pending.epoch, fail);
+ node.agent().onCaughtException(fail, "Fetch epoch " +
pending.epoch);
+ long retryInMicros = node.agent().retryTopologyDelay(node, 1 +
++pending.fetchAttempts, TimeUnit.MICROSECONDS);
Review Comment:
This should effectively be mutually exclusive: only one fetch can be in flight at a time, and a retry is only initiated from the preceding fetch once it completes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]