ifesdjeen commented on code in PR #256: URL: https://github.com/apache/cassandra-accord/pull/256#discussion_r2490820285
########## accord-core/src/main/java/accord/topology/ActiveEpochs.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ProtocolModifiers; +import accord.local.Node; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.utils.Invariants; + +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned; +import static 
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; + +public final class ActiveEpochs implements Iterable<ActiveEpoch> +{ + private static final Logger logger = LoggerFactory.getLogger(ActiveEpochs.class); + + final TopologyManager manager; + final long currentEpoch; + // TODO (desired): move this to TopologyManager + final long firstNonEmptyEpoch; + // Epochs are sorted in _descending_ order + final ActiveEpoch[] epochs; + + ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + { + this.manager = manager; + this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; + if (prevFirstNonEmptyEpoch != -1) + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + else if (epochs.length > 0 && !epochs[0].global().isEmpty()) + this.firstNonEmptyEpoch = currentEpoch; + else + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + + for (int i = 1; i < epochs.length; i++) + Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + int truncateFrom = -1; + // > 0 because we do not want to be left without epochs in case they're all empty + for (int i = epochs.length - 1; i > 0; i--) + { + ActiveEpoch e = epochs[i]; + if (!e.allRetired()) + break; + truncateFrom = i; + } + + if (truncateFrom == -1) + { + this.epochs = epochs; + } + else + { + this.epochs = Arrays.copyOf(epochs, truncateFrom); + for (int i = truncateFrom; i < epochs.length; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. 
Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.global.ranges, e.closed()); + } + if (logger.isTraceEnabled()) + { + for (int i = 0; i < truncateFrom; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); + } + } + } + } + + /** + * If ranges were added in epoch X, and are _not_ present in the current epoch, they + * are purged and durability scheduling for them should be cancelled. + */ + public synchronized boolean isFullyRetired(Ranges ranges) + { + // TODO (required): what was this trying to do? Seems potentially faulty (but maybe correct given other assumptions) + if (isEmpty()) + return false; + + ActiveEpoch cur = epochs[0]; + ranges = ranges.without(cur.addedRanges); + if (!cur.addedRanges.containsAll(ranges)) Review Comment: we remove cur.addedRanges from ranges but then check if cur.addedRanges containsAll ranges from what i understand, this could be true iif ranges are empty, right? ########## accord-core/src/main/java/accord/api/TopologyService.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.api; + +import accord.local.Node; +import accord.topology.EpochReady; +import accord.topology.Topology; +import accord.utils.async.AsyncResult; + +/** + * ConfigurationService is responsible for: Review Comment: nit: TopologyService ########## accord-core/src/main/java/accord/topology/TopologyManager.java: ########## @@ -18,1625 +18,445 @@ package accord.topology; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collections; import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.TreeSet; -import java.util.function.Consumer; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import java.util.function.LongConsumer; import java.util.function.Supplier; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.api.Agent; import accord.api.AsyncExecutor; -import accord.api.ConfigurationService; -import accord.api.ConfigurationService.EpochReady; -import accord.api.ProtocolModifiers.QuorumEpochIntersections.Include; +import accord.api.TopologyListener; import accord.api.Timeouts; -import accord.api.Timeouts.RegisteredTimeout; +import accord.api.TopologyService; import accord.api.TopologySorter; import accord.api.VisibleForImplementation; -import accord.coordinate.EpochTimeout; -import accord.coordinate.tracking.QuorumTracker; +import accord.local.Node; import accord.local.Node.Id; import accord.local.TimeService; -import accord.primitives.EpochSupplier; -import accord.primitives.Participants; import 
accord.primitives.Ranges; -import accord.primitives.RoutableKey; -import accord.primitives.Routables; -import accord.primitives.Timestamp; -import accord.primitives.TxnId.FastPath; -import accord.primitives.Unseekables; import accord.topology.Topologies.SelectNodeOwnership; -import accord.topology.Topologies.Single; -import accord.utils.IndexedBiFunction; +import accord.topology.TopologyCollector.BestFastPath; +import accord.topology.TopologyCollector.Simple; +import accord.topology.TopologyCollector.SupportsPrivilegedFastPath; import accord.utils.Invariants; import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; +import accord.utils.async.NestedAsyncResult; -import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned; -import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; -import static accord.coordinate.tracking.RequestStatus.Success; -import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; -import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps; -import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithoutDeps; -import static accord.primitives.TxnId.FastPath.Unoptimised; -import static accord.utils.Invariants.illegalState; -import static accord.utils.Invariants.nonNull; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.stream.Collectors.joining; - -/** - * Manages topology state changes and update bookkeeping - * - * Each time the topology changes we need to: - * * confirm previous owners of ranges we replicate are aware of the new config - * * learn of any outstanding operations for ranges we replicate - * * clean up obsolete data - * - * Assumes a topology service that won't report epoch n without having n-1 etc also available - * - * TODO (desired, efficiency/clarity): make TopologyManager a Topologies and copy-on-write 
update to it, - * so we can always just take a reference for transactions instead of copying every time (and index into it by the txnId.epoch) - */ -public class TopologyManager -{ - private static final Logger logger = LoggerFactory.getLogger(TopologyManager.class); - private static final FutureEpoch SUCCESS; - - static - { - SUCCESS = new FutureEpoch(-1L, null); - SUCCESS.setDone(); - } - - static class EpochState - { - final Id self; - private final Topology global; - private final Topology local; - private final QuorumTracker syncTracker; - private final BitSet curShardSyncComplete; - private final Ranges addedRanges, removedRanges; - @GuardedBy("TopologyManager.this") - private EpochReady ready; - @GuardedBy("TopologyManager.this") - private Ranges synced; - @GuardedBy("TopologyManager.this") - Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY; - - private volatile boolean allRetired; - - public boolean allRetired() - { - if (allRetired) - return true; - - if (!retired.containsAll(global.ranges)) - return false; - - Invariants.require(closed.containsAll(global.ranges)); - allRetired = true; - return true; - } - - EpochState(Id node, Topology global, TopologySorter sorter, Ranges prevRanges) - { - this.self = node; - this.global = Invariants.requireArgument(global, !global.isSubset()); - this.local = global.forNode(node).trim(); - this.curShardSyncComplete = new BitSet(global.shards.length); - if (!global().isEmpty()) - this.syncTracker = new QuorumTracker(new Single(sorter, global())); - else - this.syncTracker = null; - - this.addedRanges = global.ranges.without(prevRanges).mergeTouching(); - this.removedRanges = prevRanges.mergeTouching().without(global.ranges); - this.synced = addedRanges; - } - - EpochState(Id node, Topology global, BitSet curShardSyncComplete, QuorumTracker syncTracker, Ranges addedRanges, Ranges removedRanges, EpochReady ready, Ranges synced, Ranges closed, Ranges retired) - { - this.self = node; - this.global = 
Invariants.requireArgument(global, !global.isSubset()); - this.local = global.forNode(node).trim(); - this.curShardSyncComplete = curShardSyncComplete; - this.syncTracker = syncTracker; - this.addedRanges = addedRanges; - this.removedRanges = removedRanges; - this.ready = ready; - this.synced = synced; - this.closed = closed; - this.retired = retired; - } - - public boolean hasReachedQuorum() - { - return syncTracker == null || syncTracker.hasReachedQuorum(); - } - - private boolean recordSyncCompleteFromFuture() - { - if (syncTracker == null || syncComplete()) - return false; - synced = global.ranges.mergeTouching(); - return true; - } - - enum NodeSyncStatus { Untracked, Complete, ShardUpdate, NoUpdate } - - NodeSyncStatus recordSyncComplete(Id node) - { - if (syncTracker == null) - return NodeSyncStatus.Untracked; - - if (syncTracker.recordSuccess(node) == Success) - { - synced = global.ranges.mergeTouching(); - return NodeSyncStatus.Complete; - } - else - { - boolean updated = false; - // loop over each current shard, and test if its ranges are complete - for (int i = 0 ; i < global.shards.length ; ++i) - { - if (syncTracker.get(i).hasReachedQuorum() && !curShardSyncComplete.get(i)) - { - synced = synced.union(MERGE_ADJACENT, Ranges.of(global.shards[i].range)); - curShardSyncComplete.set(i); - updated = true; - } - } - return updated ? 
NodeSyncStatus.ShardUpdate : NodeSyncStatus.NoUpdate; - } - } - - // returns those ranges that weren't already closed, so that they can be propagated to lower epochs - Ranges recordClosed(Ranges ranges) - { - ranges = ranges.without(closed); - if (ranges.isEmpty()) - return ranges; - closed = closed.union(MERGE_ADJACENT, ranges); - Invariants.require(closed.mergeTouching() == closed); - return ranges.without(addedRanges); - } - - // returns those ranges that weren't already retired, so that they can be propagated to lower epochs - Ranges recordRetired(Ranges ranges) - { - ranges = ranges.without(retired); - if (ranges.isEmpty()) - return ranges; - synced = synced.union(MERGE_ADJACENT, ranges); - closed = closed.union(MERGE_ADJACENT, ranges); - retired = retired.union(MERGE_ADJACENT, ranges); - Invariants.require(closed.mergeTouching() == closed); - Invariants.require(retired.mergeTouching() == retired); - return ranges.without(addedRanges); - } - - Topology global() - { - return global; - } - - Topology local() - { - return local; - } - - long epoch() - { - return global().epoch; - } - - boolean syncComplete() - { - return synced.containsAll(global.ranges); - } - - /** - * determine if sync has completed for all shards intersecting with the given keys - */ - boolean syncCompleteFor(Unseekables<?> intersect) - { - return synced.containsAll(intersect); - } - - @Override - public String toString() - { - return "EpochState{" + - "epoch=" + global.epoch() + - '}'; - } - } - - private static class Epochs - { - static class Notifications - { - final Set<Id> syncComplete = new TreeSet<>(); - Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY; - } - - private final long currentEpoch; - private final long firstNonEmptyEpoch; - // Epochs are sorted in _descending_ order - private final EpochState[] epochs; - // nodes we've received sync complete notifications from, for epochs we do not yet have topologies for. 
- // Pending sync notifications are indexed by epoch, with the current epoch as index[0], and future epochs - // as index[epoch - currentEpoch]. Sync complete notifications for the current epoch are marked pending - // until the superseding epoch has been applied - private final List<Notifications> pending; - - // list of promises to be completed as newer epochs become active. This is to support processes that - // are waiting on future epochs to begin (ie: txn requests from futures epochs). Index 0 is for - // currentEpoch + 1 - // NOTE: this is NOT copy-on-write. This is mutated in place! - private final List<FutureEpoch> futureEpochs; - - private Epochs(EpochState[] epochs, List<Notifications> pending, List<FutureEpoch> futureEpochs, long prevFirstNonEmptyEpoch) - { - this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; - if (prevFirstNonEmptyEpoch != -1) - this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; - else if (epochs.length > 0 && !epochs[0].global().isEmpty()) - this.firstNonEmptyEpoch = currentEpoch; - else - this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; - - this.pending = pending; - this.futureEpochs = futureEpochs; - if (!futureEpochs.isEmpty()) - Invariants.require(futureEpochs.get(0).epoch == currentEpoch + 1); - - for (int i = 1; i < futureEpochs.size(); i++) - Invariants.requireArgument(futureEpochs.get(i).epoch == futureEpochs.get(i - 1).epoch + 1); - for (int i = 1; i < epochs.length; i++) - Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); - int truncateFrom = -1; - // > 0 because we do not want to be left without epochs in case they're all empty - for (int i = epochs.length - 1; i > 0; i--) - { - EpochState epochState = epochs[i]; - if (epochState.allRetired() && (truncateFrom == -1 || truncateFrom == i + 1)) - truncateFrom = i; - } - - if (truncateFrom == -1) - { - this.epochs = epochs; - } - else - { - this.epochs = Arrays.copyOf(epochs, truncateFrom); - for (int i = truncateFrom; i < epochs.length; 
i++) - { - EpochState state = epochs[i]; - Invariants.require(epochs[i].syncComplete()); - logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, state.removedRanges, state.global.ranges, state.closed); - } - if (logger.isTraceEnabled()) - { - for (int i = 0; i < truncateFrom; i++) - { - EpochState state = epochs[i]; - Invariants.require(state.syncComplete()); - logger.trace("Leaving epoch {} with added/removed ranges {}/{}", state.epoch(), state.addedRanges, state.removedRanges); - } - } - } - } - - private FutureEpoch awaitEpoch(long epoch, TopologyManager manager) - { - if (epoch <= currentEpoch) - return SUCCESS; - - int expectedIndex = (int) (epoch - (1 + currentEpoch)); - while (futureEpochs.size() <= expectedIndex) - { - long addEpoch = currentEpoch + futureEpochs.size() + 1; - FutureEpoch futureEpoch = new FutureEpoch(addEpoch, manager); - futureEpochs.add(futureEpoch); - } - - return futureEpochs.get(expectedIndex); - } - - public long nextEpoch() - { - return current().epoch + 1; - } - - public long minEpoch() - { - if (currentEpoch == 0) - return 0; - return currentEpoch - epochs.length + 1; - } - - public long epoch() - { - return currentEpoch; - } - - public Topology current() - { - return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY; - } - - public Topology currentLocal() - { - return epochs.length > 0 ? epochs[0].local() : Topology.EMPTY; - } - - /** - * Mark sync complete for the given node/epoch, and if this epoch - * is now synced, update the prevSynced flag on superseding epochs - */ - public void syncComplete(Id node, long epoch) - { - Invariants.requireArgument(epoch > 0); - int i = indexOf(epoch); - if (epoch > currentEpoch) - { - pending(epoch).syncComplete.add(node); - if (epochs.length == 0) - return; - i = 0; - } - else if (i < 0) - { - Invariants.require(epoch < minEpoch(), "Could not find epoch %d. 
Min: %d, current: %d", epoch, minEpoch(), currentEpoch); - return; - } - - recordSyncComplete(epochs, i, node); - } - - /** - * Mark the epoch as "closed" for the provided ranges; this means that no new transactions - * that intersect with this range may be proposed in the epoch (they will be rejected). - */ - public Ranges epochClosed(Ranges ranges, long epoch) - { - Invariants.requireArgument(epoch > 0); - int i = indexOf(epoch); - if (epoch > currentEpoch) - { - pending(epoch).closed.union(MERGE_ADJACENT, ranges); - if (epochs.length == 0) - return ranges; - i = 0; - } - else if (i < 0) - { - Invariants.require(epoch < minEpoch(), "Could not find epoch %d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch); - return Ranges.EMPTY; // notification came for an already truncated epoch - } - - Ranges report = ranges = epochs[i++].recordClosed(ranges); - while (!ranges.isEmpty() && i < epochs.length) - ranges = epochs[i++].recordClosed(ranges); - return report; - } - - /** - * Mark the epoch as "retired" for the provided ranges; this means that all transactions that can be - * proposed for this epoch have now been executed globally. - */ - public Ranges epochRetired(Ranges ranges, long epoch) - { - int i = indexOf(epoch); - if (epoch > currentEpoch) - { - pending(epoch).retired.union(MERGE_ADJACENT, ranges); - if (epochs.length == 0) - return ranges; - i = 0; - } - else if (i < 0) - { - Invariants.require(epoch < minEpoch(), "Could not find epoch %d. 
Min: %d, current: %d", epoch, minEpoch(), currentEpoch); - return Ranges.EMPTY; // notification came for an already truncated epoch - } - - Ranges report = ranges = epochs[i++].recordRetired(ranges); - while (!ranges.isEmpty() && i < epochs.length) - ranges = epochs[i++].recordRetired(ranges); - return report; - } - - private Notifications pending(long epoch) - { - Invariants.requireArgument(epoch > currentEpoch); - int idx = (int) (epoch - (1 + currentEpoch)); - for (int i = pending.size(); i <= idx; i++) - pending.add(new Notifications()); - - return pending.get(idx); - } - - @Nullable - private EpochState get(long epoch) - { - int index = indexOf(epoch); - if (index < 0) - return null; - - return epochs[index]; - } - - private int indexOf(long epoch) - { - if (epoch > currentEpoch || epoch <= currentEpoch - epochs.length) - return -1; - - return (int) (currentEpoch - epoch); - } - } - - private static void recordSyncComplete(EpochState[] epochs, int i, Id node) - { - EpochState.NodeSyncStatus status = epochs[i].recordSyncComplete(node); - switch (status) - { - case Complete: - i++; - for (; i < epochs.length && epochs[i].recordSyncCompleteFromFuture(); i++) {} - break; - case Untracked: - // don't have access to TopologyManager.this.node to check if the nodes match... 
this state should not happen unless it is the same node - case NoUpdate: - case ShardUpdate: - break; - default: - throw new UnsupportedOperationException("Unknown status " + status); - } - } - - static class WaitingForEpoch extends AsyncResults.SettableResult<Void> - { - final long deadlineMicros; - WaitingForEpoch(long deadlineMicros) - { - this.deadlineMicros = deadlineMicros; - } - } - - private static class FutureEpoch implements Timeouts.Timeout - { - private final long epoch; - private final TopologyManager manager; - private boolean isDone; - private final ArrayDeque<WaitingForEpoch> waiting = new ArrayDeque<>(); - private RegisteredTimeout timeout; - - public FutureEpoch(long epoch, TopologyManager manager) - { - this.epoch = epoch; - this.manager = manager; - } - - // TODO (expected): pass through request deadline - AsyncResult<Void> waiting() - { - WaitingForEpoch result, last; - synchronized (this) - { - if (isDone) - return AsyncResults.success(null); - - long timeoutMicros = manager.agent.expireEpochWait(MICROSECONDS); - long deadlineMicros = manager.time.elapsed(MICROSECONDS) + timeoutMicros; - result = last = waiting.peekLast(); - if (last == null || last.deadlineMicros < deadlineMicros) - waiting.add(result = new WaitingForEpoch(deadlineMicros + (timeoutMicros / 10))); - } - if (last == null) - timeout = manager.timeouts.registerAt(this, result.deadlineMicros, MICROSECONDS); - return result; - } - - private void setDone() - { - synchronized (this) - { - isDone = true; - WaitingForEpoch next; - while (null != (next = waiting.poll())) - next.trySuccess(null); - } - RegisteredTimeout cancel = timeout; - if (cancel != null) - cancel.cancel(); - } - - @Override - public void timeout() - { - long nextDeadlineMicros = 0; - synchronized (this) - { - if (isDone) - return; - - long nowMicros = manager.time.elapsed(MICROSECONDS); - WaitingForEpoch next; - while (null != (next = waiting.peek()) && (nextDeadlineMicros = next.deadlineMicros) <= nowMicros) - 
waiting.poll().tryFailure(EpochTimeout.timeout(epoch, manager.agent)); - } - if (nextDeadlineMicros > 0) - timeout = manager.timeouts.registerAt(this, nextDeadlineMicros, MICROSECONDS); - } - - @Override - public int stripe() - { - return (int) epoch; - } - } - - // this class could be just the list, but left it here in case we wish to expose "futureEpochs" and "pending" as well - public static class EpochsSnapshot implements Iterable<EpochsSnapshot.Epoch> - { - public final ImmutableList<Epoch> epochs; - - public EpochsSnapshot(ImmutableList<Epoch> epochs) - { - this.epochs = epochs; - } - - @Override - public Iterator<Epoch> iterator() - { - return epochs.iterator(); - } - - public enum ResultStatus - { - PENDING("pending"), - SUCCESS("success"), - FAILURE("failure"); - - public final String value; - - ResultStatus(String value) - { - this.value = value; - } - - private static ResultStatus of(AsyncResult<?> result) - { - if (result == null || !result.isDone()) - return PENDING; - - return result.isSuccess() ? 
SUCCESS : FAILURE; - } - } - - public static class EpochReady - { - public final ResultStatus metadata, coordinate, data, reads; - - public EpochReady(ResultStatus metadata, ResultStatus coordinate, ResultStatus data, ResultStatus reads) - { - this.metadata = metadata; - this.coordinate = coordinate; - this.data = data; - this.reads = reads; - } - - private static EpochReady of(ConfigurationService.EpochReady ready) - { - return new EpochReady(ResultStatus.of(ready.metadata), - ResultStatus.of(ready.coordinate), - ResultStatus.of(ready.data), - ResultStatus.of(ready.reads)); - } - } - - public static class Epoch - { - public final long epoch; - public final EpochReady ready; - public final Ranges global, addedRanges, removedRanges, synced, closed, retired; - - public Epoch(long epoch, EpochReady ready, Ranges global, Ranges addedRanges, Ranges removedRanges, Ranges synced, Ranges closed, Ranges retired) - { - this.epoch = epoch; - this.ready = ready; - this.global = global; - this.addedRanges = addedRanges; - this.removedRanges = removedRanges; - this.synced = synced; - this.closed = closed; - this.retired = retired; - } - } - } - - private final TopologySorter.Supplier sorter; - private final TopologiesCollectors collector; - private final BestFastPath bestFastPath; - private final SupportsPrivilegedFastPath supportsPrivilegedFastPath; - private final Agent agent; - private final Id self; - private final TimeService time; - private final Timeouts timeouts; - private volatile Epochs epochs; - - public TopologyManager(TopologySorter.Supplier sorter, Agent agent, Id self, TimeService time, Timeouts timeouts) - { - this.sorter = sorter; - this.collector = new TopologiesCollectors(sorter, SelectNodeOwnership.SHARE); - this.bestFastPath = new BestFastPath(self); - this.supportsPrivilegedFastPath = new SupportsPrivilegedFastPath(self); - this.agent = agent; - this.self = self; - this.time = time; - this.timeouts = timeouts; - this.epochs = new Epochs(new EpochState[0], 
new ArrayList<>(), new ArrayList<>(), -1);; - } - - public EpochsSnapshot epochsSnapshot() - { - // Write to this volatile variable is done via synchronized, so this is single-writer multi-consumer; safe to read without locks - Epochs epochs = this.epochs; - ImmutableList.Builder<EpochsSnapshot.Epoch> builder = ImmutableList.builderWithExpectedSize(epochs.epochs.length); - for (int i = 0; i < epochs.epochs.length; i++) - { - // This class's state is mutable with regaurd to: ready, synced, closed, retired - EpochState epoch = epochs.epochs[i]; - // Even though this field is populated with the same lock epochs is, it is done before publishing to epochs! - // For this reason the field maybe null, in which case we need to use the lock to wait for the field. - EpochReady ready; - Ranges global, addedRanges, removedRanges, synced, closed, retired; - global = epoch.global.ranges.mergeTouching(); - addedRanges = epoch.addedRanges; - removedRanges = epoch.removedRanges; - // ready, synced, closed, and retired all rely on TM's object lock - synchronized (this) - { - ready = epoch.ready; - synced = epoch.synced; - closed = epoch.closed; - retired = epoch.retired; - } - builder.add(new EpochsSnapshot.Epoch(epoch.epoch(), EpochsSnapshot.EpochReady.of(ready), global, addedRanges, removedRanges, synced, closed, retired)); - } - return new EpochsSnapshot(builder.build()); - } - - public List<Topology> topologySnapshot() - { - // Write to this volatile variable is done via synchronized, so this is single-writer multi-consumer; safe to read without locks - Epochs epochs = this.epochs; - ImmutableList.Builder<Topology> builder = ImmutableList.builderWithExpectedSize(epochs.epochs.length); - for (int i = 0; i < epochs.epochs.length; i++) - { - // This class's state is mutable with regaurd to: ready, synced, closed, retired - EpochState epoch = epochs.epochs[i]; - builder.add(epoch.global); - } - return builder.build(); - } - - public EpochReady onTopologyUpdate(Topology topology, 
Supplier<EpochReady> bootstrap, LongConsumer truncate) - { - FutureEpoch notifyDone; - EpochReady ready; - Epochs prev; - Epochs next; - synchronized (this) - { - prev = epochs; - Invariants.requireArgument(topology.epoch == prev.nextEpoch() || epochs.epochs.length == 0, - "Expected topology update %d to be %d", topology.epoch, prev.nextEpoch()); - - Ranges prevAll = prev.epochs.length == 0 ? Ranges.EMPTY : prev.epochs[0].global.ranges; - List<Epochs.Notifications> pending = prev.pending.size() <= 1 ? new ArrayList<>() : new ArrayList<>(prev.pending.subList(1, prev.pending.size())); - - EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1]; - System.arraycopy(prev.epochs, 0, nextEpochs, 1, prev.epochs.length); - nextEpochs[0] = new EpochState(self, topology, sorter.get(topology), prevAll); - if (!prev.pending.isEmpty()) - { - // TODO (expected): we should invoke the same code as we do when receiving normally, to prevent divergence - prev.pending.get(0).syncComplete.forEach(id -> recordSyncComplete(nextEpochs, 0, id)); - for (Epochs.Notifications notifications : prev.pending) - { - nextEpochs[0].recordClosed(notifications.closed); - nextEpochs[0].recordRetired(notifications.retired); - } - } - if (prev.epochs.length > 0 && !prev.epochs[0].global.hardRemoved.containsAll(topology.hardRemoved)) - { - IdentityHashMap<Shard, Shard> cache = new IdentityHashMap<>(); - for (int i = nextEpochs.length - 1 ; i >= 0 ; --i) - { - EpochState cur = nextEpochs[i]; - Topology newGlobal = nextEpochs[i].global.withHardRemoved(topology.hardRemoved, cache); - if (newGlobal != cur.global) - nextEpochs[i] = new EpochState(self, newGlobal, cur.curShardSyncComplete, cur.syncTracker, cur.addedRanges, cur.removedRanges, cur.ready, cur.synced, cur.closed, cur.retired); - } - } - - List<FutureEpoch> futureEpochs = new ArrayList<>(prev.futureEpochs); - notifyDone = !futureEpochs.isEmpty() ? 
futureEpochs.remove(0) : null; - next = new Epochs(nextEpochs, pending, futureEpochs, prev.firstNonEmptyEpoch); - epochs = next; - ready = nextEpochs[0].ready = bootstrap.get(); - } - - if (next.minEpoch() != prev.minEpoch()) - truncate.accept(epochs.minEpoch()); - - if (notifyDone != null) - notifyDone.setDone(); - - return ready; - } - - public AsyncChain<Void> awaitEpoch(long epoch, @Nullable AsyncExecutor ifAsync) - { - FutureEpoch futureEpoch; - synchronized (this) - { - futureEpoch = epochs.awaitEpoch(epoch, this); - } - return futureEpoch.waiting().chainImmediatelyElse(ifAsync); - } - - public synchronized boolean hasReachedQuorum(long epoch) - { - EpochState state = epochs.get(epoch); - return state != null && state.hasReachedQuorum(); - } - - @VisibleForTesting - public EpochReady epochReady(long epoch) - { - Epochs epochs = this.epochs; - - if (epoch < epochs.minEpoch()) - return EpochReady.done(epoch); - - if (epoch > epochs.currentEpoch) - throw new IllegalArgumentException(String.format("Epoch %d is larger than current epoch %d", epoch, epochs.currentEpoch)); - - return epochs.get(epoch).ready; - } - - public synchronized void onEpochSyncComplete(Id node, long epoch) - { - epochs.syncComplete(node, epoch); - } - - @VisibleForTesting - public Ranges syncComplete(long epoch) - { - return epochs.get(epoch).synced; - } - - public synchronized void truncateTopologiesUntil(long epoch) - { - Epochs current = epochs; - Invariants.requireArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch()); - - if (current.minEpoch() >= epoch) - return; - - int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); - Invariants.require(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch()); - - EpochState[] nextEpochs = new EpochState[newLen]; - System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); - epochs = new Epochs(nextEpochs, 
current.pending, current.futureEpochs, current.firstNonEmptyEpoch); - } - - public synchronized void onEpochClosed(Ranges ranges, long epoch) - { - epochs.epochClosed(ranges, epoch); - } - - /** - * If ranges were added in epoch X, and are _not_ present in the current epoch, they - * are purged and durability scheduling for them should be cancelled. - */ - public synchronized boolean isFullyRetired(Ranges ranges) - { - Epochs epochs = this.epochs; - EpochState current = epochs.get(epochs.currentEpoch); - if (!current.addedRanges.containsAll(ranges)) - return false; - - long minEpoch = epochs.minEpoch(); - for (long i = minEpoch; i < epochs.currentEpoch; i++) - { - EpochState retiredIn = epochs.get(i); - if (retiredIn.allRetired() && retiredIn.addedRanges.containsAll(ranges)) - return true; - } - - return false; - } - - public synchronized void onEpochRetired(Ranges ranges, long epoch) - { - epochs.epochRetired(ranges, epoch); - } - - public TopologySorter.Supplier sorter() - { - return sorter; - } - - public Topology current() - { - return epochs.current(); - } - - public Topology currentLocal() - { - return epochs.currentLocal(); - } - - public boolean isEmpty() - { - return epochs.epochs.length == 0; - } - - public long epoch() - { - return current().epoch; - } - - // TODO (desired): add tests for epoch GC and tracking - @VisibleForImplementation - public long firstNonEmpty() - { - return epochs.firstNonEmptyEpoch; - } +/** + * Manages topology state changes and update bookkeeping + * + * Each time the topology changes we need to: + * * confirm previous owners of ranges we replicate are aware of the new config + * * learn of any outstanding operations for ranges we replicate + * * clean up obsolete data + * + * Assumes a topology service that won't report epoch n without having n-1 etc also available + * + * TODO (desired, efficiency/clarity): make TopologyManager a Topologies and copy-on-write update to it, + * so we can always just take a reference for 
transactions instead of copying every time (and index into it by the txnId.epoch) + */ +public class TopologyManager +{ + private static final Logger logger = LoggerFactory.getLogger(TopologyManager.class); + private static final PendingEpoch SUCCESS; - public long minEpoch() + static { - Epochs epochs = this.epochs; - return epochs.minEpoch(); + SUCCESS = new PendingEpoch(-1L, null); + SUCCESS.setActive(); } - @VisibleForTesting - EpochState getEpochStateUnsafe(long epoch) - { - return epochs.get(epoch); - } + final TopologySorter.Supplier sorter; + final Simple collector; + final BestFastPath bestFastPath; + final SupportsPrivilegedFastPath supportsPrivilegedFastPath; + final Node node; + final TopologyService topologyService; + final TimeService time; + final Timeouts timeouts; + private volatile ActiveEpochs active; + private final PendingEpochs pending; + private final CopyOnWriteArrayList<TopologyListener> listeners = new CopyOnWriteArrayList<>(); - /** - * Fetch topologies between {@param minEpoch} (inclusive), and {@param maxEpoch} (inclusive). 
- */ - public TopologyRange between(long minEpoch, long maxEpoch) + public TopologyManager(TopologySorter.Supplier sorter, Node node, TopologyService topologyService, TimeService time, Timeouts timeouts) { - Epochs epochs = this.epochs; - // No epochs known to Accord - if (epochs.firstNonEmptyEpoch == -1 || minEpoch > epochs.currentEpoch) - return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, Collections.emptyList()); - - minEpoch = Math.max(minEpoch, epochs.minEpoch()); - int diff = Math.toIntExact(epochs.currentEpoch - minEpoch + 1); - List<Topology> topologies = new ArrayList<>(diff); - for (int i = 0; epochs.minEpoch() + i <= maxEpoch && i < diff; i++) - topologies.add(epochs.get(minEpoch + i).global); - - return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, topologies); + this.sorter = sorter; + this.collector = new Simple(sorter, SelectNodeOwnership.SHARE); + this.bestFastPath = new BestFastPath(node.id()); + this.supportsPrivilegedFastPath = new SupportsPrivilegedFastPath(node.id()); + this.node = node; + this.time = time; + this.timeouts = timeouts; + this.topologyService = topologyService; + this.active = new ActiveEpochs(this, new ActiveEpoch[0], -1); + this.pending = new PendingEpochs(this); } - public static class TopologyRange + public void onReadyToCoordinate(Id node, long epoch) { - public final long min; - public final long current; - public final long firstNonEmpty; - public final List<Topology> topologies; - - public TopologyRange(long min, long current, long firstNonEmpty, List<Topology> topologies) + synchronized (this) { - this.min = min; - this.current = current; - this.topologies = topologies; - this.firstNonEmpty = firstNonEmpty; + if (epoch >= active.minEpoch()) + active.onReadyToCoordinate(node, epoch); + if (epoch > active.currentEpoch) + pending.remoteReadyToCoordinate(node, epoch); } + for (TopologyListener listener : listeners) + 
listener.onRemoteReadyToCoordinate(node, epoch); + } - public void forEach(Consumer<Topology> forEach, long minEpoch, int count) + public void onEpochClosed(Ranges ranges, long epoch) + { + Topology topology; + synchronized (this) { - if (minEpoch == 0) // Bootstrap - minEpoch = this.min; - - long emptyUpTo = firstNonEmpty == -1 ? current : firstNonEmpty - 1; - // Report empty epochs - for (long epoch = minEpoch; epoch <= emptyUpTo && count > 0; epoch++, count--) - forEach.accept(new Topology(epoch)); - - // Report known non-empty epochs - for (int i = 0; i < topologies.size() && count > 0; i++, count--) - { - Topology topology = topologies.get(i); - forEach.accept(topology); - } + topology = active.maybeGlobalForEpoch(epoch); + if (epoch > active.currentEpoch) + ranges = pending.closed(ranges, epoch); + ranges = active.closed(ranges, epoch); } - - @Override - public boolean equals(Object o) + if (!ranges.isEmpty()) { - if (o == null || getClass() != o.getClass()) return false; - TopologyRange that = (TopologyRange) o; - return min == that.min && current == that.current && firstNonEmpty == that.firstNonEmpty && Objects.equals(topologies, that.topologies); + for (TopologyListener listener : listeners) + listener.onEpochClosed(ranges, epoch, topology); } + } - @Override - public int hashCode() + public void onEpochRetired(Ranges ranges, long epoch) + { + Topology topology; + synchronized (this) { - return Objects.hash(min, current, firstNonEmpty, topologies); + topology = active.maybeGlobalForEpoch(epoch); + if (epoch > active.currentEpoch) + ranges = pending.retired(ranges, epoch); + ranges = active.retired(ranges, epoch); } - - @Override - public String toString() + if (!ranges.isEmpty()) { - return String.format("TopologyRange{min=%d, current=%d, firstNonEmpty=%d, topologies=[%s]}", - min, - current, - firstNonEmpty, - topologies.stream().map(t -> Long.toString(t.epoch())).collect(joining(","))); + for (TopologyListener listener : listeners) + 
listener.onEpochRetired(ranges, epoch, topology); } } - public Topologies preciseEpochs(long epoch) + public synchronized void truncateTopologiesUntil(long epoch) { - return new Single(sorter, epochs.get(epoch).global); - } + ActiveEpochs current = active; + Invariants.requireArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch()); - // TODO (testing): test all of these methods when asking for epochs that have been cleaned up (and other code paths) + if (current.minEpoch() >= epoch) + return; - /** - * Returns topologies containing epochs where specified ranges haven't completed synchronization between min/max epochs. - * Can be used during coordination operations to ensure they contact all relevant nodes across topology changes, - * particularly when some ranges are still syncing after cluster membership changes. - */ - public Topologies withUnsyncedEpochs(Unseekables<?> select, Timestamp min, Timestamp max) - { - return withUnsyncedEpochs(select, min.epoch(), max.epoch()); - } + int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); + Invariants.require(current.epochs[newLen - 1].isQuorumReady(), "Epoch %d is not ready to coordinate", current.epochs[newLen - 1].epoch()); - public Topologies select(Unseekables<?> select, Timestamp min, Timestamp max, SelectNodeOwnership selectNodeOwnership, Include include) - { - return select(select, min.epoch(), max.epoch(), selectNodeOwnership, include); + ActiveEpoch[] nextEpochs = new ActiveEpoch[newLen]; + System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); + active = new ActiveEpochs(this, nextEpochs, current.firstNonEmptyEpoch); } - public Topologies select(Unseekables<?> select, long minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership, Include include) + public TopologySorter.Supplier sorter() { - switch (include) - { - default: throw new AssertionError("Unhandled Include: " +include); - case Unsynced: return 
withUnsyncedEpochs(select, minEpoch, maxEpoch); - case Owned: return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership); - } + return sorter; } - public Topologies reselect(@Nullable Topologies prev, @Nullable Include prevIncluded, Unseekables<?> select, Timestamp min, Timestamp max, SelectNodeOwnership selectNodeOwnership, Include include) + public Topology current() { - return reselect(prev, prevIncluded, select, min.epoch(), max.epoch(), selectNodeOwnership, include); + return active.current(); } - // prevIncluded may be null even when prev is not null, in cases where we do not know what prev was produced with - public Topologies reselect(@Nullable Topologies prev, @Nullable Include prevIncluded, Unseekables<?> select, long minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership, Include include) + public Topology currentLocal() { - if (include == Owned) - { - if (prev != null && prev.currentEpoch() >= maxEpoch && prev.oldestEpoch() <= minEpoch) - return prev.forEpochs(minEpoch, maxEpoch); - else - return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership); - } - else - { - if (prevIncluded == Unsynced && prev != null && prev.currentEpoch() == maxEpoch && prev.oldestEpoch() == minEpoch) - return prev; - else // TODO (desired): can we avoid recalculating when only minEpoch advances? 
- return withUnsyncedEpochs(select, minEpoch, maxEpoch); - } - + return active.currentLocal(); } - public <U extends Participants<?>> @Nullable U unsyncedOnly(U select, long beforeEpoch) + public boolean isEmpty() { - return extra(select, 0, beforeEpoch, cur -> cur.synced, (UnsyncedSelector<U>)UnsyncedSelector.INSTANCE); + return active.isEmpty() && pending.isEmpty(); } - public Topologies withUnsyncedEpochs(Unseekables<?> select, long minEpoch, long maxEpoch) + public long epoch() { - Invariants.requireArgument(minEpoch <= maxEpoch, "min epoch %d > max %d", minEpoch, maxEpoch); - return withSufficientEpochsAtLeast(select, minEpoch, maxEpoch, epochState -> epochState.synced); + return current().epoch; } - public FastPath selectFastPath(Routables<?> select, long epoch) + // TODO (desired): add tests for epoch GC and tracking + @VisibleForImplementation + public long firstNonEmpty() { - return atLeast(select, epoch, epoch, epochState -> epochState.synced, bestFastPath); + return active.firstNonEmptyEpoch; } - public boolean supportsPrivilegedFastPath(Routables<?> select, long epoch) + public long minEpoch() { - return atLeast(select, epoch, epoch, epochState -> epochState.synced, supportsPrivilegedFastPath); + ActiveEpochs epochs = this.active; + return epochs.minEpoch(); } - public Topologies withOpenEpochs(Routables<?> select, @Nullable EpochSupplier min, @Nullable EpochSupplier max) - { - return withSufficientEpochsAtMost(select, - min == null ? Long.MIN_VALUE : min.epoch(), - max == null ? Long.MAX_VALUE : max.epoch(), - prev -> prev.closed); - } + // TODO (testing): test all of these methods when asking for epochs that have been cleaned up (and other code paths) - public Topologies withUncompletedEpochs(Unseekables<?> select, @Nullable EpochSupplier min, EpochSupplier max) + public ActiveEpochs active() { - return withSufficientEpochsAtLeast(select, - min == null ? Long.MIN_VALUE : min.epoch(), - max == null ? 
Long.MAX_VALUE : max.epoch(), - prev -> prev.retired); + return active; } - private Topologies withSufficientEpochsAtLeast(Unseekables<?> select, long minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor) + public void addListener(TopologyListener listener) { - return atLeast(select, minEpoch, maxEpoch, isSufficientFor, collector); + listeners.add(listener); } - private <C, K extends Routables<?>, T> T atLeast(K select, long minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor, - Collectors<C, K, T> collectors) throws IllegalArgumentException + public void removeListener(TopologyListener listener) { - Invariants.requireArgument(minEpoch <= maxEpoch); - Epochs snapshot = epochs; - if (maxEpoch < snapshot.minEpoch()) - throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch()); - - if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch; - else Invariants.require(snapshot.currentEpoch >= maxEpoch, "current epoch %d < max %d", snapshot.currentEpoch, maxEpoch); - - EpochState maxEpochState = nonNull(snapshot.get(maxEpoch)); - if (minEpoch == maxEpoch && isSufficientFor.apply(maxEpochState).containsAll(select)) - return collectors.one(maxEpochState, select, false); - - int i = (int)(snapshot.currentEpoch - maxEpoch); - int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length)); - C collector = collectors.allocate(maxi - i); - - // Previous logic would exclude synced ranges, but this was removed as that makes min epoch selection harder. 
- // An issue was found where a range was removed from a replica and min selection picked the epoch before that, - // which caused a node to get included in the txn that actually lost the range - // See CASSANDRA-18804 - while (i < maxi) - { - EpochState epochState = snapshot.epochs[i++]; - collector = collectors.update(collector, epochState, select, false); - select = (K)select.without(epochState.addedRanges); - } - - if (select.isEmpty()) - return collectors.multi(collector); - - if (i == snapshot.epochs.length) - { - // now we GC epochs, we cannot rely on addedRanges to remove all ranges, so we also remove the ranges found in the earliest epoch we have - select = (K)select.without(snapshot.epochs[snapshot.epochs.length - 1].global.ranges); - if (!select.isEmpty()) - throw Invariants.illegalArgument("Ranges %s could not be found", select); - return collectors.multi(collector); - } - - // remaining is updated based off isSufficientFor, but select is not - Routables<?> remaining = select; - - // include any additional epochs to reach sufficiency - EpochState prev = snapshot.epochs[maxi - 1]; - do - { - remaining = remaining.without(isSufficientFor.apply(prev)); - Routables<?> prevSelect = select; - select = (K)select.without(prev.addedRanges); - if (prevSelect != select) // perf optimization; if select wasn't changed (it does not intersect addedRanges), then remaining won't - remaining = remaining.without(prev.addedRanges); - if (remaining.isEmpty()) - return collectors.multi(collector); - - EpochState next = snapshot.epochs[i++]; - collector = collectors.update(collector, next, select, false); - prev = next; - } while (i < snapshot.epochs.length); - // need to remove sufficient / added else remaining may not be empty when the final matches are the last epoch - remaining = remaining.without(isSufficientFor.apply(prev)); - remaining = remaining.without(prev.addedRanges); - // TODO (desired): propagate addedRanges to the earliest epoch we retain for consistency - // 
now we GC epochs, we cannot rely on addedRanges to remove all ranges, so we also remove the ranges found in the earliest epoch we have - remaining = remaining.without(snapshot.epochs[snapshot.epochs.length - 1].global.ranges); - if (!remaining.isEmpty()) - Invariants.illegalArgument("Ranges %s could not be found", remaining); - - return collectors.multi(collector); + listeners.remove(listener); } - private Topologies withSufficientEpochsAtMost(Routables<?> select, long minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor) + protected Executor executor() { - return atMost(select, minEpoch, maxEpoch, isSufficientFor, collector); + return Runnable::run; } - private <C, K extends Routables<?>, T> T atMost(K select, long minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor, - Collectors<C, K, T> collectors) + public void reportTopology(Topology topology) { - Invariants.requireArgument(minEpoch <= maxEpoch); - Epochs snapshot = epochs; - - minEpoch = Math.max(snapshot.minEpoch(), minEpoch); - maxEpoch = validateMax(maxEpoch, snapshot); - - EpochState cur = nonNull(snapshot.get(maxEpoch)); - if (minEpoch == maxEpoch) - { - // TODO (required): why are we testing isSufficientFor here? minEpoch == maxEpoch, we should always return. 
- if (isSufficientFor.apply(cur).containsAll(select)) - return collectors.one(cur, select, true); - } - - int i = (int)(snapshot.currentEpoch - maxEpoch); - int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length)); - C collector = collectors.allocate(maxi - i); - - while (!select.isEmpty()) + PendingEpoch e; + synchronized (this) { - collector = collectors.update(collector, cur, select, true); - select = (K)select.without(cur.addedRanges) - .without(isSufficientFor.apply(cur)); - - if (++i == maxi) - break; + long epoch = topology.epoch; + if (epoch <= active.currentEpoch) + { + logger.info("Ignoring topology for epoch {} which is behind our latest epoch {}", epoch, active.currentEpoch); + return; + } - cur = snapshot.epochs[i]; + e = pending.getOrCreate(epoch); + e.setTopology(topology); } - return collectors.multi(collector); + logger.debug("Epoch {} received", topology.epoch()); + for (TopologyListener listener : listeners) + listener.onReceived(topology); + + updateActive(); } - private <C, K extends Routables<?>, T> T extra(K select, long minEpoch, long maxEpoch, Function<EpochState, Ranges> remove, - Collectors<C, K, T> collectors) + private final AtomicBoolean updatingActive = new AtomicBoolean(); + private void updateActive() { - Invariants.requireArgument(minEpoch <= maxEpoch); - Epochs snapshot = epochs; - - minEpoch = Math.max(snapshot.minEpoch(), minEpoch); - maxEpoch = Math.min(maxEpoch, snapshot.currentEpoch); - if (maxEpoch <= minEpoch) - return collectors.none(); - - EpochState cur = nonNull(snapshot.get(maxEpoch)); - select = (K) select.without(remove.apply(cur)); - if (select.isEmpty()) - return collectors.none(); - - int i = (int)(snapshot.currentEpoch - maxEpoch); - int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length)); - C collector = collectors.allocate(maxi - i); + if (!updatingActive.compareAndSet(false, true)) + return; - while (!select.isEmpty()) + try { - collector = 
collectors.update(collector, cur, select, true); - select = (K)select.without(cur.addedRanges); - - if (++i == maxi) - break; - - cur = snapshot.epochs[i]; - select = (K)select.without(remove.apply(cur)); - } + while (true) + { + Topology topology; + PendingEpoch pending; + synchronized (this) + { + if (this.pending.isEmpty() || (!this.active.isEmpty() && this.pending.atIndex(0).epoch > 1 + current().epoch())) + return; - return collectors.multi(collector); - } + pending = this.pending.atIndex(0); + topology = pending.topology(); + if (topology == null) + return; + } - private static long validateMax(long maxEpoch, Epochs snapshot) - { - if (maxEpoch == Long.MAX_VALUE) - return snapshot.currentEpoch; + Supplier<EpochReady> bootstrap = node.commandStores().updateTopology(node, topology); + AsyncResult.Settable<EpochReady> whenSetup = new AsyncResults.SettableWithDescription<>("Publishing Active Epoch"); + EpochReady epochReady = new EpochReady(topology.epoch, + NestedAsyncResult.flatMap(whenSetup, ignore -> AsyncResults.success(null)), + NestedAsyncResult.flatMap(whenSetup, EpochReady::coordinate), + NestedAsyncResult.flatMap(whenSetup, EpochReady::data), + NestedAsyncResult.flatMap(whenSetup, EpochReady::reads)); - Invariants.require(snapshot.currentEpoch >= maxEpoch, "current epoch %d < provided max %d", snapshot.currentEpoch, maxEpoch); - if (maxEpoch < snapshot.minEpoch()) - throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch()); - return maxEpoch; - } + if (!this.active.isEmpty()) + { + ActiveEpoch prev = this.active.epochs[0]; + Invariants.require(prev.epoch() == topology.epoch - 1); + epochReady = orderReporting(prev.epochReady(), epochReady); + } - public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership) - { - return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership, Topology::select); - } + ActiveEpoch active = new ActiveEpoch(node.id(), topology, epochReady, 
sorter.get(topology), this.active.current().ranges); - public Topologies preciseEpochsIfExists(Unseekables<?> select, long minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership) - { - return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership, Topology::selectIfExists); - } + synchronized (this) + { + active.recordClosed(pending.closed); + active.recordRetired(pending.retired); + pending.ready.forEach(active::onReadyToCoordinate); - public interface SelectFunction - { - Topology apply(Topology topology, Unseekables<?> select, SelectNodeOwnership selectNodeOwnership); - } + ActiveEpochs prev = this.active; + ActiveEpoch[] next = new ActiveEpoch[prev.epochs.length + 1]; + System.arraycopy(prev.epochs, 0, next, 1, prev.epochs.length); + next[0] = active; - public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership, SelectFunction selectFunction) - { - Epochs snapshot = epochs; + if (!prev.isEmpty() && !prev.epochs[0].global.hardRemoved.containsAll(topology.hardRemoved)) + { + IdentityHashMap<Shard, Shard> cache = new IdentityHashMap<>(); + for (int i = next.length - 1 ; i >= 0 ; --i) + { + ActiveEpoch e = next[i]; + Topology newGlobal = next[i].global.withHardRemoved(topology.hardRemoved, cache); + if (newGlobal != e.global) + { + next[i] = new ActiveEpoch(node.id(), newGlobal, e.shardQuorumReady, e.receivedNodeReady, e.quorumReadyTracker, + e.addedRanges, e.removedRanges, e.epochReady(), e.quorumReady(), e.closed(), e.retired()); + } + } + } - // TODO (expected): we should disambiguate minEpoch we can bump (i.e. historical epochs) and those we cannot (i.e. 
txnId.epoch()) - minEpoch = Math.max(snapshot.minEpoch(), minEpoch); - maxEpoch = validateMax(maxEpoch, snapshot); - EpochState maxState = snapshot.get(maxEpoch); + this.active = new ActiveEpochs(this, next, prev.firstNonEmptyEpoch); + this.pending.removeFirst(topology.epoch); + } - if (maxState == null) - throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch()); + EpochReady innerReady = bootstrap(bootstrap); + whenSetup.setSuccess(innerReady); - if (minEpoch == maxEpoch) - return new Single(sorter, selectFunction.apply(snapshot.get(minEpoch).global, select, selectNodeOwnership)); + pending.setActive(); + listeners.forEach(listener -> listener.onActive(active)); - int count = (int)(1 + maxEpoch - minEpoch); - Topologies.Builder topologies = new Topologies.Builder(count); - for (int i = count - 1 ; i >= 0 ; --i) - { - EpochState epochState = snapshot.get(minEpoch + i); - topologies.add(selectFunction.apply(epochState.global, select, selectNodeOwnership)); - select = select.without(epochState.addedRanges); + long epoch = topology.epoch; + Node.Id self = node.id(); + innerReady.coordinate.invokeIfSuccess(() -> { + listeners.forEach(listener -> listener.onReadyToCoordinate(topology)); + onReadyToCoordinate(self, epoch); + }); + } } - Invariants.require(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select); - - return topologies.build(sorter); - } - - public Topologies forEpoch(Unseekables<?> select, long epoch, SelectNodeOwnership selectNodeOwnership) - { - Epochs snapshot = epochs; - EpochState state = snapshot.get(epoch); - if (state == null) - throw new TopologyRetiredException(epoch, snapshot.minEpoch()); - return new Single(sorter, state.global.select(select, selectNodeOwnership)); - } - - public boolean hasReplicationMaybeChanged(Unseekables<?> select, long sinceEpoch) - { - Epochs snapshot = epochs; - if (snapshot.minEpoch() > sinceEpoch) - return true; - - return atLeast(select, sinceEpoch, Long.MAX_VALUE, ignore -> Ranges.EMPTY, 
HasChangedReplication.INSTANCE); - } - - public Topologies forEpochAtLeast(Unseekables<?> select, long epoch, SelectNodeOwnership selectNodeOwnership) - { - Epochs snapshot = this.epochs; - EpochState state = snapshot.get(epoch); - if (state == null) + finally { - Invariants.require(snapshot.currentEpoch >= epoch, "current epoch %d < provided max %d", snapshot.currentEpoch, epoch); - state = snapshot.get(snapshot.minEpoch()); + updatingActive.set(false); } - return new Single(sorter, state.global.select(select, selectNodeOwnership)); - } - - public Shard forEpochIfKnown(RoutableKey key, long epoch) - { - EpochState epochState = epochs.get(epoch); - if (epochState == null) - return null; - return epochState.global().forKey(key); - } - - public Shard forEpoch(RoutableKey key, long epoch) - { - Shard ifKnown = forEpochIfKnown(key, epoch); - if (ifKnown == null) - throw new IndexOutOfBoundsException(); - return ifKnown; - } - - public boolean hasEpoch(long epoch) - { - return epochs.get(epoch) != null; - } - - public boolean hasAtLeastEpoch(long epoch) - { - return epochs.currentEpoch >= epoch; - } - - public Topology localForEpoch(long epoch) - { - if (epoch < minEpoch()) - throw new TopologyRetiredException(epoch, minEpoch()); - EpochState epochState = epochs.get(epoch); - if (epochState == null) - throw illegalState("Unknown epoch " + epoch); - return epochState.local(); - } - - public Ranges localRangesForEpoch(long epoch) - { - if (epoch < minEpoch()) - throw new TopologyRetiredException(epoch, minEpoch()); - return epochs.get(epoch).local().rangesForNode(self); } - public Ranges localRangesForEpochs(long start, long end) + @VisibleForTesting + protected EpochReady bootstrap(Supplier<EpochReady> bootstrap) { - if (end < start) throw new IllegalArgumentException(); - Ranges ranges = localRangesForEpoch(start); - for (long i = start + 1; i <= end ; ++i) - ranges = ranges.with(localRangesForEpoch(i)); - return ranges; + return bootstrap.get(); } - public Topology 
globalForEpoch(long epoch) + private static EpochReady orderReporting(EpochReady previous, EpochReady next) { - EpochState epochState = epochs.get(epoch); - if (epochState == null) - throw new IllegalArgumentException("Unknown epoch: " + epoch); - return epochState.global(); - } + if (previous.epoch + 1 != next.epoch) + throw new IllegalArgumentException("Attempted to order epochs but they are not next to each other... previous=" + previous.epoch + ", next=" + next.epoch); + if (previous.coordinate.isDone() && previous.data.isDone() && previous.reads.isDone()) + return next; - public Topology maybeGlobalForEpoch(long epoch) - { - EpochState epochState = epochs.get(epoch); - if (epochState == null) - return null; - return epochState.global(); + return new EpochReady(next.epoch, + next.active, + NestedAsyncResult.flatMap(previous.coordinate, ignore -> next.coordinate), + NestedAsyncResult.flatMap(previous.data, ignore -> next.data), + NestedAsyncResult.flatMap(previous.reads, ignore -> next.reads) + ); } - static class TopologiesCollectors implements Collectors<Topologies.Builder, Routables<?>, Topologies> + public AsyncChain<Void> await(long epoch, @Nullable AsyncExecutor ifAsync) { - final TopologySorter.Supplier sorter; - final SelectNodeOwnership selectNodeOwnership; - - TopologiesCollectors(TopologySorter.Supplier sorter, SelectNodeOwnership selectNodeOwnership) - { - this.sorter = sorter; - this.selectNodeOwnership = selectNodeOwnership; - } - - @Override - public Topologies.Builder update(Topologies.Builder collector, EpochState epoch, Routables<?> select, boolean permitMissing) + PendingEpoch pendingEpoch; + boolean fetch; + synchronized (this) { - collector.add(epoch.global.select(select, permitMissing, selectNodeOwnership)); - return collector; - } + if (epoch <= active.currentEpoch) + return AsyncChains.success(null); - @Override - public Topologies one(EpochState epoch, Routables<?> unseekables, boolean permitMissing) - { - return new 
Topologies.Single(sorter, epoch.global.select(unseekables, permitMissing, selectNodeOwnership)); + pendingEpoch = pending.getOrCreate(epoch); + fetch = pendingEpoch.fetching == null; } - @Override - public Topologies multi(Topologies.Builder builder) + AsyncChain<Void> result = pendingEpoch.whenActive().chainImmediatelyElse(ifAsync); + if (fetch) { - return builder.build(sorter); - } + while (true) + { + fetch(pendingEpoch); + --epoch; + synchronized (this) + { + if (epoch <= active.currentEpoch) + break; - @Override - public Topologies.Builder allocate(int count) - { - return new Topologies.Builder(count); + pendingEpoch = pending.getOrCreate(epoch); + if (pendingEpoch.fetching != null) + break; + } + } } + return result; } - static class BestFastPath implements Collectors<FastPath, Routables<?>, FastPath>, IndexedBiFunction<Shard, Boolean, Boolean> + private void fetch(PendingEpoch pending) { - final Id self; - - BestFastPath(Id self) - { - this.self = self; - } - - @Override - public FastPath update(FastPath collector, EpochState epoch, Routables<?> select, boolean permitMissing) - { - return merge(collector, one(epoch, select, permitMissing)); - } - - @Override - public FastPath one(EpochState epoch, Routables<?> routables, boolean permitMissing) - { - if (!epoch.local.ranges.containsAll(routables) || !epoch.local.foldl(routables, this, true)) - return Unoptimised; - - return epoch.local.foldl(routables, (s, v, i) -> merge(v, s.bestFastPath()), null); - } - - @Override - public FastPath multi(FastPath result) - { - return result; - } - - @Override - public FastPath allocate(int count) + synchronized (this) { - return null; - } + if (pending.topology() != null || pending.epoch < active.currentEpoch) + return; - private static FastPath merge(FastPath a, FastPath b) - { - if (a == null) return b; - if (a == Unoptimised || b == Unoptimised) return Unoptimised; - if (a == PrivilegedCoordinatorWithDeps || b == PrivilegedCoordinatorWithDeps) return 
PrivilegedCoordinatorWithDeps; - return PrivilegedCoordinatorWithoutDeps; + Invariants.require(pending.fetching == null || pending.fetching.isDone()); + pending.fetching = topologyService.fetchTopologyForEpoch(pending.epoch); } - @Override - public Boolean apply(Shard shard, Boolean prev, int index) - { - return prev && shard.isInFastPath(self); - } + pending.fetching.invoke((success, fail) -> { + if (fail == null) reportTopology(success); + else if (active.currentEpoch < pending.epoch && pending.topology() == null) + { + // TODO (expected): special casing of TopologyRetiredException? + logger.warn("Failed to fetch epoch {}. Retrying.", pending.epoch, fail); + node.agent().onCaughtException(fail, "Fetch epoch " + pending.epoch); + long retryInMicros = node.agent().retryTopologyDelay(node, 1 + ++pending.fetchAttempts, TimeUnit.MICROSECONDS); Review Comment: looks like `pending.fetchAttempts` can be incremented concurrently, since lock is not held at that moment anymore. ########## accord-core/src/main/java/accord/topology/PendingEpoch.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.topology; + +import java.util.ArrayDeque; +import java.util.Set; +import javax.annotation.concurrent.GuardedBy; + +import accord.api.Timeouts; +import accord.coordinate.EpochTimeout; +import accord.local.Node; +import accord.primitives.Ranges; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; +import org.agrona.collections.ObjectHashSet; + +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +class PendingEpoch implements Timeouts.Timeout +{ + final long epoch; + + private final TopologyManager manager; + private boolean isActive; + private final ArrayDeque<WaitingForEpoch> waiting = new ArrayDeque<>(); + private Timeouts.RegisteredTimeout timeout; + + private volatile Topology topology; + volatile int fetchAttempts; + AsyncResult<Topology> fetching; + + @GuardedBy("TopologyManager.this") + final Set<Node.Id> ready = new ObjectHashSet<>(); + @GuardedBy("TopologyManager.this") + Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY; + + public PendingEpoch(long epoch, TopologyManager manager) + { + this.epoch = epoch; + this.manager = manager; + } + + @GuardedBy("TopologyManager") + void remoteReadyToCoordinate(Node.Id id) + { + ready.add(id); + } + + @GuardedBy("TopologyManager") + Ranges closed(Ranges ranges) + { + ranges = ranges.without(closed); + closed = closed.union(MERGE_ADJACENT, ranges); + return ranges; + } + + @GuardedBy("TopologyManager") + Ranges retired(Ranges ranges) + { + ranges = ranges.without(retired); + retired = retired.union(MERGE_ADJACENT, ranges); + return ranges; + } + + Topology topology() + { + return topology; + } + + void setTopology(Topology topology) + { + this.topology = topology; + } + + // TODO (expected): pass through request deadline + AsyncResult<Void> whenActive() + { + WaitingForEpoch result, last; + synchronized (this) + { + if (isActive) + return AsyncResults.success(null); + + long 
timeoutMicros = manager.node.agent().expireEpochWait(MICROSECONDS); + long deadlineMicros = manager.time.elapsed(MICROSECONDS) + timeoutMicros; + result = last = waiting.peekLast(); + if (last == null || last.deadlineMicros < deadlineMicros) + waiting.add(result = new WaitingForEpoch(deadlineMicros + (timeoutMicros / 10))); + } + if (last == null) Review Comment: Could you help me understand why it is safe for this to be outside the `synchronized` block? `timeout` is a non-volatile field, so I am concerned that we will add `waiting` but not have a corresponding timeout. ########## accord-core/src/main/java/accord/topology/PendingEpoch.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package accord.topology; + +import java.util.ArrayDeque; +import java.util.Set; +import javax.annotation.concurrent.GuardedBy; + +import accord.api.Timeouts; +import accord.coordinate.EpochTimeout; +import accord.local.Node; +import accord.primitives.Ranges; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; +import org.agrona.collections.ObjectHashSet; + +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +class PendingEpoch implements Timeouts.Timeout +{ + final long epoch; + + private final TopologyManager manager; + private boolean isActive; + private final ArrayDeque<WaitingForEpoch> waiting = new ArrayDeque<>(); + private Timeouts.RegisteredTimeout timeout; + + private volatile Topology topology; + volatile int fetchAttempts; + AsyncResult<Topology> fetching; + + @GuardedBy("TopologyManager.this") + final Set<Node.Id> ready = new ObjectHashSet<>(); + @GuardedBy("TopologyManager.this") + Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY; + + public PendingEpoch(long epoch, TopologyManager manager) + { + this.epoch = epoch; + this.manager = manager; + } + + @GuardedBy("TopologyManager") Review Comment: Looks like it is guarded by `TopologyManager.this` (also subsequent mentions). ########## accord-core/src/main/java/accord/topology/ActiveEpochs.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ProtocolModifiers; +import accord.local.Node; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.utils.Invariants; + +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned; +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; + +public final class ActiveEpochs implements Iterable<ActiveEpoch> +{ + private static final Logger logger = LoggerFactory.getLogger(ActiveEpochs.class); + + final TopologyManager manager; + final long currentEpoch; + // TODO (desired): move this to TopologyManager + final long firstNonEmptyEpoch; + // Epochs are sorted in _descending_ order + final ActiveEpoch[] epochs; + + ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + { + 
this.manager = manager; + this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; + if (prevFirstNonEmptyEpoch != -1) + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + else if (epochs.length > 0 && !epochs[0].global().isEmpty()) + this.firstNonEmptyEpoch = currentEpoch; + else + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + + for (int i = 1; i < epochs.length; i++) + Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + int truncateFrom = -1; + // > 0 because we do not want to be left without epochs in case they're all empty + for (int i = epochs.length - 1; i > 0; i--) + { + ActiveEpoch e = epochs[i]; + if (!e.allRetired()) + break; + truncateFrom = i; + } + + if (truncateFrom == -1) + { + this.epochs = epochs; + } + else + { + this.epochs = Arrays.copyOf(epochs, truncateFrom); + for (int i = truncateFrom; i < epochs.length; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.global.ranges, e.closed()); + } + if (logger.isTraceEnabled()) + { + for (int i = 0; i < truncateFrom; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); + } + } + } + } + + /** + * If ranges were added in epoch X, and are _not_ present in the current epoch, they + * are purged and durability scheduling for them should be cancelled. + */ + public synchronized boolean isFullyRetired(Ranges ranges) + { + // TODO (required): what was this trying to do? 
Seems potentially faulty (but maybe correct given other assumptions) + if (isEmpty()) + return false; + + ActiveEpoch cur = epochs[0]; + ranges = ranges.without(cur.addedRanges); + if (!cur.addedRanges.containsAll(ranges)) + return false; + + for (long i = 0 ; i < epochs.length; i++) + { + ActiveEpoch retiredIn = get(i); + if (retiredIn.allRetired()) + return true; + + if (retiredIn.retired().containsAll(ranges)) + return true; + } + + return false; + } + + + public boolean isEmpty() + { + return epochs.length == 0; + } + + public long nextEpoch() + { + return current().epoch + 1; + } + + public long minEpoch() + { + if (currentEpoch == 0) + return 0; + return currentEpoch - epochs.length + 1; + } + + public long epoch() + { + return currentEpoch; + } + + public Topology current() + { + return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY; + } + + public Topology currentLocal() + { + return epochs.length > 0 ? epochs[0].local() : Topology.EMPTY; + } + + boolean isQuorumReady(long epoch) + { + ActiveEpoch e = ifExists(epoch); + return e != null && e.isQuorumReady(); + } + + @VisibleForTesting + public EpochReady epochReady(long epoch) + { + if (epoch < minEpoch()) + return EpochReady.done(epoch); + + if (epoch > currentEpoch) + throw new IllegalArgumentException(String.format("Epoch %d is larger than current epoch %d", epoch, currentEpoch)); + + return get(epoch).epochReady(); + } + + /** + * Mark sync complete for the given node/epoch, and if this epoch + * is now synced, update the prevSynced flag on superseding epochs + */ + @GuardedBy("TopologyManager") + void onReadyToCoordinate(Node.Id id, long epoch) + { + Invariants.requireArgument(epoch >= 0); + + int i = indexIfExists(epoch); + if (i < 0) + { + if (epoch > currentEpoch) i = 0; + else return; + } + + while (i < epochs.length && epochs[i].onReadyToCoordinate(id)) + ++i; + } + + private Ranges closedOrRetired(Ranges ranges, long epoch, BiFunction<ActiveEpoch, Ranges, Ranges> f) + { + 
Invariants.requireArgument(epoch >= 0); + + int i = indexIfExists(epoch); + if (i < 0) + { + if (epoch > currentEpoch) + { + if (epochs.length == 0) + return ranges; + i = 0; + } + else + { + return Ranges.EMPTY; // notification came for an already truncated epoch + } + } + + Ranges cur = f.apply(epochs[i++], ranges); + Ranges report = epoch <= currentEpoch ? cur : ranges; + while (!cur.isEmpty() && i < epochs.length) + cur = f.apply(epochs[i++], cur); + return report; + } + + /** + * Mark the epoch as "closed" for the provided ranges; this means that no new transactions + * that intersect with this range may be proposed in the epoch (they will be rejected). + */ + @GuardedBy("TopologyManager") + Ranges closed(Ranges ranges, long epoch) + { + return closedOrRetired(ranges, epoch, ActiveEpoch::recordClosed); + } + + /** + * Mark the epoch as "retired" for the provided ranges; this means that all transactions that can be + * proposed for this epoch have now been executed globally. + */ + @GuardedBy("TopologyManager") + Ranges retired(Ranges ranges, long epoch) + { + return closedOrRetired(ranges, epoch, ActiveEpoch::recordRetired); + } + + ActiveEpoch get(long epoch) + { + int index = indexOf(epoch); + return epochs[index]; + } + + public @Nullable ActiveEpoch ifExists(long epoch) + { + int index = indexIfExists(epoch); + if (index < 0) + return null; + + return epochs[index]; + } + + private int indexOf(long epoch) + { + if (epoch < minEpoch()) throw new TopologyRetiredException(epoch, minEpoch()); + else if (epoch > currentEpoch) throw new IllegalArgumentException("Topology in future: " + epoch + " (max active: " + currentEpoch + ")"); + + return (int) (currentEpoch - epoch); + } + + private int indexIfExists(long epoch) + { + if (epoch > currentEpoch || epoch <= currentEpoch - epochs.length) + return -1; + + return (int) (currentEpoch - epoch); + } + + + /** + * Fetch topologies between {@param minEpoch} (inclusive), and {@param maxEpoch} (inclusive). 
+ */ + public TopologyRange between(long minEpoch, long maxEpoch) + { + // No epochs known to Accord + if (firstNonEmptyEpoch == -1 || minEpoch > currentEpoch) + return new TopologyRange(minEpoch(), currentEpoch, firstNonEmptyEpoch, Collections.emptyList()); + + minEpoch = Math.max(minEpoch, minEpoch()); + int diff = Math.toIntExact(currentEpoch - minEpoch + 1); + List<Topology> topologies = new ArrayList<>(diff); + for (int i = 0; minEpoch() + i <= maxEpoch && i < diff; i++) + topologies.add(get(minEpoch + i).global); + + return new TopologyRange(minEpoch(), currentEpoch, firstNonEmptyEpoch, topologies); Review Comment: I think this one should also be `minEpoch` ########## accord-core/src/main/java/accord/topology/PendingEpochs.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import accord.local.Node; +import accord.primitives.Ranges; +import accord.utils.Invariants; + +/** + * Thread safety is managed by a synchronized lock on this object, and extending classes can use the same lock when needed. Review Comment: Looks like thread safety is guaranteed by `TopologyManager` synchronization, right?
########## accord-core/src/main/java/accord/topology/ActiveEpochs.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ProtocolModifiers; +import accord.local.Node; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.utils.Invariants; + +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned; +import static 
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; + +public final class ActiveEpochs implements Iterable<ActiveEpoch> +{ + private static final Logger logger = LoggerFactory.getLogger(ActiveEpochs.class); + + final TopologyManager manager; + final long currentEpoch; + // TODO (desired): move this to TopologyManager + final long firstNonEmptyEpoch; + // Epochs are sorted in _descending_ order + final ActiveEpoch[] epochs; + + ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + { + this.manager = manager; + this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; + if (prevFirstNonEmptyEpoch != -1) + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + else if (epochs.length > 0 && !epochs[0].global().isEmpty()) + this.firstNonEmptyEpoch = currentEpoch; + else + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + + for (int i = 1; i < epochs.length; i++) + Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + int truncateFrom = -1; + // > 0 because we do not want to be left without epochs in case they're all empty + for (int i = epochs.length - 1; i > 0; i--) + { + ActiveEpoch e = epochs[i]; + if (!e.allRetired()) + break; + truncateFrom = i; + } + + if (truncateFrom == -1) + { + this.epochs = epochs; + } + else + { + this.epochs = Arrays.copyOf(epochs, truncateFrom); + for (int i = truncateFrom; i < epochs.length; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. 
Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.global.ranges, e.closed()); + } + if (logger.isTraceEnabled()) + { + for (int i = 0; i < truncateFrom; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); + } + } + } + } + + /** + * If ranges were added in epoch X, and are _not_ present in the current epoch, they + * are purged and durability scheduling for them should be cancelled. + */ + public synchronized boolean isFullyRetired(Ranges ranges) + { + // TODO (required): what was this trying to do? Seems potentially faulty (but maybe correct given other assumptions) + if (isEmpty()) + return false; + + ActiveEpoch cur = epochs[0]; + ranges = ranges.without(cur.addedRanges); + if (!cur.addedRanges.containsAll(ranges)) + return false; + + for (long i = 0 ; i < epochs.length; i++) + { + ActiveEpoch retiredIn = get(i); + if (retiredIn.allRetired()) + return true; + + if (retiredIn.retired().containsAll(ranges)) + return true; + } + + return false; + } + + + public boolean isEmpty() + { + return epochs.length == 0; + } + + public long nextEpoch() + { + return current().epoch + 1; + } + + public long minEpoch() + { + if (currentEpoch == 0) + return 0; + return currentEpoch - epochs.length + 1; + } + + public long epoch() + { + return currentEpoch; + } + + public Topology current() + { + return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY; + } + + public Topology currentLocal() + { + return epochs.length > 0 ? 
epochs[0].local() : Topology.EMPTY; + } + + boolean isQuorumReady(long epoch) + { + ActiveEpoch e = ifExists(epoch); + return e != null && e.isQuorumReady(); + } + + @VisibleForTesting + public EpochReady epochReady(long epoch) + { + if (epoch < minEpoch()) + return EpochReady.done(epoch); + + if (epoch > currentEpoch) + throw new IllegalArgumentException(String.format("Epoch %d is larger than current epoch %d", epoch, currentEpoch)); + + return get(epoch).epochReady(); + } + + /** + * Mark sync complete for the given node/epoch, and if this epoch + * is now synced, update the prevSynced flag on superseding epochs + */ + @GuardedBy("TopologyManager") + void onReadyToCoordinate(Node.Id id, long epoch) + { + Invariants.requireArgument(epoch >= 0); + + int i = indexIfExists(epoch); + if (i < 0) + { + if (epoch > currentEpoch) i = 0; + else return; + } + + while (i < epochs.length && epochs[i].onReadyToCoordinate(id)) + ++i; + } + + private Ranges closedOrRetired(Ranges ranges, long epoch, BiFunction<ActiveEpoch, Ranges, Ranges> f) + { + Invariants.requireArgument(epoch >= 0); + + int i = indexIfExists(epoch); + if (i < 0) + { + if (epoch > currentEpoch) + { + if (epochs.length == 0) + return ranges; + i = 0; + } + else + { + return Ranges.EMPTY; // notification came for an already truncated epoch + } + } + + Ranges cur = f.apply(epochs[i++], ranges); + Ranges report = epoch <= currentEpoch ? cur : ranges; + while (!cur.isEmpty() && i < epochs.length) + cur = f.apply(epochs[i++], cur); + return report; + } + + /** + * Mark the epoch as "closed" for the provided ranges; this means that no new transactions + * that intersect with this range may be proposed in the epoch (they will be rejected). 
+ */ + @GuardedBy("TopologyManager") + Ranges closed(Ranges ranges, long epoch) + { + return closedOrRetired(ranges, epoch, ActiveEpoch::recordClosed); + } + + /** + * Mark the epoch as "retired" for the provided ranges; this means that all transactions that can be + * proposed for this epoch have now been executed globally. + */ + @GuardedBy("TopologyManager") + Ranges retired(Ranges ranges, long epoch) + { + return closedOrRetired(ranges, epoch, ActiveEpoch::recordRetired); + } + + ActiveEpoch get(long epoch) + { + int index = indexOf(epoch); + return epochs[index]; + } + + public @Nullable ActiveEpoch ifExists(long epoch) + { + int index = indexIfExists(epoch); + if (index < 0) + return null; + + return epochs[index]; + } + + private int indexOf(long epoch) + { + if (epoch < minEpoch()) throw new TopologyRetiredException(epoch, minEpoch()); + else if (epoch > currentEpoch) throw new IllegalArgumentException("Topology in future: " + epoch + " (max active: " + currentEpoch + ")"); + + return (int) (currentEpoch - epoch); + } + + private int indexIfExists(long epoch) + { + if (epoch > currentEpoch || epoch <= currentEpoch - epochs.length) + return -1; + + return (int) (currentEpoch - epoch); + } + + + /** + * Fetch topologies between {@param minEpoch} (inclusive), and {@param maxEpoch} (inclusive). + */ + public TopologyRange between(long minEpoch, long maxEpoch) + { + // No epochs known to Accord + if (firstNonEmptyEpoch == -1 || minEpoch > currentEpoch) + return new TopologyRange(minEpoch(), currentEpoch, firstNonEmptyEpoch, Collections.emptyList()); + + minEpoch = Math.max(minEpoch, minEpoch()); + int diff = Math.toIntExact(currentEpoch - minEpoch + 1); + List<Topology> topologies = new ArrayList<>(diff); + for (int i = 0; minEpoch() + i <= maxEpoch && i < diff; i++) Review Comment: Was `minEpoch + i` meant here? (just double checked, looks like this was pre-existing code, so this is my bad...) 
########## accord-core/src/main/java/accord/topology/EpochReady.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; +import accord.utils.async.NestedAsyncResult; + +/** + * Review Comment: nit: empty comment ########## accord-core/src/main/java/accord/topology/ActiveEpochs.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ProtocolModifiers; +import accord.local.Node; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.utils.Invariants; + +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned; +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; + +public final class ActiveEpochs implements Iterable<ActiveEpoch> +{ + private static final Logger logger = LoggerFactory.getLogger(ActiveEpochs.class); + + final TopologyManager manager; + final long currentEpoch; + // TODO (desired): move this to TopologyManager + final long firstNonEmptyEpoch; + // Epochs are sorted in _descending_ order + final ActiveEpoch[] epochs; + + ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + { + this.manager = manager; + this.currentEpoch = epochs.length > 0 ? 
epochs[0].epoch() : 0; + if (prevFirstNonEmptyEpoch != -1) + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + else if (epochs.length > 0 && !epochs[0].global().isEmpty()) + this.firstNonEmptyEpoch = currentEpoch; + else + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + + for (int i = 1; i < epochs.length; i++) + Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + int truncateFrom = -1; + // > 0 because we do not want to be left without epochs in case they're all empty + for (int i = epochs.length - 1; i > 0; i--) + { + ActiveEpoch e = epochs[i]; + if (!e.allRetired()) + break; + truncateFrom = i; + } + + if (truncateFrom == -1) + { + this.epochs = epochs; + } + else + { + this.epochs = Arrays.copyOf(epochs, truncateFrom); + for (int i = truncateFrom; i < epochs.length; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.global.ranges, e.closed()); + } + if (logger.isTraceEnabled()) + { + for (int i = 0; i < truncateFrom; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); + } + } + } + } + + /** + * If ranges were added in epoch X, and are _not_ present in the current epoch, they + * are purged and durability scheduling for them should be cancelled. + */ + public synchronized boolean isFullyRetired(Ranges ranges) + { + // TODO (required): what was this trying to do? 
Seems potentially faulty (but maybe correct given other assumptions) + if (isEmpty()) + return false; + + ActiveEpoch cur = epochs[0]; + ranges = ranges.without(cur.addedRanges); + if (!cur.addedRanges.containsAll(ranges)) + return false; + + for (long i = 0 ; i < epochs.length; i++) Review Comment: It seems like we need `int` here and ``` ActiveEpoch retiredIn = epochs[i]; ``` below ########## accord-core/src/test/java/accord/impl/TopologyManager2Test.java: ########## @@ -0,0 +1,321 @@ +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License.
+// */ +// +//package accord.impl; +// +//import java.util.Arrays; +//import java.util.HashMap; +//import java.util.HashSet; +//import java.util.Map; +//import java.util.Objects; +//import java.util.Set; +//import java.util.stream.Collectors; +// +//import accord.api.Agent; +//import accord.api.TopologyListener; +//import accord.topology.EpochReady; +//import accord.primitives.Ranges; +//import accord.utils.SortedArrays.SortedArrayList; +//import accord.utils.async.AsyncResult; +//import accord.utils.async.AsyncResults; +// +//import com.google.common.collect.ImmutableSet; +// +//import accord.api.TopologyService; +//import accord.local.Node.Id; +//import accord.primitives.Range; +//import accord.topology.Shard; +//import accord.topology.Topology; +//import org.junit.jupiter.api.Assertions; +//import org.junit.jupiter.api.Test; +// +//public class TopologyManager2Test Review Comment: Should we remove this test? ########## accord-core/src/main/java/accord/topology/ActiveEpochs.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.topology; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ProtocolModifiers; +import accord.local.Node; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.utils.Invariants; + +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owned; +import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; + +public final class ActiveEpochs implements Iterable<ActiveEpoch> +{ + private static final Logger logger = LoggerFactory.getLogger(ActiveEpochs.class); + + final TopologyManager manager; + final long currentEpoch; + // TODO (desired): move this to TopologyManager + final long firstNonEmptyEpoch; + // Epochs are sorted in _descending_ order + final ActiveEpoch[] epochs; + + ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + { + this.manager = manager; + this.currentEpoch = epochs.length > 0 ? 
epochs[0].epoch() : 0; + if (prevFirstNonEmptyEpoch != -1) + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + else if (epochs.length > 0 && !epochs[0].global().isEmpty()) + this.firstNonEmptyEpoch = currentEpoch; + else + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + + for (int i = 1; i < epochs.length; i++) + Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + int truncateFrom = -1; + // > 0 because we do not want to be left without epochs in case they're all empty + for (int i = epochs.length - 1; i > 0; i--) + { + ActiveEpoch e = epochs[i]; + if (!e.allRetired()) + break; + truncateFrom = i; + } + + if (truncateFrom == -1) + { + this.epochs = epochs; + } + else + { + this.epochs = Arrays.copyOf(epochs, truncateFrom); + for (int i = truncateFrom; i < epochs.length; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.global.ranges, e.closed()); + } + if (logger.isTraceEnabled()) + { + for (int i = 0; i < truncateFrom; i++) + { + ActiveEpoch e = epochs[i]; + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); + } + } + } + } + + /** + * If ranges were added in epoch X, and are _not_ present in the current epoch, they + * are purged and durability scheduling for them should be cancelled. + */ + public synchronized boolean isFullyRetired(Ranges ranges) + { + // TODO (required): what was this trying to do? 
Seems potentially faulty (but maybe correct given other assumptions) + if (isEmpty()) + return false; + + ActiveEpoch cur = epochs[0]; + ranges = ranges.without(cur.addedRanges); + if (!cur.addedRanges.containsAll(ranges)) + return false; + + for (long i = 0 ; i < epochs.length; i++) + { + ActiveEpoch retiredIn = get(i); + if (retiredIn.allRetired()) + return true; + + if (retiredIn.retired().containsAll(ranges)) + return true; + } + + return false; + } + + + public boolean isEmpty() + { + return epochs.length == 0; + } + + public long nextEpoch() + { + return current().epoch + 1; + } + + public long minEpoch() + { + if (currentEpoch == 0) + return 0; + return currentEpoch - epochs.length + 1; + } + + public long epoch() + { + return currentEpoch; + } + + public Topology current() + { + return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY; + } + + public Topology currentLocal() + { + return epochs.length > 0 ? epochs[0].local() : Topology.EMPTY; + } + + boolean isQuorumReady(long epoch) + { + ActiveEpoch e = ifExists(epoch); + return e != null && e.isQuorumReady(); + } + + @VisibleForTesting + public EpochReady epochReady(long epoch) + { + if (epoch < minEpoch()) + return EpochReady.done(epoch); + + if (epoch > currentEpoch) + throw new IllegalArgumentException(String.format("Epoch %d is larger than current epoch %d", epoch, currentEpoch)); + + return get(epoch).epochReady(); + } + + /** + * Mark sync complete for the given node/epoch, and if this epoch + * is now synced, update the prevSynced flag on superseding epochs + */ + @GuardedBy("TopologyManager") + void onReadyToCoordinate(Node.Id id, long epoch) + { + Invariants.requireArgument(epoch >= 0); + + int i = indexIfExists(epoch); + if (i < 0) + { + if (epoch > currentEpoch) i = 0; + else return; + } + + while (i < epochs.length && epochs[i].onReadyToCoordinate(id)) + ++i; + } + + private Ranges closedOrRetired(Ranges ranges, long epoch, BiFunction<ActiveEpoch, Ranges, Ranges> f) + { + 
Invariants.requireArgument(epoch >= 0); + + int i = indexIfExists(epoch); + if (i < 0) + { + if (epoch > currentEpoch) + { + if (epochs.length == 0) + return ranges; + i = 0; + } + else + { + return Ranges.EMPTY; // notification came for an already truncated epoch + } + } + + Ranges cur = f.apply(epochs[i++], ranges); + Ranges report = epoch <= currentEpoch ? cur : ranges; + while (!cur.isEmpty() && i < epochs.length) + cur = f.apply(epochs[i++], cur); + return report; + } + + /** + * Mark the epoch as "closed" for the provided ranges; this means that no new transactions + * that intersect with this range may be proposed in the epoch (they will be rejected). + */ + @GuardedBy("TopologyManager") + Ranges closed(Ranges ranges, long epoch) + { + return closedOrRetired(ranges, epoch, ActiveEpoch::recordClosed); + } + + /** + * Mark the epoch as "retired" for the provided ranges; this means that all transactions that can be + * proposed for this epoch have now been executed globally. + */ + @GuardedBy("TopologyManager") + Ranges retired(Ranges ranges, long epoch) + { + return closedOrRetired(ranges, epoch, ActiveEpoch::recordRetired); + } + + ActiveEpoch get(long epoch) + { + int index = indexOf(epoch); + return epochs[index]; + } + + public @Nullable ActiveEpoch ifExists(long epoch) + { + int index = indexIfExists(epoch); + if (index < 0) + return null; + + return epochs[index]; + } + + private int indexOf(long epoch) + { + if (epoch < minEpoch()) throw new TopologyRetiredException(epoch, minEpoch()); + else if (epoch > currentEpoch) throw new IllegalArgumentException("Topology in future: " + epoch + " (max active: " + currentEpoch + ")"); + + return (int) (currentEpoch - epoch); + } + + private int indexIfExists(long epoch) + { + if (epoch > currentEpoch || epoch <= currentEpoch - epochs.length) + return -1; + + return (int) (currentEpoch - epoch); + } + + + /** + * Fetch topologies between {@param minEpoch} (inclusive), and {@param maxEpoch} (inclusive). 
+ */ + public TopologyRange between(long minEpoch, long maxEpoch) + { + // No epochs known to Accord + if (firstNonEmptyEpoch == -1 || minEpoch > currentEpoch) + return new TopologyRange(minEpoch(), currentEpoch, firstNonEmptyEpoch, Collections.emptyList()); + + minEpoch = Math.max(minEpoch, minEpoch()); + int diff = Math.toIntExact(currentEpoch - minEpoch + 1); + List<Topology> topologies = new ArrayList<>(diff); + for (int i = 0; minEpoch() + i <= maxEpoch && i < diff; i++) + topologies.add(get(minEpoch + i).global); + + return new TopologyRange(minEpoch(), currentEpoch, firstNonEmptyEpoch, topologies); + } + + public Topologies preciseEpochs(long epoch) + { + return new Topologies.Single(manager.sorter, get(epoch).global); + } + + // TODO (testing): test all of these methods when asking for epochs that have been cleaned up (and other code paths) + + /** + * Returns topologies containing epochs where specified ranges haven't completed synchronization between min/max epochs. + * Can be used during coordination operations to ensure they contact all relevant nodes across topology changes, + * particularly when some ranges are still syncing after cluster membership changes. 
+ */ + public Topologies withUnsyncedEpochs(Unseekables<?> select, Timestamp min, Timestamp max) + { + return withUnsyncedEpochs(select, min.epoch(), max.epoch()); + } + + public Topologies select(Unseekables<?> select, Timestamp min, Timestamp max, Topologies.SelectNodeOwnership selectNodeOwnership, ProtocolModifiers.QuorumEpochIntersections.Include include) + { + return select(select, min.epoch(), max.epoch(), selectNodeOwnership, include); + } + + public Topologies select(Unseekables<?> select, long minEpoch, long maxEpoch, Topologies.SelectNodeOwnership selectNodeOwnership, ProtocolModifiers.QuorumEpochIntersections.Include include) + { + switch (include) + { + default: throw new AssertionError("Unhandled Include: " +include); + case Unsynced: return withUnsyncedEpochs(select, minEpoch, maxEpoch); + case Owned: return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership); + } + } + + public Topologies reselect(@Nullable Topologies prev, @Nullable ProtocolModifiers.QuorumEpochIntersections.Include prevIncluded, Unseekables<?> select, Timestamp min, Timestamp max, Topologies.SelectNodeOwnership selectNodeOwnership, ProtocolModifiers.QuorumEpochIntersections.Include include) + { + return reselect(prev, prevIncluded, select, min.epoch(), max.epoch(), selectNodeOwnership, include); + } + + // prevIncluded may be null even when prev is not null, in cases where we do not know what prev was produced with + public Topologies reselect(@Nullable Topologies prev, @Nullable ProtocolModifiers.QuorumEpochIntersections.Include prevIncluded, Unseekables<?> select, long minEpoch, long maxEpoch, Topologies.SelectNodeOwnership selectNodeOwnership, ProtocolModifiers.QuorumEpochIntersections.Include include) + { + if (include == Owned) + { + if (prev != null && prev.currentEpoch() >= maxEpoch && prev.oldestEpoch() <= minEpoch) + return prev.forEpochs(minEpoch, maxEpoch); + else + return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership); + } + else + { + if 
(prevIncluded == Unsynced && prev != null && prev.currentEpoch() == maxEpoch && prev.oldestEpoch() == minEpoch) + return prev; + else // TODO (desired): can we avoid recalculating when only minEpoch advances? + return withUnsyncedEpochs(select, minEpoch, maxEpoch); + } + + } + + public <U extends Participants<?>> @Nullable U unsyncedOnly(U select, long beforeEpoch) + { + return extra(select, 0, beforeEpoch, ActiveEpoch::quorumReady, (TopologyCollector.Simple.UnsyncedSelector<U>) TopologyCollector.Simple.UnsyncedSelector.INSTANCE); + } + + public Topologies withUnsyncedEpochs(Unseekables<?> select, long minEpoch, long maxEpoch) + { + Invariants.requireArgument(minEpoch <= maxEpoch, "min epoch %d > max %d", minEpoch, maxEpoch); + return withSufficientEpochsAtLeast(select, minEpoch, maxEpoch, ActiveEpoch::quorumReady); + } + + public TxnId.FastPath selectFastPath(Routables<?> select, long epoch) + { + return atLeast(select, epoch, epoch, ActiveEpoch::quorumReady, manager.bestFastPath); + } + + public boolean supportsPrivilegedFastPath(Routables<?> select, long epoch) + { + return atLeast(select, epoch, epoch, ActiveEpoch::quorumReady, manager.supportsPrivilegedFastPath); + } + + public Topologies withOpenEpochs(Routables<?> select, @Nullable EpochSupplier min, @Nullable EpochSupplier max) + { + return withSufficientEpochsAtMost(select, + min == null ? Long.MIN_VALUE : min.epoch(), + max == null ? Long.MAX_VALUE : max.epoch(), + ActiveEpoch::closed); + } + + public Topologies withUncompletedEpochs(Unseekables<?> select, @Nullable EpochSupplier min, EpochSupplier max) + { + return withSufficientEpochsAtLeast(select, + min == null ? Long.MIN_VALUE : min.epoch(), + max == null ? 
Long.MAX_VALUE : max.epoch(), + ActiveEpoch::retired); + } + + private Topologies withSufficientEpochsAtLeast(Unseekables<?> select, long minEpoch, long maxEpoch, Function<ActiveEpoch, Ranges> isSufficientFor) + { + return atLeast(select, minEpoch, maxEpoch, isSufficientFor, manager.collector); + } + + private <C, K extends Routables<?>, T> T atLeast(K select, long minEpoch, long maxEpoch, Function<ActiveEpoch, Ranges> isSufficientFor, + TopologyCollector<C, K, T> collectors) throws IllegalArgumentException + { + Invariants.requireArgument(minEpoch <= maxEpoch); + if (maxEpoch < minEpoch()) + throw new TopologyRetiredException(maxEpoch, minEpoch()); + + if (maxEpoch == Long.MAX_VALUE) maxEpoch = currentEpoch; + else Invariants.require(currentEpoch >= maxEpoch, "current epoch %d < max %d", currentEpoch, maxEpoch); + + ActiveEpoch max = get(maxEpoch); + if (minEpoch == maxEpoch && isSufficientFor.apply(max).containsAll(select)) + return collectors.one(max, select, false); + + int i = (int)(currentEpoch - maxEpoch); + int maxi = (int)(Math.min(1 + currentEpoch - minEpoch, epochs.length)); + C collector = collectors.allocate(maxi - i); + + // Previous logic would exclude synced ranges, but this was removed as that makes min epoch selection harder. 
+ // An issue was found where a range was removed from a replica and min selection picked the epoch before that, + // which caused a node to get included in the txn that actually lost the range + // See CASSANDRA-18804 + while (i < maxi) + { + ActiveEpoch e = epochs[i++]; + collector = collectors.update(collector, e, select, false); + select = (K)select.without(e.addedRanges); + } + + if (select.isEmpty()) + return collectors.multi(collector); + + if (i == epochs.length) + { + // now we GC epochs, we cannot rely on addedRanges to remove all ranges, so we also remove the ranges found in the earliest epoch we have + select = (K)select.without(epochs[epochs.length - 1].global.ranges); + if (!select.isEmpty()) + throw Invariants.illegalArgument("Ranges %s could not be found", select); + return collectors.multi(collector); + } + + // remaining is updated based off isSufficientFor, but select is not + Routables<?> remaining = select; + + // include any additional epochs to reach sufficiency + ActiveEpoch prev = epochs[maxi - 1]; + do + { + remaining = remaining.without(isSufficientFor.apply(prev)); + Routables<?> prevSelect = select; + select = (K)select.without(prev.addedRanges); + if (prevSelect != select) // perf optimization; if select wasn't changed (it does not intersect addedRanges), then remaining won't + remaining = remaining.without(prev.addedRanges); + if (remaining.isEmpty()) + return collectors.multi(collector); + + ActiveEpoch next = epochs[i++]; + collector = collectors.update(collector, next, select, false); + prev = next; + } while (i < epochs.length); + // need to remove sufficient / added else remaining may not be empty when the final matches are the last epoch + remaining = remaining.without(isSufficientFor.apply(prev)); + remaining = remaining.without(prev.addedRanges); + // TODO (desired): propagate addedRanges to the earliest epoch we retain for consistency + // now we GC epochs, we cannot rely on addedRanges to remove all ranges, so we also remove 
the ranges found in the earliest epoch we have + remaining = remaining.without(epochs[epochs.length - 1].global.ranges); + if (!remaining.isEmpty()) + Invariants.illegalArgument("Ranges %s could not be found", remaining); + + return collectors.multi(collector); + } + + private Topologies withSufficientEpochsAtMost(Routables<?> select, long minEpoch, long maxEpoch, Function<ActiveEpoch, Ranges> isSufficientFor) + { + return atMost(select, minEpoch, maxEpoch, isSufficientFor, manager.collector); + } + + private <C, K extends Routables<?>, T> T atMost(K select, long minEpoch, long maxEpoch, Function<ActiveEpoch, Ranges> isSufficientFor, + TopologyCollector<C, K, T> collectors) + { + Invariants.requireArgument(minEpoch <= maxEpoch); + + minEpoch = Math.max(minEpoch(), minEpoch); + maxEpoch = validateMax(maxEpoch); + + ActiveEpoch cur = get(maxEpoch); + if (minEpoch == maxEpoch) + { + // TODO (required): why are we testing isSufficientFor here? minEpoch == maxEpoch, we should always return. + if (isSufficientFor.apply(cur).containsAll(select)) + return collectors.one(cur, select, true); + } + + int i = (int)(currentEpoch - maxEpoch); + int maxi = (int)(Math.min(1 + currentEpoch - minEpoch, epochs.length)); + C collector = collectors.allocate(maxi - i); + + while (!select.isEmpty()) + { + collector = collectors.update(collector, cur, select, true); + select = (K)select.without(cur.addedRanges) + .without(isSufficientFor.apply(cur)); + + if (++i == maxi) + break; + + cur = epochs[i]; + } + + return collectors.multi(collector); + } + + private <C, K extends Routables<?>, T> T extra(K select, long minEpoch, long maxEpoch, Function<ActiveEpoch, Ranges> remove, + TopologyCollector<C, K, T> collectors) + { + Invariants.requireArgument(minEpoch <= maxEpoch); + + minEpoch = Math.max(minEpoch(), minEpoch); + maxEpoch = Math.min(maxEpoch, currentEpoch); + if (maxEpoch <= minEpoch) + return collectors.none(); + + ActiveEpoch cur = get(maxEpoch); + select = (K) 
select.without(remove.apply(cur)); + if (select.isEmpty()) + return collectors.none(); + + int i = (int)(currentEpoch - maxEpoch); + int maxi = (int)(Math.min(1 + currentEpoch - minEpoch, epochs.length)); + C collector = collectors.allocate(maxi - i); + + while (!select.isEmpty()) + { + collector = collectors.update(collector, cur, select, true); + select = (K)select.without(cur.addedRanges); + + if (++i == maxi) + break; + + cur = epochs[i]; + select = (K)select.without(remove.apply(cur)); + } + + return collectors.multi(collector); + } + + private long validateMax(long maxEpoch) + { + if (maxEpoch == Long.MAX_VALUE) + return currentEpoch; + + Invariants.require(currentEpoch >= maxEpoch, "current epoch %d < provided max %d", currentEpoch, maxEpoch); + if (maxEpoch < minEpoch()) + throw new TopologyRetiredException(maxEpoch, minEpoch()); + return maxEpoch; + } + + public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch, Topologies.SelectNodeOwnership selectNodeOwnership) + { + return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership, Topology::select); + } + + public Topologies preciseEpochsIfExists(Unseekables<?> select, long minEpoch, long maxEpoch, Topologies.SelectNodeOwnership selectNodeOwnership) + { + return preciseEpochs(select, minEpoch, maxEpoch, selectNodeOwnership, Topology::selectIfExists); + } + + @Override + public Iterator<ActiveEpoch> iterator() + { + return Stream.of(epochs).iterator(); + } + + public Stream<ActiveEpoch> stream() + { + return Stream.of(epochs); + } + + public interface SelectFunction + { + Topology apply(Topology topology, Unseekables<?> select, Topologies.SelectNodeOwnership selectNodeOwnership); + } + + public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch, Topologies.SelectNodeOwnership selectNodeOwnership, SelectTopology selectTopology) + { + // TODO (expected): we should disambiguate minEpoch we can bump (i.e. historical epochs) and those we cannot (i.e. 
txnId.epoch()) + minEpoch = Math.max(minEpoch(), minEpoch); + maxEpoch = validateMax(maxEpoch); + ActiveEpoch max = get(maxEpoch); + + if (max == null) Review Comment: I think `max` can not be `null` anymore ########## accord-core/src/main/java/accord/topology/PendingEpochs.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import accord.local.Node; +import accord.primitives.Ranges; +import accord.utils.Invariants; + +/** + * Thread safety is managed by a synchronized lock on this object, and extending classes can use the same lock when needed. 
+ * <p> + * There is a special case when the last received/acknowledged epochs are needed, they can be accessed without a lock + * and provide a happens-before relationship (if lastReceived=42, epoch 42 exists and is set up) + */ +class PendingEpochs +{ + final TopologyManager manager; + private PendingEpoch[] epochs = new PendingEpoch[16]; + private int start, end; + + PendingEpochs(TopologyManager manager) + { + this.manager = manager; + } + + int size() + { + return end - start; + } + + boolean isEmpty() + { + return end == start; + } + + private void append(PendingEpoch append) + { + if (end == epochs.length) + { + int capacity = Math.max(epochs.length, size() * 2); + resize(capacity, 0); + } + epochs[end++] = append; + } + + private void prepend(PendingEpoch append) + { + if (start == 0) + { + int size = size(); + int capacity = Math.max(epochs.length, size * 2); + resize(capacity, Math.max(1, capacity / 4)); + } + epochs[--start] = append; + } + + private void resize(int newCapacity, int newStart) + { + int size = size(); + PendingEpoch[] nextEpochs = epochs.length == newCapacity ? epochs : new PendingEpoch[newCapacity]; Review Comment: When copying to "self", could you add filling the rest of array with nulls? This should not impact functionality but is easier on the eye when debugging. ########## accord-core/src/main/java/accord/topology/PendingEpochs.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.topology; + +import accord.local.Node; +import accord.primitives.Ranges; +import accord.utils.Invariants; + +/** + * Thread safety is managed by a synchronized lock on this object, and extending classes can use the same lock when needed. + * <p> + * There is a special case when the last received/acknowledged epochs are needed, they can be accessed without a lock + * and provide a happens-before relationship (if lastReceived=42, epoch 42 exists and is set up) + */ +class PendingEpochs +{ + final TopologyManager manager; + private PendingEpoch[] epochs = new PendingEpoch[16]; + private int start, end; + + PendingEpochs(TopologyManager manager) + { + this.manager = manager; + } + + int size() + { + return end - start; + } + + boolean isEmpty() + { + return end == start; + } + + private void append(PendingEpoch append) + { + if (end == epochs.length) + { + int capacity = Math.max(epochs.length, size() * 2); + resize(capacity, 0); + } + epochs[end++] = append; + } + + private void prepend(PendingEpoch append) + { + if (start == 0) + { + int size = size(); + int capacity = Math.max(epochs.length, size * 2); + resize(capacity, Math.max(1, capacity / 4)); Review Comment: Any reason why not shift by one all the time? I think it is reasonable to assume that epochs will be mostly observed in order, and prepend will be only rarely exercised -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

