This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new bb070c6 Faster SimpleProgressLog and BurnTest (#16) bb070c6 is described below commit bb070c6b47616e47d6370c41438715634bcd8b48 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Jan 4 16:00:30 2023 +0000 Faster SimpleProgressLog and BurnTest (#16) Improve the SimpleProgressLog to only perform work as necessary, and make some other minor improvements to performance to improve burn test throughput. Co-authored-by: Aleksey Yeschenko <alek...@apache.org> --- .../main/java/accord/coordinate/CheckShards.java | 4 +- .../java/accord/coordinate/ReadCoordinator.java | 8 +- .../coordinate/tracking/AbstractTracker.java | 2 +- .../accord/coordinate/tracking/ReadTracker.java | 11 +- .../java/accord/impl/InMemoryCommandStore.java | 4 +- .../main/java/accord/impl/SimpleProgressLog.java | 1072 ++++++++++---------- .../java/accord/impl/SizeOfIntersectionSorter.java | 2 +- .../src/main/java/accord/local/Command.java | 2 +- .../src/main/java/accord/local/CommandStores.java | 4 +- .../main/java/accord/messages/BeginRecovery.java | 12 +- .../src/main/java/accord/messages/Defer.java | 5 +- .../main/java/accord/messages/InformDurable.java | 2 + .../main/java/accord/primitives/AbstractKeys.java | 10 +- .../java/accord/primitives/AbstractRanges.java | 2 +- .../src/main/java/accord/primitives/Deps.java | 74 +- .../src/main/java/accord/primitives/Routables.java | 8 +- .../src/main/java/accord/primitives/Txn.java | 2 +- .../src/main/java/accord/primitives/Writes.java | 2 +- .../src/main/java/accord/topology/Topologies.java | 2 +- .../src/main/java/accord/topology/Topology.java | 126 ++- .../main/java/accord/topology/TopologyManager.java | 14 +- .../src/main/java/accord/utils/ArrayBuffers.java | 14 +- .../main/java/accord/utils/IndexedBiFunction.java | 4 +- .../main/java/accord/utils/IndexedConsumer.java | 4 +- .../src/main/java/accord/utils/IndexedFold.java | 3 +- .../accord/utils/IndexedFoldIntersectToLong.java | 4 +- .../main/java/accord/utils/IndexedFoldToLong.java | 4 +- .../main/java/accord/utils/IndexedFunction.java | 4 +- .../main/java/accord/utils/IndexedIntFunction.java | 4 +- .../main/java/accord/utils/IndexedPredicate.java | 4 +- .../java/accord/utils/IndexedRangeFoldToLong.java | 2 +- .../java/accord/utils/IndexedRangeTriConsumer.java | 6 + .../main/java/accord/utils/IndexedTriConsumer.java | 7 + .../main/java/accord/utils/IndexedTriFunction.java | 4 +- .../java/accord/utils/IntrusiveLinkedList.java | 109 ++ ...oldToLong.java => IntrusiveLinkedListNode.java} | 24 +- .../src/main/java/accord/utils/SortedArrays.java | 2 +- accord-core/src/test/java/accord/KeysTest.java | 27 +- .../src/test/java/accord/burn/BurnTest.java | 9 +- .../coordinate/tracking/TrackerReconciler.java | 2 +- .../src/test/java/accord/impl/basic/Cluster.java | 10 +- 41 files changed, 920 insertions(+), 695 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java index 965e6a9..dd91993 100644 --- a/accord-core/src/main/java/accord/coordinate/CheckShards.java +++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java @@ -59,7 +59,9 @@ public abstract class CheckShards extends ReadCoordinator<CheckStatusReply> @Override protected Action process(Id from, CheckStatusReply reply) { - debug.put(from, reply); + if (debug != null) + debug.put(from, reply); + if (reply.isOk()) { CheckStatusOk ok = (CheckStatusOk) reply; diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java index 4154b21..3c9ccbf 100644 --- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java +++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java @@ -16,6 +16,8 @@ import static com.google.common.collect.Sets.newHashSetWithExpectedSize; abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply> { + private static final boolean DEBUG = false; + protected enum Action { /** @@ -53,7 +55,7 @@ abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends Read final TxnId txnId; private boolean isDone; private Throwable failure; - Map<Id, Object> debug = new HashMap<>(); + Map<Id, Object> debug = DEBUG ? new HashMap<>() : null; ReadCoordinator(Node node, Topologies topologies, TxnId txnId) { @@ -69,7 +71,9 @@ abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends Read @Override public void onSuccess(Id from, Reply reply) { - if (debug != null) debug.put(from, reply); + if (debug != null) + debug.put(from, reply); + if (isDone) return; diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java index e8fadff..68a3201 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java @@ -138,7 +138,7 @@ public abstract class AbstractTracker<ST extends ShardTracker, P> } static <ST extends ShardTracker, P, T extends AbstractTracker<ST, P>> - ShardOutcomes apply(int trackerIndex, T tracker, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param) + ShardOutcomes apply(T tracker, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param, int trackerIndex) { return function.apply(tracker.trackers[trackerIndex], param).apply(tracker, trackerIndex); } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java index 4d4142e..e86f2d4 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java @@ -158,10 +158,9 @@ public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker, B } // TODO: abstract the candidate selection process so the implementation may prioritise based on distance/health etc - // TODO: faster Id sets and arrays using primitive ints when unambiguous - final Set<Id> inflight; - final List<Id> candidates; - private Set<Id> slow; + final Set<Id> inflight; // TODO: use Agrona's IntHashSet as soon as Node.Id switches from long to int + final List<Id> candidates; // TODO: use Agrona's IntArrayList as soon as Node.Id switches from long to int + private Set<Id> slow; // TODO: use Agrona's IntHashSet as soon as Node.Id switches from long to int protected int waitingOnData; public ReadTracker(Topologies topologies) @@ -265,9 +264,9 @@ public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker, B while (i >= 0) { Id candidate = candidates.get(i); - topologies().forEach((ti, topology) -> { + topologies().forEach((topology, ti) -> { int offset = topologyOffset(ti); - topology.forEachOn(candidate, (si, s) -> toRead.clear(offset + si)); + topology.forEachOn(candidate, (s, si) -> toRead.clear(offset + si)); }); if (toRead.isEmpty()) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 8cd1de2..ce4562d 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -22,10 +22,10 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.Key; import accord.api.ProgressLog; -import accord.local.CommandStore; // java8 fails compilation if this is in correct position -import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails compilation if this is in correct position import accord.impl.InMemoryCommandStore.SingleThread.AsyncState; import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState; +import accord.local.CommandStore; // java8 fails compilation if this is in correct position +import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails compilation if this is in correct position import accord.local.Command; import accord.local.CommandStore.RangesForEpoch; import accord.local.CommandsForKey; diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java index cfde871..f295235 100644 --- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java +++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java @@ -29,38 +29,34 @@ import java.util.function.BiConsumer; import javax.annotation.Nullable; -import accord.coordinate.*; -import accord.local.*; -import accord.local.Status.Known; -import accord.primitives.*; -import accord.utils.Invariants; - import accord.api.ProgressLog; import accord.api.RoutingKey; +import accord.coordinate.*; +import accord.impl.SimpleProgressLog.Instance.State.Monitoring; +import accord.local.*; import accord.local.Node.Id; -import accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus; +import accord.local.Status.Known; import accord.messages.Callback; import accord.messages.InformDurable; import accord.messages.SimpleReply; +import accord.primitives.*; import accord.topology.Topologies; +import accord.utils.IntrusiveLinkedList; +import accord.utils.IntrusiveLinkedListNode; +import accord.utils.Invariants; import org.apache.cassandra.utils.concurrent.Future; import static accord.api.ProgressLog.ProgressShard.Home; import static accord.api.ProgressLog.ProgressShard.Unsure; import static accord.coordinate.InformHomeOfTxn.inform; -import static accord.impl.SimpleProgressLog.DisseminateState.DisseminateStatus.NotExecuted; -import static accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.NotWitnessed; -import static accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.ReadyToExecute; -import static accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.Uncommitted; -import static accord.impl.SimpleProgressLog.NonHomeState.Safe; -import static accord.impl.SimpleProgressLog.NonHomeState.StillUnsafe; -import static accord.impl.SimpleProgressLog.NonHomeState.Unsafe; +import static accord.impl.SimpleProgressLog.CoordinateStatus.ReadyToExecute; +import static accord.impl.SimpleProgressLog.CoordinateStatus.Uncommitted; +import static accord.impl.SimpleProgressLog.DisseminateStatus.NotExecuted; import static accord.impl.SimpleProgressLog.Progress.Done; import static accord.impl.SimpleProgressLog.Progress.Expected; import static accord.impl.SimpleProgressLog.Progress.Investigating; import static accord.impl.SimpleProgressLog.Progress.NoProgress; import static accord.impl.SimpleProgressLog.Progress.NoneExpected; -import static accord.impl.SimpleProgressLog.Progress.advance; import static accord.local.PreLoadContext.contextFor; import static accord.local.Status.Durability.Durable; import static accord.local.Status.Known.Nothing; @@ -69,565 +65,576 @@ import static accord.local.Status.PreCommitted; import static accord.primitives.Route.isFullRoute; // TODO: consider propagating invalidations in the same way as we do applied -public class SimpleProgressLog implements Runnable, ProgressLog.Factory +public class SimpleProgressLog implements ProgressLog.Factory { - enum Progress - { - NoneExpected, Expected, NoProgress, Investigating, Done; + enum Progress { NoneExpected, Expected, NoProgress, Investigating, Done } - static Progress advance(Progress current) - { - switch (current) - { - default: throw new IllegalStateException(); - case NoneExpected: - case Investigating: - case Done: - return current; - case Expected: - case NoProgress: - return NoProgress; - } - } - } - - // exists only on home shard - static class CoordinateState + enum CoordinateStatus { - enum CoordinateStatus - { - NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done; - boolean isAtMost(CoordinateStatus equalOrLessThan) - { - return compareTo(equalOrLessThan) <= 0; - } - boolean isAtLeast(CoordinateStatus equalOrGreaterThan) - { - return compareTo(equalOrGreaterThan) >= 0; - } - } - - CoordinateStatus status = NotWitnessed; - Progress progress = NoneExpected; - ProgressToken token = ProgressToken.NONE; - - Object debugInvestigating; + NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done; - void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress) + boolean isAtMostReadyToExecute() { - ensureAtLeast(newStatus, newProgress); - updateMax(command); + return compareTo(CoordinateStatus.ReadyToExecute) <= 0; } - void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress) + boolean isAtLeastCommitted() { - if (newStatus.compareTo(status) > 0) - { - status = newStatus; - progress = newProgress; - } + return compareTo(CoordinateStatus.Committed) >= 0; } + } - void updateMax(Command command) - { - token = token.merge(new ProgressToken(command.durability(), command.status(), command.promised(), command.accepted())); - } + enum DisseminateStatus { NotExecuted, Durable, Done } - void updateMax(ProgressToken ok) - { - // TODO: perhaps set localProgress back to Waiting if Investigating and we update anything? - token = token.merge(ok); - } + final Node node; + final List<Instance> instances = new CopyOnWriteArrayList<>(); - void durableGlobal() - { - switch (status) - { - default: throw new IllegalStateException(); - case NotWitnessed: - case Uncommitted: - case Committed: - case ReadyToExecute: - status = CoordinateStatus.Done; - progress = NoneExpected; - case Done: - } - } + public SimpleProgressLog(Node node) + { + this.node = node; + } - void update(Node node, CommandStore commandStore, TxnId txnId, Command command) + class Instance extends IntrusiveLinkedList<Monitoring> implements ProgressLog, Runnable + { + class State { - if (progress != NoProgress) + abstract class Monitoring extends IntrusiveLinkedListNode { - progress = advance(progress); - return; - } + private Progress progress = NoneExpected; - progress = Investigating; - switch (status) - { - default: throw new AssertionError(); - case NotWitnessed: // can't make progress if we haven't witnessed it yet - case Committed: // can't make progress if we aren't yet ReadyToExecute - case Done: // shouldn't be trying to make progress, as we're done - throw new IllegalStateException(); - - case Uncommitted: - case ReadyToExecute: + void setProgress(Progress newProgress) { - if (status.isAtLeast(CoordinateStatus.Committed) && command.durability().isDurable()) + this.progress = newProgress; + switch (newProgress) { - // must also be committed, as at the time of writing we do not guarantee dissemination of Commit - // records to the home shard, so we only know the executeAt shards will have witnessed this - // if the home shard is at an earlier phase, it must run recovery - long epoch = command.executeAt().epoch; - node.withEpoch(epoch, () -> debugInvestigating = FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, (success, fail) -> { - // should have found enough information to apply the result, but in case we did not reset progress - if (progress == Investigating) - progress = Expected; - })); + default: throw new AssertionError(); + case NoneExpected: + case Done: + case Investigating: + remove(); + break; + case Expected: + case NoProgress: + if (isFree()) + addFirst(this); } - else + } + + boolean shouldRun() + { + switch (progress) { - RoutingKey homeKey = command.homeKey(); - node.withEpoch(txnId.epoch, () -> { - - Future<? extends Outcome> recover = node.maybeRecover(txnId, homeKey, command.route(), token); - recover.addCallback((success, fail) -> { - if (status.isAtMost(ReadyToExecute) && progress == Investigating) - { - progress = Expected; - if (fail != null) - return; - - ProgressToken token = success.asProgressToken(); - // TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk) - if (token.durability.isDurable()) - { - commandStore.execute(contextFor(txnId), safeStore -> { - Command cmd = safeStore.command(txnId); - cmd.setDurability(safeStore, token.durability, homeKey, null); - safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null); - }).addCallback(commandStore.agent()); - } - - updateMax(token); - } - }); - - debugInvestigating = recover; - }); + default: throw new AssertionError(); + case NoneExpected: + case Done: + case Investigating: + throw new IllegalStateException(); + case Expected: + if (isFree()) + throw new IllegalStateException(); + progress = NoProgress; + return false; + case NoProgress: + remove(); + return true; } } - } - } - - @Override - public String toString() - { - return "{" + status + ',' + progress + '}'; - } - } - // exists only on home shard - static class DisseminateState - { - enum DisseminateStatus { NotExecuted, Durable, Done } + abstract void run(Command command); - // TODO: thread safety (schedule on progress log executor) - class CoordinateAwareness implements Callback<SimpleReply> - { - @Override - public void onSuccess(Id from, SimpleReply reply) - { - notAwareOfDurability.remove(from); - maybeDone(); - } + Progress progress() + { + return progress; + } - @Override - public void onFailure(Id from, Throwable failure) - { + TxnId txnId() + { + return txnId; + } } - @Override - public void onCallbackFailure(Id from, Throwable failure) + // exists only on home shard + class CoordinateState extends Monitoring { - } - } + CoordinateStatus status = CoordinateStatus.NotWitnessed; + ProgressToken token = ProgressToken.NONE; - DisseminateStatus status = NotExecuted; - Progress progress = NoneExpected; - Set<Id> notAwareOfDurability; - Set<Id> notPersisted; + Object debugInvestigating; - List<Runnable> whenReady; - - CoordinateAwareness investigating; + void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress) + { + ensureAtLeast(newStatus, newProgress); + updateMax(command); + } - private void whenReady(Node node, Command command, Runnable runnable) - { - if (notAwareOfDurability != null || maybeReady(node, command)) - { - runnable.run(); - } - else - { - if (whenReady == null) - whenReady = new ArrayList<>(); - whenReady.add(runnable); - } - } + void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress) + { + if (newStatus.compareTo(status) > 0) + { + status = newStatus; + setProgress(newProgress); + } + } - private void whenReady(Runnable runnable) - { - if (notAwareOfDurability != null) - { - runnable.run(); - } - else - { - if (whenReady == null) - whenReady = new ArrayList<>(); - whenReady.add(runnable); - } - } + void updateMax(Command command) + { + token = token.merge(new ProgressToken(command.durability(), command.status(), command.promised(), command.accepted())); + } - // must know the epoch information, and have a valid Route - private boolean maybeReady(Node node, Command command) - { - if (!command.status().hasBeen(Status.PreCommitted)) - return false; + void updateMax(ProgressToken ok) + { + // TODO: perhaps set localProgress back to Waiting if Investigating and we update anything? + token = token.merge(ok); + } - if (!isFullRoute(command.route())) - return false; + void durableGlobal() + { + switch (status) + { + default: throw new IllegalStateException(); + case NotWitnessed: + case Uncommitted: + case Committed: + case ReadyToExecute: + status = CoordinateStatus.Done; + setProgress(NoneExpected); + case Done: + } + } - if (!node.topology().hasEpoch(command.executeAt().epoch)) - return false; + @Override + void run(Command command) + { + setProgress(Investigating); + switch (status) + { + default: throw new AssertionError(); + case NotWitnessed: // can't make progress if we haven't witnessed it yet + case Committed: // can't make progress if we aren't yet ReadyToExecute + case Done: // shouldn't be trying to make progress, as we're done + throw new IllegalStateException(); + + case Uncommitted: + case ReadyToExecute: + { + if (status.isAtLeastCommitted() && command.durability().isDurable()) + { + // must also be committed, as at the time of writing we do not guarantee dissemination of Commit + // records to the home shard, so we only know the executeAt shards will have witnessed this + // if the home shard is at an earlier phase, it must run recovery + long epoch = command.executeAt().epoch; + node.withEpoch(epoch, () -> debugInvestigating = FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, (success, fail) -> { + // should have found enough information to apply the result, but in case we did not reset progress + if (progress() == Investigating) + setProgress(Expected); + })); + } + else + { + RoutingKey homeKey = command.homeKey(); + node.withEpoch(txnId.epoch, () -> { + + Future<? extends Outcome> recover = node.maybeRecover(txnId, homeKey, command.route(), token); + recover.addCallback((success, fail) -> { + if (status.isAtMostReadyToExecute() && progress() == Investigating) + { + setProgress(Expected); + if (fail != null) + return; + + ProgressToken token = success.asProgressToken(); + // TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk) + if (token.durability.isDurable()) + { + commandStore.execute(contextFor(txnId), safeStore -> { + Command cmd = safeStore.command(txnId); + cmd.setDurability(safeStore, token.durability, homeKey, null); + safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null); + }).addCallback(commandStore.agent()); + } + + updateMax(token); + } + }); + + debugInvestigating = recover; + }); + } + } + } + } - Topologies topology = node.topology().preciseEpochs(command.route(), command.txnId().epoch, command.executeAt().epoch); - notAwareOfDurability = topology.copyOfNodes(); - notPersisted = topology.copyOfNodes(); - if (whenReady != null) - { - whenReady.forEach(Runnable::run); - whenReady = null; + @Override + public String toString() + { + return "{" + status + ',' + progress() + '}'; + } } - return true; - } - - private void maybeDone() - { - if (notAwareOfDurability.isEmpty()) + // exists only on home shard + class DisseminateState extends Monitoring { - status = DisseminateStatus.Done; - progress = Done; - } - } + class CoordinateAwareness implements Callback<SimpleReply> + { + @Override + public void onSuccess(Id from, SimpleReply reply) + { + // TODO: callbacks should be associated with a commandStore for processing to avoid this + commandStore.execute(PreLoadContext.empty(), ignore -> { + notAwareOfDurability.remove(from); + maybeDone(); + }); + } - void durableGlobal(Node node, Command command, @Nullable Set<Id> persistedOn) - { - if (status == DisseminateStatus.Done) - return; + @Override + public void onFailure(Id from, Throwable failure) + { + } - status = DisseminateStatus.Durable; - progress = Expected; - if (persistedOn == null) - return; + @Override + public void onCallbackFailure(Id from, Throwable failure) + { + } + } - whenReady(node, command, () -> { - notPersisted.removeAll(persistedOn); - notAwareOfDurability.removeAll(persistedOn); - maybeDone(); - }); - } + DisseminateStatus status = NotExecuted; + Set<Id> notAwareOfDurability; // TODO: use Agrona's IntHashSet as soon as Node.Id switches from long to int + Set<Id> notPersisted; // TODO: use Agrona's IntHashSet as soon as Node.Id switches from long to int - void durableLocal(Node node) - { - if (status == DisseminateStatus.Done) - return; + List<Runnable> whenReady; - status = DisseminateStatus.Durable; - progress = Expected; + CoordinateAwareness investigating; - whenReady(() -> { - notPersisted.remove(node.id()); - notAwareOfDurability.remove(node.id()); - maybeDone(); - }); - } + private void whenReady(Node node, Command command, Runnable runnable) + { + if (notAwareOfDurability != null || maybeReady(node, command)) + { + runnable.run(); + } + else + { + if (whenReady == null) + whenReady = new ArrayList<>(); + whenReady.add(runnable); + } + } - void update(Node node, TxnId txnId, Command command) - { - switch (status) - { - default: throw new IllegalStateException(); - case NotExecuted: - case Done: - return; - case Durable: - } + private void whenReady(Runnable runnable) + { + if (notAwareOfDurability != null) + { + runnable.run(); + } + else + { + if (whenReady == null) + whenReady = new ArrayList<>(); + whenReady.add(runnable); + } + } - if (notAwareOfDurability == null && !maybeReady(node, command)) - return; + // must know the epoch information, and have a valid Route + private boolean maybeReady(Node node, Command command) + { + if (!command.status().hasBeen(Status.PreCommitted)) + return false; - if (progress != NoProgress) - { - progress = advance(progress); - return; - } + if (!isFullRoute(command.route())) + return false; - progress = Investigating; - if (notAwareOfDurability.isEmpty()) - { - // TODO: also track actual durability - status = DisseminateStatus.Done; - progress = Done; - return; - } + if (!node.topology().hasEpoch(command.executeAt().epoch)) + return false; - FullRoute<?> route = Route.castToFullRoute(command.route()); - Timestamp executeAt = command.executeAt(); - investigating = new CoordinateAwareness(); - Topologies topologies = node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch); - node.send(notAwareOfDurability, to -> new InformDurable(to, topologies, route, txnId, executeAt, Durable), investigating); - } + Topologies topology = node.topology().preciseEpochs(command.route(), command.txnId().epoch, command.executeAt().epoch); + notAwareOfDurability = topology.copyOfNodes(); + notPersisted = topology.copyOfNodes(); + if (whenReady != null) + { + whenReady.forEach(Runnable::run); + whenReady = null; + } - @Override - public String toString() - { - return "{" + status + ',' + progress + '}'; - } - } + return true; + } - static class BlockingState - { - Known blockedUntil = Nothing; - Progress progress = NoneExpected; + private void maybeDone() + { + if (notAwareOfDurability.isEmpty()) + { + status = DisseminateStatus.Done; + setProgress(Done); + } + } - Unseekables<?, ?> blockedOn; + void durableGlobal(Node node, Command command, @Nullable Set<Id> persistedOn) + { + if (status == DisseminateStatus.Done) + return; + + status = DisseminateStatus.Durable; + setProgress(Expected); + if (persistedOn == null) + return; + + whenReady(node, command, () -> { + notPersisted.removeAll(persistedOn); + notAwareOfDurability.removeAll(persistedOn); + maybeDone(); + }); + } - Object debugInvestigating; + void durableLocal(Node node) + { + if (status == DisseminateStatus.Done) + return; - void recordBlocking(Known blockedUntil, Unseekables<?, ?> blockedOn) - { - Invariants.checkState(!blockedOn.isEmpty()); - if (this.blockedOn == null) this.blockedOn = blockedOn; - else this.blockedOn = Unseekables.merge(this.blockedOn, (Unseekables)blockedOn); - if (!blockedUntil.isSatisfiedBy(this.blockedUntil)) - { - this.blockedUntil = this.blockedUntil.merge(blockedUntil); - progress = Expected; - } - } + status = DisseminateStatus.Durable; + setProgress(Expected); - void record(Known known) - { - if (blockedUntil.isSatisfiedBy(known)) - progress = NoneExpected; - } + whenReady(() -> { + notPersisted.remove(node.id()); + notAwareOfDurability.remove(node.id()); + maybeDone(); + }); + } - void update(Node node, TxnId txnId, Command command) - { - if (progress != NoProgress) - { - progress = advance(progress); - return; - } + @Override + void run(Command command) + { + switch (status) + { + default: throw new IllegalStateException(); + case NotExecuted: + case Done: + return; + case Durable: + } - if (command.has(blockedUntil)) - { - progress = NoneExpected; - return; - } + if (notAwareOfDurability == null && !maybeReady(node, command)) + return; - progress = Investigating; - // first make sure we have enough information to obtain the command locally - Timestamp executeAt = command.hasBeen(PreCommitted) ? command.executeAt() : null; - long srcEpoch = (executeAt != null ? executeAt : txnId).epoch; - // TODO: compute fromEpoch, the epoch we already have this txn replicated until - long toEpoch = Math.max(srcEpoch, node.topology().epoch()); - Unseekables<?, ?> someKeys = unseekables(command); + setProgress(Investigating); + if (notAwareOfDurability.isEmpty()) + { + // TODO: also track actual durability + status = DisseminateStatus.Done; + setProgress(Done); + return; + } - BiConsumer<Known, Throwable> callback = (success, fail) -> { - if (progress != Investigating) - return; + FullRoute<?> route = Route.castToFullRoute(command.route()); + Timestamp executeAt = command.executeAt(); + investigating = new CoordinateAwareness(); + Topologies topologies = node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch); + node.send(notAwareOfDurability, to -> new InformDurable(to, topologies, route, txnId, executeAt, Durable), investigating); + } - progress = Expected; - if (fail == null) + @Override + public String toString() { - if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys); - else record(success); + return "{" + status + ',' + progress() + '}'; } - }; + } - node.withEpoch(toEpoch, () -> { - debugInvestigating = FetchData.fetch(blockedUntil, node, txnId, someKeys, executeAt, toEpoch, callback); - }); - } + class BlockingState extends Monitoring + { + Known blockedUntil = Nothing; - private Unseekables<?, ?> unseekables(Command command) - { - return Unseekables.merge((Route)command.route(), blockedOn); - } + Unseekables<?, ?> blockedOn; - private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> someKeys) - { - progress = Investigating; - // TODO (RangeTxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range - RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey(); - someKeys = someKeys.with(someKey); - debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> { - if (progress != Investigating) - return; - - progress = Expected; - if (fail == null && success.asProgressToken().durability.isDurable()) - progress = Done; - }); - } + Object debugInvestigating; - public String toString() - { - return progress.toString(); - } - } + void recordBlocking(Known blockedUntil, Unseekables<?, ?> blockedOn) + { + Invariants.checkState(!blockedOn.isEmpty()); + if (this.blockedOn == null) this.blockedOn = blockedOn; + else this.blockedOn = Unseekables.merge(this.blockedOn, (Unseekables)blockedOn); + if (!blockedUntil.isSatisfiedBy(this.blockedUntil)) + { + this.blockedUntil = this.blockedUntil.merge(blockedUntil); + setProgress(Expected); + } + } - enum NonHomeState - { - Unsafe, StillUnsafe, Investigating, Safe - } + void record(Known known) + { + if (blockedUntil.isSatisfiedBy(known)) + setProgress(NoneExpected); + } - static class State - { - final TxnId txnId; - final CommandStore commandStore; + @Override + void run(Command command) + { + if (command.has(blockedUntil)) + { + setProgress(NoneExpected); + return; + } - CoordinateState coordinateState; - DisseminateState disseminateState; - NonHomeState nonHomeState; - BlockingState blockingState; + setProgress(Investigating); + // first make sure we have enough information to obtain the command locally + Timestamp executeAt = command.hasBeen(PreCommitted) ? command.executeAt() : null; + long srcEpoch = (executeAt != null ? executeAt : txnId).epoch; + // TODO: compute fromEpoch, the epoch we already have this txn replicated until + long toEpoch = Math.max(srcEpoch, node.topology().epoch()); + Unseekables<?, ?> someKeys = unseekables(command); - State(TxnId txnId, CommandStore commandStore) - { - this.txnId = txnId; - this.commandStore = commandStore; - } + BiConsumer<Known, Throwable> callback = (success, fail) -> { + if (progress() != Investigating) + return; - void recordBlocking(TxnId txnId, Known waitingFor, Unseekables<?, ?> unseekables) - { - Invariants.checkArgument(txnId.equals(this.txnId)); - if (blockingState == null) - blockingState = new BlockingState(); - blockingState.recordBlocking(waitingFor, unseekables); - } + setProgress(Expected); + if (fail == null) + { + if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys); + else record(success); + } + }; - void ensureAtLeast(NonHomeState ensureAtLeast) - { - if (nonHomeState == null || nonHomeState.compareTo(ensureAtLeast) < 0) - nonHomeState = ensureAtLeast; - } + node.withEpoch(toEpoch, () -> { + debugInvestigating = FetchData.fetch(blockedUntil, node, txnId, someKeys, executeAt, toEpoch, callback); + }); + } - CoordinateState local() - { - if (coordinateState == null) - coordinateState = new CoordinateState(); - return coordinateState; - } + private Unseekables<?, ?> unseekables(Command command) + { + return Unseekables.merge((Route)command.route(), blockedOn); + } - DisseminateState global() - { - if (disseminateState == null) - disseminateState = new DisseminateState(); - return disseminateState; - } + private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> someKeys) + { + setProgress(Investigating); + // TODO (RangeTxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range + RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey(); + someKeys = someKeys.with(someKey); + debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> { + if (progress() != Investigating) + return; - void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress) - { - local().ensureAtLeast(command, newStatus, newProgress); - } + setProgress(Expected); + if (fail == null && success.asProgressToken().durability.isDurable()) + setProgress(Done); + }); + } - void ensureAtLeast(TxnId txnId, RoutingKey homeKey, CoordinateStatus newStatus, Progress newProgress) - { - local().ensureAtLeast(newStatus, newProgress); - } + @Override + public String toString() + { + return progress().toString(); + } + } - void updateNonHome(Node node, Command command) - { - switch (nonHomeState) + class NonHomeState extends Monitoring { - default: throw new IllegalStateException(); - case Safe: - case Investigating: - break; - case Unsafe: - nonHomeState = StillUnsafe; - break; - case StillUnsafe: + NonHomeState() + { + setProgress(Expected); + } + + void setSafe() + { + setProgress(Done); + } + + @Override + void run(Command command) + { // make sure a quorum of the home shard is aware of the transaction, so we can rely on it to ensure progress Future<Void> inform = inform(node, txnId, command.homeKey()); inform.addCallback((success, fail) -> { - if (nonHomeState == Safe) + if (progress() == Done) return; - if (fail != null) nonHomeState = Unsafe; - else nonHomeState = Safe; + setProgress(fail != null ? Expected : Done); }); - break; + } + + @Override + public String toString() + { + return progress() == Done ? "Safe" : "Unsafe"; + } } - } - void update(Node node) - { - PreLoadContext context = contextFor(txnId); - commandStore.execute(context, safeStore -> { - Command command = safeStore.command(txnId); - if (blockingState != null) - blockingState.update(node, txnId, command); + final TxnId txnId; - if (coordinateState != null) - coordinateState.update(node, safeStore.commandStore(), txnId, command); + CoordinateState coordinateState; + DisseminateState disseminateState; + NonHomeState nonHomeState; + BlockingState blockingState; - if (disseminateState != null) - disseminateState.update(node, txnId, command); + State(TxnId txnId) + { + this.txnId = txnId; + } - if (nonHomeState != null) - updateNonHome(node, command); - }).addCallback(commandStore.agent()); - } + void recordBlocking(TxnId txnId, Known waitingFor, Unseekables<?, ?> routables) + { + Invariants.checkArgument(txnId.equals(this.txnId)); + if (blockingState == null) + blockingState = new BlockingState(); + blockingState.recordBlocking(waitingFor, routables); + } - @Override - public String toString() - { - return coordinateState != null ? coordinateState.toString() - : nonHomeState != null - ? nonHomeState.toString() - : blockingState.toString(); - } - } + CoordinateState local() + { + if (coordinateState == null) + coordinateState = new CoordinateState(); + return coordinateState; + } - final Node node; - final List<Instance> instances = new CopyOnWriteArrayList<>(); + DisseminateState global() + { + if (disseminateState == null) + disseminateState = new DisseminateState(); + return disseminateState; + } - public SimpleProgressLog(Node node) - { - this.node = node; - node.scheduler().recurring(this, 200L, TimeUnit.MILLISECONDS); - } + void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress) + { + local().ensureAtLeast(command, newStatus, newProgress); + } + + void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress) + { + local().ensureAtLeast(newStatus, newProgress); + } + + void touchNonHomeUnsafe() + { + if (nonHomeState == null) + nonHomeState = new NonHomeState(); + } + + void setSafe() + { + if (nonHomeState == null) + nonHomeState = new NonHomeState(); + nonHomeState.setSafe(); + } + + @Override + public String toString() + { + return coordinateState != null ? coordinateState.toString() + : nonHomeState != null + ? nonHomeState.toString() + : blockingState.toString(); + } + } - class Instance implements ProgressLog - { final CommandStore commandStore; final Map<TxnId, State> stateMap = new HashMap<>(); + boolean isScheduled; Instance(CommandStore commandStore) { this.commandStore = commandStore; - instances.add(this); } State ensure(TxnId txnId) { - return stateMap.computeIfAbsent(txnId, id -> new State(id, commandStore)); + return stateMap.computeIfAbsent(txnId, State::new); } State ensure(TxnId txnId, State state) @@ -635,23 +642,21 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory return state != null ? state : ensure(txnId); } - @Override - public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard shard) - { - if (shard.isHome()) - ensure(txnId).ensureAtLeast(txnId, homeKey, Uncommitted, Expected); - } - - @Override - public void preaccepted(Command command, ProgressShard shard) + private void ensureSafeOrAtLeast(Command command, ProgressShard shard, CoordinateStatus newStatus, Progress newProgress) { Invariants.checkState(shard != Unsure); + State state = null; + assert newStatus.isAtMostReadyToExecute(); + if (newStatus.isAtLeastCommitted()) + state = recordCommit(command.txnId()); + if (shard.isProgress()) { - State state = ensure(command.txnId()); - if (shard.isHome()) state.ensureAtLeast(command, Uncommitted, Expected); - else state.ensureAtLeast(NonHomeState.Unsafe); + state = ensure(command.txnId(), state); + + if (shard.isHome()) state.ensureAtLeast(command, newStatus, newProgress); + else ensure(command.txnId()).setSafe(); } } @@ -671,21 +676,23 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory return state; } - private void ensureSafeOrAtLeast(Command command, ProgressShard shard, CoordinateStatus newStatus, Progress newProgress) + @Override + public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard shard) { - Invariants.checkState(shard != Unsure); + if (shard.isHome()) + ensure(txnId).ensureAtLeast(Uncommitted, Expected); + } - State state = null; - assert newStatus.isAtMost(ReadyToExecute); - if (newStatus.isAtLeast(CoordinateStatus.Committed)) - state = recordCommit(command.txnId()); + @Override + public void preaccepted(Command command, ProgressShard shard) + { + Invariants.checkState(shard != Unsure); if (shard.isProgress()) { - state = ensure(command.txnId(), state); - - if (shard.isHome()) state.ensureAtLeast(command, newStatus, newProgress); - else ensure(command.txnId()).ensureAtLeast(Safe); + State state = ensure(command.txnId()); + if (shard.isHome()) state.ensureAtLeast(command, Uncommitted, Expected); + else state.touchNonHomeUnsafe(); } } @@ -704,7 +711,7 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory @Override public void readyToExecute(Command command, ProgressShard shard) { - ensureSafeOrAtLeast(command, shard, CoordinateStatus.ReadyToExecute, Expected); + ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected); } @Override @@ -712,7 +719,7 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory { recordApply(command.txnId()); // this is the home shard's state ONLY, so we don't know it is fully durable locally - ensureSafeOrAtLeast(command, shard, CoordinateStatus.ReadyToExecute, Expected); + ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected); } @Override @@ -728,7 +735,7 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory state = ensure(command.txnId(), state); if (shard.isHome()) state.ensureAtLeast(command, CoordinateStatus.Done, Done); - else ensure(command.txnId()).ensureAtLeast(Safe); + else ensure(command.txnId()).setSafe(); } } @@ -753,43 +760,78 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory public void durable(TxnId txnId, Unseekables<?, ?> unseekables, ProgressShard shard) { State state = ensure(txnId); - // TODO: we can probably simplify things by requiring (empty) Apply messages to be sent also to the coordinating topology + // TODO (progress consider-prerelease): we can probably simplify things by requiring (empty) Apply messages to be sent also to the coordinating topology state.recordBlocking(txnId, PreApplied.minKnown, unseekables); } + @Override public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn) { - // TODO (soon): forward to progress shard for processing (if known) + // TODO (perf+ consider-prerelease): consider triggering a preemption of existing coordinator (if any) in some circumstances; + // today, an LWT can pre-empt more efficiently (i.e. instantly) a failed operation whereas Accord will + // wait for some progress interval before taking over; there is probably some middle ground where we trigger + // faster preemption once we're blocked on a transaction, while still offering some amount of time to complete. + // TODO (soon): forward to local progress shard for processing (if known) // TODO (soon): if we are co-located with the home shard, don't need to do anything unless we're in a // later topology that wasn't covered by its coordination ensure(blockedBy).recordBlocking(blockedBy, blockedUntil, blockedOn); } - } - @Override - public void run() - { - for (Instance instance : instances) + @Override + public void addFirst(Monitoring add) { - // TODO: we want to be able to poll others about pending dependencies to check forward progress, - // as we don't know all dependencies locally (or perhaps any, at execution time) so we may - // begin expecting forward progress too early - new ArrayList<>(instance.stateMap.values()).forEach(state -> { - try - { - state.update(node); - } - catch (Throwable t) + super.addFirst(add); + ensureScheduled(); + } + + @Override + public void addLast(Monitoring add) + { + throw new UnsupportedOperationException(); + } + + void ensureScheduled() + { + if (isScheduled) + return; + + isScheduled = true; + node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()), 200L, TimeUnit.MILLISECONDS); + } + + @Override + public void run() + { + isScheduled = false; + try + { + for (Monitoring run : this) { - node.agent().onUncaughtException(t); + if (run.shouldRun()) + { + commandStore.execute(contextFor(run.txnId()), safeStore -> { + run.run(safeStore.command(run.txnId())); + }); + } } - }); + } + catch (Throwable t) + { + t.printStackTrace(); + } + finally + { + if (!isEmpty()) + ensureScheduled(); + } } } @Override - public ProgressLog create(CommandStore commandStore) + public Instance create(CommandStore commandStore) { - return new Instance(commandStore); + Instance instance = new Instance(commandStore); + instances.add(instance); + return instance; } } diff --git a/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java index 39bac13..2d7f6b8 100644 --- a/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java +++ b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java @@ -46,6 +46,6 @@ public class SizeOfIntersectionSorter implements TopologySorter private static int count(Node.Id node, ShardSelection shards, int offset, Topology topology) { - return topology.foldlIntOn(node, (i, shard, v) -> shard.get(i) ? v + 1 : v, shards, offset, 0, 0); + return topology.foldlIntOn(node, (shard, v, index) -> shard.get(index) ? v + 1 : v, shards, offset, 0, 0); } } diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index cf483a5..7b8cacb 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -1021,7 +1021,7 @@ public abstract class Command implements CommandListener, BiConsumer<SafeCommand if (partialTxn() != null) { partialTxn = partialTxn.slice(allRanges, shard.isHome()); - Routables.foldlMissing((Seekables)partialTxn.keys(), partialTxn().keys(), (i, keyOrRange, p, v) -> { + Routables.foldlMissing((Seekables)partialTxn.keys(), partialTxn().keys(), (keyOrRange, p, v, i) -> { // TODO: duplicate application of ranges safeStore.forEach(keyOrRange, allRanges, forKey -> forKey.register(this)); return v; diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index cecacba..87d39b2 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -191,7 +191,7 @@ public abstract class CommandStores<S extends CommandStore> for (int i = Math.max(0, indexForEpoch(minEpoch)), maxi = indexForEpoch(maxEpoch); i <= maxi ; ++i) { // include every shard if we match a range - accumulate = Routables.foldl((Ranges)keysOrRanges, ranges[i], (idx, k, p, a) -> p, terminalValue, accumulate, terminalValue); + accumulate = Routables.foldl((Ranges)keysOrRanges, ranges[i], (k, p, a, idx) -> p, terminalValue, accumulate, terminalValue); } return accumulate; } @@ -208,7 +208,7 @@ public abstract class CommandStores<S extends CommandStore> return Integer.toUnsignedLong(key.routingHash()) % numShards; } - private static long addKeyIndex(int i, RoutableKey key, long numShards, long accumulate) + private static long addKeyIndex(RoutableKey key, long numShards, long accumulate, int i) { return accumulate | (1L << keyIndex(key, numShards)); } diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index 19cb9f1..8421c63 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -301,7 +301,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> { commandStore.forEach(keys, ranges, forKey -> { // accepted txns with an earlier txnid that do not have our txnid as a dependency - /** + /* * The idea here is to discover those transactions that have been Accepted without witnessing us * and whom may not have adopted us as dependencies as responses to the Accept. Once we have * reached a quorum for recovery any re-proposals will discover us. So we do not need to look @@ -327,13 +327,13 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> try (Deps.OrderedBuilder builder = Deps.orderedBuilder(true)) { commandStore.forEach(keys, ranges, forKey -> { - /** + /* * The idea here is to discover those transactions that have been Committed and DID witness us * so that we can remove these from the set of acceptedStartedBeforeAndDidNotWitness * on other nodes, to minimise the number of transactions we try to wait for on recovery */ builder.nextKey(forKey.key()); - forKey.committedById().before(txnId, RorWs, WITH, txnId, ANY_STATUS, null) + forKey.committedById().before(txnId, RorWs, WITH, txnId, HAS_BEEN, Committed) .forEach(builder::add); }); return builder.build(); @@ -342,7 +342,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> private static Stream<? extends TxnIdWithExecuteAt> acceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys) { - /** + /* * The idea here is to discover those transactions that were started after us and have been Accepted * and did not witness us as part of their pre-accept round, as this means that we CANNOT have taken * the fast path. This is central to safe recovery, as if every transaction that executes later has @@ -356,14 +356,14 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> private static Stream<TxnId> committedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys) { - /** + /* * The idea here is to discover those transactions that have been decided to execute after us * and did not witness us as part of their pre-accept or accept round, as this means that we CANNOT have * taken the fast path. This is central to safe recovery, as if every transaction that executes later has * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction * has not witnessed us we can safely invalidate it. */ - return commandStore.mapReduce(keys, ranges, forKey -> forKey.committedByExecuteAt().after(startedAfter, RorWs, WITHOUT, startedAfter, ANY_STATUS, null), + return commandStore.mapReduce(keys, ranges, forKey -> forKey.committedByExecuteAt().after(startedAfter, RorWs, WITHOUT, startedAfter, HAS_BEEN, Committed), Stream::concat, Stream.empty()); } } diff --git a/accord-core/src/main/java/accord/messages/Defer.java b/accord-core/src/main/java/accord/messages/Defer.java index c528c11..07a592b 100644 --- a/accord-core/src/main/java/accord/messages/Defer.java +++ b/accord-core/src/main/java/accord/messages/Defer.java @@ -6,8 +6,8 @@ import java.util.function.Function; import accord.local.*; import accord.local.Status.Known; import accord.primitives.TxnId; +import accord.utils.Invariants; -import static accord.local.PreLoadContext.contextFor; import static accord.messages.Defer.Ready.Expired; import static accord.messages.Defer.Ready.No; import static accord.messages.Defer.Ready.Yes; @@ -73,7 +73,8 @@ class Defer implements CommandListener @Override public PreLoadContext listenerPreLoadContext(TxnId caller) { - return contextFor(caller); + Invariants.checkState(caller.equals(request.txnId)); + return request; } } diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java index 66da6aa..3631003 100644 --- a/accord-core/src/main/java/accord/messages/InformDurable.java +++ b/accord-core/src/main/java/accord/messages/InformDurable.java @@ -52,6 +52,8 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext { // we need to pick a progress log, but this node might not have participated in the coordination epoch // in this rare circumstance we simply pick a key to select some progress log to coordinate this + // TODO (now): We might not replicate either txnId.epoch OR executeAt.epoch, but some inbetween. + // Do we need to receive this message in that case? If so, we need to account for this when selecting a progress key at = executeAt; progressKey = node.selectProgressKey(executeAt.epoch, scope, scope.homeKey()); shard = Adhoc; diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java index 47725fe..a089823 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java +++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java @@ -68,7 +68,7 @@ public abstract class AbstractKeys<K extends RoutableKey, KS extends Routables<K public final boolean containsAll(Routables<?, ?> keysOrRanges) { - return keysOrRanges.size() == Routables.foldl(keysOrRanges, this, (i, k, p, v) -> v + 1, 0, 0, 0); + return keysOrRanges.size() == Routables.foldl(keysOrRanges, this, (k, p, v, i) -> v + 1, 0, 0, 0); } @Override @@ -152,12 +152,12 @@ public abstract class AbstractKeys<K extends RoutableKey, KS extends Routables<K public boolean any(Ranges ranges, Predicate<? super K> predicate) { - return 1 == foldl(ranges, (i1, key, i2, i3) -> predicate.test(key) ? 1 : 0, 0, 0, 1); + return 1 == foldl(ranges, (key, p2, prev, index) -> predicate.test(key) ? 1 : 0, 0, 0, 1); } public boolean any(Predicate<? super K> predicate) { - return 1 == foldl((i, key, p, v) -> predicate.test(key) ? 1 : 0, 0, 0, 1); + return 1 == foldl((key, p2, prev, index) -> predicate.test(key) ? 1 : 0, 0, 0, 1); } public boolean none(Predicate<? super K> predicate) @@ -182,7 +182,7 @@ public abstract class AbstractKeys<K extends RoutableKey, KS extends Routables<K @Inline public final void forEach(Ranges rs, Consumer<? super K> forEach) { - Routables.foldl(this, rs, (i, k, consumer) -> { consumer.accept(k); return consumer; }, forEach); + Routables.foldl(this, rs, (k, consumer, i) -> { consumer.accept(k); return consumer; }, forEach); } @Inline @@ -196,7 +196,7 @@ public abstract class AbstractKeys<K extends RoutableKey, KS extends Routables<K { for (int i = 0; i < keys.length; i++) { - initialValue = fold.apply(i, keys[i], param, initialValue); + initialValue = fold.apply(keys[i], param, initialValue, i); if (terminalValue == initialValue) return initialValue; } diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java index 7b5a24c..a377221 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java +++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java @@ -77,7 +77,7 @@ public abstract class AbstractRanges<RS extends Routables<Range, ?>> implements { if (this.isEmpty()) return that.isEmpty(); if (that.isEmpty()) return true; - return Routables.rangeFoldl(that, this, (from, to, p, v) -> v + (to - from), 0, 0, 0) == that.size(); + return Routables.rangeFoldl(that, this, (p, v, from, to) -> v + (to - from), 0, 0, 0) == that.size(); } /** diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java index fef69b9..6f9cc2c 100644 --- a/accord-core/src/main/java/accord/primitives/Deps.java +++ b/accord-core/src/main/java/accord/primitives/Deps.java @@ -35,7 +35,6 @@ import accord.utils.Invariants; import static accord.utils.ArrayBuffers.*; import static accord.utils.SortedArrays.*; import static accord.utils.SortedArrays.Search.FAST; -import static accord.utils.Utils.listOf; /** * A collection of dependencies for a transaction, organised by the key the dependency is adopted via. @@ -153,6 +152,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>> if (totalCount != keyOffset && !hasOrderedTxnId) { + // TODO: this allocates a significant amount of memory: would be preferable to be able to sort using a pre-defined scratch buffer Arrays.sort(keyToTxnId, keyOffset, totalCount); for (int i = keyOffset + 1 ; i < totalCount ; ++i) { @@ -218,7 +218,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>> result[k] = keyCount + keyLimits[k]; int from = k == 0 ? 0 : keyLimits[k - 1]; int to = keyLimits[k]; - offset = (int)SortedArrays.foldlIntersection(txnIds, 0, txnIdCount, keyToTxnId, from, to, (li, ri, key, p, v) -> { + offset = (int)SortedArrays.foldlIntersection(txnIds, 0, txnIdCount, keyToTxnId, from, to, (key, p, v, li, ri) -> { result[(int)v] = li; return v + 1; }, keyCount, offset, -1); @@ -885,63 +885,63 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>> return this; IntBuffers cache = ArrayBuffers.cachedInts(); - int[] remapTxnIds = cache.getInts(txnIds.length); - int[] keyToTxnId = null; + TxnId[] oldTxnIds = txnIds; + int[] oldKeyToTxnId = keyToTxnId; + int[] remapTxnIds = cache.getInts(oldTxnIds.length); + int[] newKeyToTxnId = null; + TxnId[] newTxnIds; int o = 0; try { - TxnId[] txnIds; { - int count = 0; - for (int i = 0 ; i < this.txnIds.length ; ++i) - { - if (remove.test(this.txnIds[i])) remapTxnIds[i] = -1; - else remapTxnIds[i] = count++; - } + int count = 0; + for (int i = 0 ; i < oldTxnIds.length ; ++i) + { + if (remove.test(oldTxnIds[i])) remapTxnIds[i] = -1; + else remapTxnIds[i] = count++; + } - if (count == this.txnIds.length) - return this; + if (count == oldTxnIds.length) + return this; - if (count == 0) - return NONE; + if (count == 0) + return NONE; - txnIds = new TxnId[count]; - for (int i = 0 ; i < this.txnIds.length ; ++i) - { - if (remapTxnIds[i] >= 0) - txnIds[remapTxnIds[i]] = this.txnIds[i]; - } + newTxnIds = new TxnId[count]; + for (int i = 0 ; i < oldTxnIds.length ; ++i) + { + if (remapTxnIds[i] >= 0) + newTxnIds[remapTxnIds[i]] = oldTxnIds[i]; } - keyToTxnId = cache.getInts(this.keyToTxnId.length); + newKeyToTxnId = cache.getInts(oldKeyToTxnId.length); int k = 0, i = keys.size(); o = i; - while (i < this.keyToTxnId.length) + while (i < oldKeyToTxnId.length) { - while (this.keyToTxnId[k] == i) - keyToTxnId[k++] = o; + while (oldKeyToTxnId[k] == i) + newKeyToTxnId[k++] = o; - int remapped = remapTxnIds[this.keyToTxnId[i]]; + int remapped = remapTxnIds[oldKeyToTxnId[i]]; if (remapped >= 0) - keyToTxnId[o++] = remapped; + newKeyToTxnId[o++] = remapped; ++i; } while (k < keys.size()) - keyToTxnId[k++] = o; - - int[] result = cache.complete(keyToTxnId, o); - cache.discard(keyToTxnId, o); - return new Deps(keys, txnIds, result); + newKeyToTxnId[k++] = o; } catch (Throwable t) { - cache.forceDiscard(keyToTxnId, o); + cache.forceDiscard(newKeyToTxnId, o); throw t; } finally { - cache.forceDiscard(remapTxnIds, txnIds.length); + cache.forceDiscard(remapTxnIds, oldTxnIds.length); } + + newKeyToTxnId = cache.completeAndDiscard(newKeyToTxnId, o); + return new Deps(keys, newTxnIds, newKeyToTxnId); } public boolean contains(TxnId txnId) @@ -1054,7 +1054,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>> public void forEachOn(Ranges ranges, Predicate<Key> include, BiConsumer<Key, TxnId> forEach) { - Routables.foldl(keys, ranges, (index, key, value) -> { + Routables.foldl(keys, ranges, (key, value, index) -> { if (!include.test(key)) return null; @@ -1082,7 +1082,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>> // does not rely on this ordering. for (int offset = 0 ; offset < txnIds.length ; offset += 64) { - long bitset = Routables.foldl(keys, ranges, (keyIndex, key, off, value) -> { + long bitset = Routables.foldl(keys, ranges, (key, off, value, keyIndex) -> { if (!include.test(key)) return value; @@ -1151,7 +1151,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>> public Collection<TxnId> txnIds() { - return listOf(txnIds); + return Arrays.asList(txnIds); } public List<TxnId> txnIds(Key key) diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java index 05e85ba..94d4baa 100644 --- a/accord-core/src/main/java/accord/primitives/Routables.java +++ b/accord-core/src/main/java/accord/primitives/Routables.java @@ -148,7 +148,7 @@ public interface Routables<K extends Routable, U extends Routables<K, ?>> extend int nexti = valueIntersections.findLimit(is, i, ms, m); while (i < nexti) { - initialValue = fold.apply(i, is.get(i), initialValue); + initialValue = fold.apply(is.get(i), initialValue, i); ++i; } } @@ -174,7 +174,7 @@ public interface Routables<K extends Routable, U extends Routables<K, ?>> extend int nexti = valueIntersections.findLimit(is, i, ms, m); while (i < nexti) { - initialValue = fold.apply(i, is.get(i), param, initialValue); + initialValue = fold.apply(is.get(i), param, initialValue, i); if (initialValue == terminalValue) break done; ++i; @@ -199,7 +199,7 @@ public interface Routables<K extends Routable, U extends Routables<K, ?>> extend int nexti = (int)(im); while (i < nexti) { - initialValue = fold.apply(i, is.get(i), param, initialValue); + initialValue = fold.apply(is.get(i), param, initialValue, i); if (initialValue == terminalValue) break done; ++i; @@ -227,7 +227,7 @@ public interface Routables<K extends Routable, U extends Routables<K, ?>> extend m = (int)(kri >>> 32); int nexti = valueIntersections.findLimit(is, i, ms, m); - initialValue = fold.apply(i, nexti, param, initialValue); + initialValue = fold.apply(param, initialValue, i, nexti); if (initialValue == terminalValue) break; i = nexti; diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java index d475119..907bdaf 100644 --- a/accord-core/src/main/java/accord/primitives/Txn.java +++ b/accord-core/src/main/java/accord/primitives/Txn.java @@ -175,7 +175,7 @@ public interface Txn default Future<Data> read(SafeCommandStore safeStore, Command command) { Ranges ranges = safeStore.ranges().at(command.executeAt().epoch); - List<Future<Data>> futures = read().keys().foldl(ranges, (index, key, accumulate) -> { + List<Future<Data>> futures = read().keys().foldl(ranges, (key, accumulate, index) -> { if (!safeStore.commandStore().hashIntersects(key)) return accumulate; diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java index 2986dd0..b41d2bb 100644 --- a/accord-core/src/main/java/accord/primitives/Writes.java +++ b/accord-core/src/main/java/accord/primitives/Writes.java @@ -71,7 +71,7 @@ public class Writes if (ranges == null) return SUCCESS; - List<Future<Void>> futures = keys.foldl(ranges, (index, key, accumulate) -> { + List<Future<Void>> futures = keys.foldl(ranges, (key, accumulate, index) -> { if (safeStore.commandStore().hashIntersects(key)) accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore())); return accumulate; diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java index 4881f9e..f799193 100644 --- a/accord-core/src/main/java/accord/topology/Topologies.java +++ b/accord-core/src/main/java/accord/topology/Topologies.java @@ -65,7 +65,7 @@ public interface Topologies extends TopologySorter default void forEach(IndexedConsumer<Topology> consumer) { for (int i=0, mi=size(); i<mi; i++) - consumer.accept(i, get(i)); + consumer.accept(get(i), i); } static boolean equals(Topologies t, Object o) diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index d7e93a7..277446f 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -27,6 +27,7 @@ import accord.api.RoutingKey; import accord.local.Node.Id; import accord.primitives.*; import accord.utils.*; +import accord.utils.ArrayBuffers.IntBuffers; import static accord.utils.SortedArrays.Search.FLOOR; import static accord.utils.SortedArrays.exponentialSearch; @@ -192,22 +193,22 @@ public class Topology public Topology forSelection(Unseekables<?, ?> select) { - return forSelection(select, (i, shard) -> true); + return forSelection(select, (ignore, index) -> true, null); } - public Topology forSelection(Unseekables<?, ?> select, IndexedPredicate<Shard> predicate) + public <P1> Topology forSelection(Unseekables<?, ?> select, IndexedPredicate<P1> predicate, P1 param) { - return forSubset(subsetFor(select, predicate)); + return forSubset(subsetFor(select, predicate, param)); } public Topology forSelection(Unseekables<?, ?> select, Collection<Id> nodes) { - return forSelection(select, nodes, (i, shard) -> true); + return forSelection(select, nodes, (ignore, index) -> true, null); } - public Topology forSelection(Unseekables<?, ?> select, Collection<Id> nodes, IndexedPredicate<Shard> predicate) + public <P1> Topology forSelection(Unseekables<?, ?> select, Collection<Id> nodes, IndexedPredicate<P1> predicate, P1 param) { - return forSubset(subsetFor(select, predicate), nodes); + return forSubset(subsetFor(select, predicate, param), nodes); } private Topology forSubset(int[] newSubset) @@ -236,66 +237,79 @@ public class Topology return new Topology(epoch, shards, ranges, nodeLookup, rangeSubset, newSubset); } - private int[] subsetFor(Unseekables<?, ?> select, IndexedPredicate<Shard> predicate) + private <P1> int[] subsetFor(Unseekables<?, ?> select, IndexedPredicate<P1> predicate, P1 param) { int count = 0; - int[] newSubset = new int[Math.min(select.size(), subsetOfRanges.size())]; - Unseekables<?, ?> as = select; - Ranges bs = subsetOfRanges; - int ai = 0, bi = 0; - // ailim tracks which ai have been included; since there may be multiple matches - // we cannot increment ai to avoid missing a match with a second bi - int ailim = 0; - - if (subsetOfRanges == ranges) + IntBuffers cachedInts = ArrayBuffers.cachedInts(); + int[] newSubset = cachedInts.getInts(Math.min(select.size(), subsetOfRanges.size())); + try { - while (true) + Routables<?, ?> as = select; + Ranges bs = subsetOfRanges; + int ai = 0, bi = 0; + // ailim tracks which ai have been included; since there may be multiple matches + // we cannot increment ai to avoid missing a match with a second bi + int ailim = 0; + + if (subsetOfRanges == ranges) { - long abi = as.findNextIntersection(ai, bs, bi); - if (abi < 0) + while (true) { - if (ailim < select.size()) - throw new IllegalArgumentException("Range not found for " + select.get(ailim)); - break; + long abi = as.findNextIntersection(ai, bs, bi); + if (abi < 0) + { + if (ailim < as.size()) + throw new IllegalArgumentException("Range not found for " + as.get(ailim)); + break; + } + + ai = (int)abi; + if (ailim < ai) + throw new IllegalArgumentException("Range not found for " + as.get(ailim)); + + bi = (int)(abi >>> 32); + if (predicate.test(param, bi)) + newSubset[count++] = bi; + + ailim = as.findNext(ai + 1, bs.get(bi), FLOOR); + if (ailim < 0) ailim = -1 - ailim; + else ailim++; + ++bi; } + } + else + { + while (true) + { + long abi = as.findNextIntersection(ai, bs, bi); + if (abi < 0) + break; - bi = (int)(abi >>> 32); - if (ailim < (int)abi) - throw new IllegalArgumentException("Range not found for " + select.get(ailim)); - - if (predicate.test(bi, shards[bi])) - newSubset[count++] = bi; + bi = (int)(abi >>> 32); + if (predicate.test(param, bi)) + newSubset[count++] = bi; - ai = (int)abi; - ailim = as.findNext(ai + 1, bs.get(bi), FLOOR); - if (ailim < 0) ailim = -1 - ailim; - else ailim++; - ++bi; + ++bi; + } } } - else + catch (Throwable t) { - while (true) - { - long abi = as.findNextIntersection(ai, bs, bi); - if (abi < 0) - break; + cachedInts.forceDiscard(newSubset, count); + throw t; + } - bi = (int)(abi >>> 32); - if (predicate.test(bi, shards[bi])) - newSubset[count++] = bi; + return cachedInts.completeAndDiscard(newSubset, count); + } - ++bi; - } - } - if (count != newSubset.length) - newSubset = Arrays.copyOf(newSubset, count); - return newSubset; + public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, Consumer<Id> nodes) + { + visitNodeForKeysOnceOrMore(select, (i1, i2) -> true, null, nodes); } - public void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, IndexedPredicate<Shard> predicate, Consumer<Id> nodes) + public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, IndexedPredicate<P1> predicate, P1 param, Consumer<Id> nodes) { - for (int shardIndex : subsetFor(select, predicate)) + for (int shardIndex : subsetFor(select, predicate, param)) { Shard shard = shards[shardIndex]; for (Id id : shard.nodes) @@ -318,7 +332,7 @@ public class Topology ai = (int)(abi); bi = (int)(abi >>> 32); - accumulator = function.apply(bi, shards[bi], accumulator); + accumulator = function.apply(shards[bi], accumulator, bi); ++bi; } @@ -336,7 +350,7 @@ public class Topology { if (a[ai] == b[bi]) { - consumer.accept(ai, shards[a[ai]]); + consumer.accept(shards[a[ai]], ai); ++ai; ++bi; } else if (a[ai] < b[bi]) @@ -363,7 +377,7 @@ public class Topology { if (a[ai] == b[bi]) { - O next = function.apply(offset + ai, p1, p2, p3); + O next = function.apply(p1, p2, p3, offset + ai); initialValue = reduce.apply(initialValue, next); ++ai; ++bi; } @@ -394,7 +408,7 @@ public class Topology { if (a[ai] == b[bi]) { - if (consumer.test(ai, shards[a[ai]])) + if (consumer.test(shards[a[ai]], ai)) ++count; ++ai; ++bi; } @@ -424,7 +438,7 @@ public class Topology { if (a[ai] == b[bi]) { - initialValue = consumer.apply(offset + ai, param, initialValue); + initialValue = consumer.apply(param, initialValue, offset + ai); if (terminalValue == initialValue) return terminalValue; ++ai; ++bi; @@ -446,7 +460,7 @@ public class Topology public void forEach(IndexedConsumer<Shard> consumer) { for (int i = 0; i < supersetIndexes.length ; ++i) - consumer.accept(i, shards[supersetIndexes[i]]); + consumer.accept(shards[supersetIndexes[i]], i); } public int size() diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 2e7de96..ec1061f 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -113,7 +113,7 @@ public class TopologyManager implements ConfigurationService.Listener return false; if (syncComplete) return true; - Boolean result = global().foldl(intersect, (i, shard, acc) -> { + Boolean result = global().foldl(intersect, (shard, acc, i) -> { if (acc == Boolean.FALSE) return acc; return syncTracker.get(i).hasReachedQuorum(); @@ -121,7 +121,7 @@ public class TopologyManager implements ConfigurationService.Listener return result == Boolean.TRUE; } - boolean shardIsUnsynced(int idx, Shard shard) + boolean shardIsUnsynced(int idx) { return !prevSynced || !syncTracker.get(idx).hasReachedQuorum(); } @@ -334,9 +334,9 @@ public class TopologyManager implements ConfigurationService.Listener { EpochState epochState = snapshot.epochs[i]; if (epochState.epoch() < minEpoch) - epochState.global.visitNodeForKeysOnceOrMore(select, epochState::shardIsUnsynced, nodes::add); + epochState.global.visitNodeForKeysOnceOrMore(select, EpochState::shardIsUnsynced, epochState, nodes::add); else - epochState.global.visitNodeForKeysOnceOrMore(select, (i1, i2) -> true, nodes::add); + epochState.global.visitNodeForKeysOnceOrMore(select, nodes::add); } Topologies.Multi topologies = new Topologies.Multi(sorter, count); @@ -344,9 +344,9 @@ public class TopologyManager implements ConfigurationService.Listener { EpochState epochState = snapshot.epochs[i]; if (epochState.epoch() < minEpoch) - topologies.add(epochState.global.forSelection(select, nodes, epochState::shardIsUnsynced)); + topologies.add(epochState.global.forSelection(select, nodes, EpochState::shardIsUnsynced, epochState)); else - topologies.add(epochState.global.forSelection(select, nodes, (i1, i2) -> true)); + topologies.add(epochState.global.forSelection(select, nodes, (ignore, idx) -> true, null)); } return topologies; @@ -362,7 +362,7 @@ public class TopologyManager implements ConfigurationService.Listener Set<Id> nodes = new LinkedHashSet<>(); int count = (int)(1 + maxEpoch - minEpoch); for (int i = count - 1 ; i >= 0 ; --i) - snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, (i1, i2) -> true, nodes::add); + snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, nodes::add); Topologies.Multi topologies = new Topologies.Multi(sorter, count); for (int i = count - 1 ; i >= 0 ; --i) diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java b/accord-core/src/main/java/accord/utils/ArrayBuffers.java index b035e26..2dce966 100644 --- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java +++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java @@ -117,6 +117,19 @@ public class ArrayBuffers */ boolean discard(int[] buffer, int usedSize); + /** + * Equivalent to + * int[] result = complete(buffer, usedSize); + * discard(buffer, usedSize); + * return result; + */ + default int[] completeAndDiscard(int[] buffer, int usedSize) + { + int[] result = complete(buffer, usedSize); + discard(buffer, usedSize); + return result; + } + /** * Indicate this buffer is definitely unused, and return it to a pool if possible * @return true if the buffer is discarded (and discard-able), false if it was retained @@ -577,7 +590,6 @@ public class ArrayBuffers return; savedInts = buffer; - return; } } diff --git a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java index 80fb7ec..03b96ce 100644 --- a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java +++ b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedBiFunction<T, U, R> +public interface IndexedBiFunction<P1, P2, O> { - R apply(int i, T t, U u); + O apply(P1 p1, P2 p2, int index); } diff --git a/accord-core/src/main/java/accord/utils/IndexedConsumer.java b/accord-core/src/main/java/accord/utils/IndexedConsumer.java index 75aa8d0..cfa2c1b 100644 --- a/accord-core/src/main/java/accord/utils/IndexedConsumer.java +++ b/accord-core/src/main/java/accord/utils/IndexedConsumer.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedConsumer<V> +public interface IndexedConsumer<P1> { - void accept(int i, V v); + void accept(P1 p1, int index); } diff --git a/accord-core/src/main/java/accord/utils/IndexedFold.java b/accord-core/src/main/java/accord/utils/IndexedFold.java index eec2223..079c16c 100644 --- a/accord-core/src/main/java/accord/utils/IndexedFold.java +++ b/accord-core/src/main/java/accord/utils/IndexedFold.java @@ -18,7 +18,6 @@ package accord.utils; -public interface IndexedFold<K, V> +public interface IndexedFold<P1, Accumulate> extends IndexedBiFunction<P1, Accumulate, Accumulate> { - V apply(int index, K key, V value); } \ No newline at end of file diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java index 6b3def1..0cd5a82 100644 --- a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java +++ b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedFoldIntersectToLong<K> +public interface IndexedFoldIntersectToLong<P1> { - long apply(int leftIndex, int rightIndex, K key, long param, long prev); + long apply(P1 p1, long p2, long accumulate, int leftIndex, int rightIndex); } diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java index e01f018..c36df26 100644 --- a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java +++ b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedFoldToLong<K> +public interface IndexedFoldToLong<P1> { - long apply(int index, K key, long param, long prev); + long apply(P1 p1, long p2, long accumulate, int index); } diff --git a/accord-core/src/main/java/accord/utils/IndexedFunction.java b/accord-core/src/main/java/accord/utils/IndexedFunction.java index e4d662c..df7020e 100644 --- a/accord-core/src/main/java/accord/utils/IndexedFunction.java +++ b/accord-core/src/main/java/accord/utils/IndexedFunction.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedFunction<T, R> +public interface IndexedFunction<P1, O> { - R apply(int i, T t); + O apply(P1 p1, int index); } diff --git a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java index 3d35adc..272c53a 100644 --- a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java +++ b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedIntFunction<T> +public interface IndexedIntFunction<P1> { - int apply(int i, T t, int v); + int apply(P1 p1, int p2, int index); } diff --git a/accord-core/src/main/java/accord/utils/IndexedPredicate.java b/accord-core/src/main/java/accord/utils/IndexedPredicate.java index db40c45..68b555b 100644 --- a/accord-core/src/main/java/accord/utils/IndexedPredicate.java +++ b/accord-core/src/main/java/accord/utils/IndexedPredicate.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedPredicate<V> +public interface IndexedPredicate<P1> { - boolean test(int i, V v); + boolean test(P1 p1, int index); } diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java index abfc58e..6381e9c 100644 --- a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java +++ b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java @@ -20,5 +20,5 @@ package accord.utils; public interface IndexedRangeFoldToLong { - long apply(int from, int to, long param, long prev); + long apply(long p1, long p2, int fromIndex, int toIndex); } \ No newline at end of file diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java new file mode 100644 index 0000000..8fca7eb --- /dev/null +++ b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java @@ -0,0 +1,6 @@ +package accord.utils; + +public interface IndexedRangeTriConsumer<P1, P2, P3> +{ + void accept(P1 p1, P2 p2, P3 p3, int fromIndex, int toIndex); +} diff --git a/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java new file mode 100644 index 0000000..418c69c --- /dev/null +++ b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java @@ -0,0 +1,7 @@ +package accord.utils; + +// TODO (now): migrate to utils, but must standardise on parameter order with index last +public interface IndexedTriConsumer<P1, P2, P3> +{ + void accept(P1 p1, P2 p2, P3 p3, int index); +} diff --git a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java index 3c722d5..22b2e48 100644 --- a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java +++ b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java @@ -18,7 +18,7 @@ package accord.utils; -public interface IndexedTriFunction<I1, I2, I3, O> +public interface IndexedTriFunction<P1, P2, P3, V> { - O apply(int i0, I1 i1, I2 i2, I3 i3); + V apply(P1 p1, P2 p2, P3 p3, int index); } diff --git a/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java b/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java new file mode 100644 index 0000000..eadcb92 --- /dev/null +++ b/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Spliterators.spliteratorUnknownSize; + +/** + * A simple intrusive double-linked list for maintaining a list of tasks, + * useful for invalidating queued ordered tasks + * + * TODO COPIED FROM CASSANDRA + */ + +@SuppressWarnings("unchecked") +public class IntrusiveLinkedList<O extends IntrusiveLinkedListNode> extends IntrusiveLinkedListNode implements Iterable<O> +{ + public IntrusiveLinkedList() + { + prev = next = this; + } + + public void addFirst(O add) + { + if (add.next != null) + throw new IllegalStateException(); + add(this, add, next); + } + + public void addLast(O add) + { + if (add.next != null) + throw new IllegalStateException(); + add(prev, add, this); + } + + private void add(IntrusiveLinkedListNode after, IntrusiveLinkedListNode add, IntrusiveLinkedListNode before) + { + add.next = before; + add.prev = after; + before.prev = add; + after.next = add; + } + + public O poll() + { + if (isEmpty()) + return null; + + IntrusiveLinkedListNode next = this.next; + next.remove(); + return (O) next; + } + + public boolean isEmpty() + { + return next == this; + } + + public Iterator<O> iterator() + { + return new Iterator<O>() + { + IntrusiveLinkedListNode next = IntrusiveLinkedList.this.next; + + @Override + public boolean hasNext() + { + return next != IntrusiveLinkedList.this; + } + + @Override + public O next() + { + O result = (O)next; + if (result.next == null) + throw new NullPointerException(); + next = result.next; + return result; + } + }; + } + + public Stream<O> stream() + { + return StreamSupport.stream(spliteratorUnknownSize(iterator(), Spliterator.IMMUTABLE), false); + } +} + diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java similarity index 66% copy from accord-core/src/main/java/accord/utils/IndexedFoldToLong.java copy to accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java index e01f018..0270e28 100644 --- a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java +++ b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java @@ -18,7 +18,27 @@ package accord.utils; -public interface IndexedFoldToLong<K> +/** + * TODO COPIED FROM CASSANDRA + */ +public abstract class IntrusiveLinkedListNode { - long apply(int index, K key, long param, long prev); + IntrusiveLinkedListNode prev; + IntrusiveLinkedListNode next; + + protected boolean isFree() + { + return next == null; + } + + protected void remove() + { + if (next != null) + { + prev.next = next; + next.prev = prev; + next = null; + prev = null; + } + } } diff --git a/accord-core/src/main/java/accord/utils/SortedArrays.java b/accord-core/src/main/java/accord/utils/SortedArrays.java index 6068f9b..a0aa473 100644 --- a/accord-core/src/main/java/accord/utils/SortedArrays.java +++ b/accord-core/src/main/java/accord/utils/SortedArrays.java @@ -855,7 +855,7 @@ public class SortedArrays ai = (int)(abi); bi = (int)(abi >>> 32); - initialValue = fold.apply(ai, bi, as[ai], param, initialValue); + initialValue = fold.apply(as[ai], param, initialValue, ai, bi); if (initialValue == terminalValue) break; diff --git a/accord-core/src/test/java/accord/KeysTest.java b/accord-core/src/test/java/accord/KeysTest.java index 808e506..d7867f2 100644 --- a/accord-core/src/test/java/accord/KeysTest.java +++ b/accord-core/src/test/java/accord/KeysTest.java @@ -26,7 +26,6 @@ import java.util.stream.IntStream; import accord.api.Key; import accord.impl.IntKey; import accord.impl.IntKey.Raw; -import accord.impl.IntKey.Routing; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.Keys; @@ -126,22 +125,22 @@ public class KeysTest void foldlTest() { List<Key> keys = new ArrayList<>(); - long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), (i, key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), (key, p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1); assertEquals(16, result); assertEquals(keys(250, 350), Keys.of(keys)); keys.clear(); - result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (i, key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (key, p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1); assertEquals(3616, result); assertEquals(keys(150, 250, 350, 450), Keys.of(keys)); keys.clear(); - result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), (i, key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), (key, p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1); assertEquals(1, result); assertEquals(keys(550), Keys.of(keys)); keys.clear(); - result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 140), r(149, 151), r(560, 2000)), (i, key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 140), r(149, 151), r(560, 2000)), (key, p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1); assertEquals(1, result); assertEquals(keys(150), Keys.of(keys)); } @@ -232,21 +231,21 @@ public class KeysTest qt().forAll(keysGen()).check(list -> { Keys keys = Keys.of(list); - Assertions.assertEquals(keys.size(), keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (index, key, accum) -> accum + 1, 0)); - Assertions.assertEquals(keys.size(), keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (index, key, ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE)); - Assertions.assertEquals(keys.size(), keys.foldl((index, key, ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE)); + Assertions.assertEquals(keys.size(), keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (key, accum, index) -> accum + 1, 0)); + Assertions.assertEquals(keys.size(), keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (p1, ignore, accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE)); + Assertions.assertEquals(keys.size(), keys.foldl((p1, ignore, accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE)); // early termination - Assertions.assertEquals(1, keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (index, key, ignore, accum) -> accum + 1, -1, 0, 1)); - Assertions.assertEquals(1, keys.foldl((index, key, ignore, accum) -> accum + 1, -1, 0, 1)); + Assertions.assertEquals(1, keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (p1, ignore, accum, index) -> accum + 1, -1, 0, 1)); + Assertions.assertEquals(1, keys.foldl((p1, ignore, accum, index) -> accum + 1, -1, 0, 1)); - Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), (index, key, accum) -> accum + 1, 0)); - Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), (index, key, ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE)); + Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), (key, accum, index) -> accum + 1, 0)); + Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), (p1, ignore, accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE)); for (Key k : keys) { - Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (index, key, accum) -> accum + 1, 0)); - Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (index, key, ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE)); + Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (key, accum, index) -> accum + 1, 0)); + Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (p1, ignore, accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE)); } }); } diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index 286f4b4..bc75e62 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.LongSupplier; import accord.impl.IntHashKey; import accord.impl.basic.Cluster; @@ -269,7 +270,8 @@ public class BurnTest { // Long overrideSeed = null; int count = 1; - Long overrideSeed = 1683848112394089134L; + Long overrideSeed = 188057951046487786L; + LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong; for (int i = 0 ; i < args.length ; i += 2) { switch (args[i]) @@ -282,11 +284,14 @@ public class BurnTest case "-s": overrideSeed = Long.parseLong(args[i + 1]); count = 1; + break; + case "--loop-seed": + seedGenerator = new Random(Long.parseLong(args[i + 1]))::nextLong; } } while (count-- > 0) { - run(overrideSeed != null ? overrideSeed : ThreadLocalRandom.current().nextLong()); + run(overrideSeed != null ? overrideSeed : seedGenerator.getAsLong()); } } diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java index acbb3b3..fc569cf 100644 --- a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java +++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java @@ -53,7 +53,7 @@ public abstract class TrackerReconciler<ST extends ShardTracker, T extends Abstr RequestStatus newStatus = invoke(next, tracker, from); for (int i = 0 ; i < topologies().size() ; ++i) { - topologies().get(i).forEachOn(from, (si, s) -> { + topologies().get(i).forEachOn(from, (s, si) -> { counts[si].compute(next, (ignore, cur) -> cur + 1); }); } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 1ea9210..9e2988e 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -88,7 +88,8 @@ public class Cluster implements Scheduler private void add(Packet packet) { boolean isReply = packet.message instanceof Reply; - trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND", packet); + if (trace.isTraceEnabled()) + trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND", packet); if (lookup.apply(packet.dst) == null) responseSink.accept(packet); else pending.add(packet); } @@ -140,11 +141,14 @@ public class Cluster implements Scheduler || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst)); if (drop) { - trace.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver); + if (trace.isTraceEnabled()) + trace.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver); return; } - trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver); + if (trace.isTraceEnabled()) + trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver); + if (deliver.message instanceof Reply) { Reply reply = (Reply) deliver.message; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org