This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 6c6872270e16d2e777f1fa2c510b8f15396be3f3 Author: Ariel Weisberg <aweisb...@apple.com> AuthorDate: Mon May 1 12:19:34 2023 -0400 Support for interoperability between Accord and non-Accord managed data Patch by Ariel Weisberg and Blake Eggleston; Reviewed by Blake Eggleston for CASSANDRA-18129 Co-authored-by: Blake Eggleston <beggles...@apple.com> --- accord-core/src/main/java/accord/api/Data.java | 12 ++ accord-core/src/main/java/accord/api/Read.java | 8 +- .../src/main/java/accord/coordinate/Barrier.java | 1 + .../src/main/java/accord/coordinate/Execute.java | 133 ++---------- .../src/main/java/accord/coordinate/Persist.java | 74 +++++-- .../coordinate/{Execute.java => TxnExecute.java} | 49 ++--- .../main/java/accord/coordinate/TxnPersist.java | 48 +++++ .../accord/coordinate/tracking/AppliedTracker.java | 2 +- .../accord/coordinate/tracking/QuorumTracker.java | 2 +- .../tracking/ResponseTracker.java} | 16 +- accord-core/src/main/java/accord/local/Node.java | 42 ++-- .../main/java/accord/local/SerializerSupport.java | 68 ++++-- .../{ReadTxnData.java => AbstractExecute.java} | 136 +++++++----- .../src/main/java/accord/messages/Apply.java | 27 ++- .../accord/messages/ApplyThenWaitUntilApplied.java | 2 + .../src/main/java/accord/messages/Commit.java | 11 +- .../java/accord/messages/InformHomeDurable.java | 4 +- .../src/main/java/accord/messages/MessageType.java | 123 +++++++---- .../src/main/java/accord/messages/ReadData.java | 21 +- .../src/main/java/accord/messages/ReadTxnData.java | 237 +-------------------- .../java/accord/messages/WaitUntilApplied.java | 9 +- .../src/main/java/accord/primitives/Keys.java | 15 +- .../main/java/accord/primitives/RoutingKeys.java | 11 + .../src/main/java/accord/primitives/Txn.java | 16 +- .../src/main/java/accord/primitives/Writes.java | 14 +- .../src/main/java/accord/topology/Topology.java | 26 ++- .../src/main/java/accord/utils/Invariants.java | 13 +- accord-core/src/test/java/accord/Utils.java | 10 +- .../src/test/java/accord/burn/BurnTest.java | 9 +- .../src/test/java/accord/impl/basic/Cluster.java | 11 +- .../src/test/java/accord/impl/list/ListRead.java | 13 +- .../test/java/accord/impl/mock/MockCluster.java | 6 + .../src/test/java/accord/impl/mock/MockStore.java | 11 +- .../java/accord/local/ImmutableCommandTest.java | 11 +- .../test/java/accord/messages/PreAcceptTest.java | 49 +++-- .../test/java/accord/messages/ReadDataTest.java | 4 +- .../src/main/java/accord/maelstrom/Cluster.java | 6 +- .../main/java/accord/maelstrom/MaelstromRead.java | 12 +- .../src/main/java/accord/maelstrom/Main.java | 6 +- 39 files changed, 629 insertions(+), 639 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Data.java b/accord-core/src/main/java/accord/api/Data.java index 24945648..6f9510c0 100644 --- a/accord-core/src/main/java/accord/api/Data.java +++ b/accord-core/src/main/java/accord/api/Data.java @@ -18,11 +18,23 @@ package accord.api; +import static accord.utils.Invariants.checkState; + /** * The result of some (potentially partial) {@link Read} from some {@link DataStore} */ public interface Data { + Data NOOP_DATA = new Data() + { + @Override + public Data merge(Data data) + { + checkState(data == null || data == NOOP_DATA, "Can't mix no op Data with other implementations of Data"); + return NOOP_DATA; + } + }; + /** * Combine the contents of the parameter with this object and return the resultant object. * This method may modify the current object and return itself. diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java index 7def168c..3b77ed3c 100644 --- a/accord-core/src/main/java/accord/api/Read.java +++ b/accord-core/src/main/java/accord/api/Read.java @@ -19,17 +19,19 @@ package accord.api; import accord.local.SafeCommandStore; -import accord.primitives.*; +import accord.primitives.Ranges; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; import accord.utils.async.AsyncChain; - /** * A read to be performed on potentially multiple shards, the inputs of which may be fed to a {@link Query} */ public interface Read { Seekables<?, ?> keys(); - AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store); + AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store); Read slice(Ranges ranges); Read merge(Read other); default boolean isEqualOrFuller(Read other) { return true; } diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java index 39e56dee..2939c479 100644 --- a/accord-core/src/main/java/accord/coordinate/Barrier.java +++ b/accord-core/src/main/java/accord/coordinate/Barrier.java @@ -146,6 +146,7 @@ public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractRes catch (ExecutionException e) { tryFailure(e); + return; } coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> { if (syncPointFailure != null) diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java index edfd033a..0bc5c6ed 100644 --- a/accord-core/src/main/java/accord/coordinate/Execute.java +++ b/accord-core/src/main/java/accord/coordinate/Execute.java @@ -18,59 +18,35 @@ package accord.coordinate; -import java.util.Set; import java.util.function.BiConsumer; -import accord.api.Data; import accord.api.Result; import accord.local.Node; -import accord.local.Node.Id; -import accord.messages.Commit; -import accord.messages.ReadData.ReadNack; -import accord.messages.ReadData.ReadOk; -import accord.messages.ReadData.ReadReply; -import accord.messages.ReadTxnData; import accord.primitives.Deps; import accord.primitives.FullRoute; import accord.primitives.Participants; -import accord.primitives.Ranges; +import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; -import accord.primitives.Txn.Kind; import accord.primitives.TxnId; -import accord.topology.Topologies; -import accord.topology.Topology; +import accord.primitives.Writes; +import accord.primitives.Txn.Kind; -import static accord.coordinate.ReadCoordinator.Action.Approve; -import static accord.coordinate.ReadCoordinator.Action.ApprovePartial; -import static accord.messages.Commit.Kind.Maximal; import static accord.utils.Invariants.checkArgument; -class Execute extends ReadCoordinator<ReadReply> +public interface Execute { - final Txn txn; - final Participants<?> readScope; - final FullRoute<?> route; - final Timestamp executeAt; - final Deps deps; - final Topologies executes; - final BiConsumer<? super Result, Throwable> callback; - private Data data; - - private Execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) + interface Factory { - super(node, node.topology().forEpoch(readScope, executeAt.epoch()), txnId); - this.txn = txn; - this.route = route; - this.readScope = readScope; - this.executeAt = executeAt; - this.deps = deps; - this.executes = node.topology().forEpoch(route, executeAt.epoch()); - this.callback = callback; + Execute create(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback); } - public static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) + void start(); + + static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) { + Seekables<?, ?> readKeys = txn.read().keys(); + Participants<?> readScope = readKeys.toParticipants(); // Recovery calls execute and we would like execute to run BlockOnDeps because that will notify the agent // of the local barrier if (txn.kind() == Kind.SyncPoint) @@ -78,84 +54,19 @@ class Execute extends ReadCoordinator<ReadReply> checkArgument(txnId.equals(executeAt)); BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, callback); } - else if (txn.read().keys().isEmpty()) - { - Result result = txn.result(txnId, executeAt, null); - Persist.persist(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), result); - callback.accept(result, null); - } - else - { - Execute execute = new Execute(node, txnId, txn, route, txn.keys().toParticipants(), executeAt, deps, callback); - execute.start(); - } - } - - @Override - protected void start(Set<Id> readSet) - { - Commit.commitMinimalAndRead(node, executes, txnId, txn, route, readScope, executeAt, deps, readSet, this); - } - - @Override - public void contact(Id to) - { - node.send(to, new ReadTxnData(to, topologies(), txnId, readScope, executeAt), this); - } - - @Override - protected Ranges unavailable(ReadReply reply) - { - return ((ReadOk)reply).unavailable; - } - - @Override - protected Action process(Id from, ReadReply reply) - { - if (reply.isOk()) - { - ReadOk ok = ((ReadOk) reply); - Data next = ok.data; - if (next != null) - data = data == null ? next : data.merge(next); - - return ok.unavailable == null ? Approve : ApprovePartial; - } - - ReadNack nack = (ReadNack) reply; - switch (nack) - { - default: throw new IllegalStateException(); - case Redundant: - callback.accept(null, new Preempted(txnId, route.homeKey())); - return Action.Aborted; - case NotCommitted: - // the replica may be missing the original commit, or the additional commit, so send everything - Topologies topology = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch()); - Topology coordinateTopology = topology.forEpoch(txnId.epoch()); - node.send(from, new Commit(Maximal, from, coordinateTopology, topology, txnId, txn, route, readScope, executeAt, deps, false)); - // also try sending a read command to another replica, in case they're ready to serve a response - return Action.TryAlternative; - case Invalid: - callback.accept(null, new IllegalStateException("Submitted a read command to a replica that did not own the range")); - return Action.Aborted; - } - } - - - @Override - protected void onDone(Success success, Throwable failure) - { - if (failure == null) - { - Result result = txn.result(txnId, executeAt, data); - callback.accept(result, null); - // avoid re-calculating topologies if it is unchanged - Persist.persist(node, executes, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, data), result); - } else { - callback.accept(null, failure); + if (readKeys.isEmpty()) + { + Result result = txn.result(txnId, executeAt, null); + Writes writes = txn.execute(txnId, executeAt, null); + Persist.persist(node, txnId, route, txn, executeAt, deps, writes, result, callback); + } + else + { + Execute execute = node.executionFactory().create(node, txnId, txn, route, readScope, executeAt, deps, callback); + execute.start(); + } } } } diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java index 9a132683..a1f3a84a 100644 --- a/accord-core/src/main/java/accord/coordinate/Persist.java +++ b/accord-core/src/main/java/accord/coordinate/Persist.java @@ -20,6 +20,7 @@ package accord.coordinate; import java.util.HashSet; import java.util.Set; +import java.util.function.BiConsumer; import accord.api.Result; import accord.coordinate.tracking.QuorumTracker; @@ -29,7 +30,14 @@ import accord.messages.Apply; import accord.messages.Apply.ApplyReply; import accord.messages.Callback; import accord.messages.InformDurable; -import accord.primitives.*; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.PartialTxn; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.primitives.Writes; import accord.topology.Topologies; import static accord.coordinate.tracking.RequestStatus.Success; @@ -37,50 +45,56 @@ import static accord.local.Status.Durability.Majority; import static accord.messages.Apply.executes; import static accord.messages.Apply.participates; -public class Persist implements Callback<ApplyReply> +public abstract class Persist implements Callback<ApplyReply> { - final Node node; - final TxnId txnId; - final FullRoute<?> route; - final Txn txn; - final Timestamp executeAt; - final Deps deps; - final Writes writes; - final Result result; - final QuorumTracker tracker; - final Set<Id> persistedOn; + public interface Factory + { + Persist create(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result); + } + + protected final Node node; + protected final TxnId txnId; + protected final FullRoute<?> route; + protected final Txn txn; + protected final Timestamp executeAt; + protected final Deps deps; + protected final Writes writes; + protected final Result result; + protected final Topologies topologies; + protected final QuorumTracker tracker; + protected final Set<Id> persistedOn; boolean isDone; - public static void persist(Node node, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + public static void persist(Node node, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super Result, Throwable> clientCallback) { Topologies executes = executes(node, route, executeAt); - persist(node, executes, txnId, route, txn, executeAt, deps, writes, result); + persist(node, executes, txnId, route, txn, executeAt, deps, writes, result, clientCallback); } - public static void persist(Node node, Topologies executes, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + public static void persist(Node node, Topologies executes, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super Result, Throwable> clientCallback) { Topologies participates = participates(node, route, txnId, executeAt, executes); - Persist persist = new Persist(node, executes, txnId, route, txn, executeAt, deps, writes, result); - node.send(participates.nodes(), to -> Apply.applyMinimal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); + node.persistFactory().create(node, executes, txnId, route, txn, executeAt, deps, writes, result) + .applyMinimal(participates, executes, writes, result, clientCallback); } public static void persistMaximal(Node node, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { Topologies executes = executes(node, route, executeAt); Topologies participates = participates(node, route, txnId, executeAt, executes); - Persist persist = new Persist(node, participates, txnId, route, txn, executeAt, deps, writes, result); - node.send(participates.nodes(), to -> Apply.applyMaximal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); + node.persistFactory().create(node, participates, txnId, route, txn, executeAt, deps, writes, result) + .applyMaximal(participates, executes, writes, result, null); } public static void persistPartialMaximal(Node node, TxnId txnId, Unseekables<?> sendTo, FullRoute<?> route, PartialTxn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { Topologies executes = executes(node, sendTo, executeAt); Topologies participates = participates(node, sendTo, txnId, executeAt, executes); - Persist persist = new Persist(node, participates, txnId, route, txn, executeAt, deps, writes, result); - node.send(participates.nodes(), to -> Apply.applyMaximal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); + Persist persist = node.persistFactory().create(node, participates, txnId, route, txn, executeAt, deps, writes, result); + node.send(participates.nodes(), to -> Apply.applyMaximal(Apply.FACTORY, to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); } - private Persist(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + protected Persist(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { this.node = node; this.txnId = txnId; @@ -90,6 +104,7 @@ public class Persist implements Callback<ApplyReply> this.deps = deps; this.writes = writes; this.result = result; + this.topologies = topologies; this.tracker = new QuorumTracker(topologies); this.persistedOn = new HashSet<>(); } @@ -128,4 +143,19 @@ public class Persist implements Callback<ApplyReply> public void onCallbackFailure(Id from, Throwable failure) { } + + public void applyMinimal(Topologies participates, Topologies executes, Writes writes, Result result, BiConsumer<? super Result, Throwable> clientCallback) + { + registerClientCallback(writes, result, clientCallback); + // applyMinimal is used for transaction execution by the original coordinator so it's important to use + // Node's Apply factory in case the factory has to do synchronous Apply. + node.send(participates.nodes(), to -> Apply.applyMinimal(node.applyFactory(), to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), this); + } + public void applyMaximal(Topologies participates, Topologies executes, Writes writes, Result result, BiConsumer<? super Result, Throwable> clientCallback) + { + registerClientCallback(writes, result, clientCallback); + node.send(participates.nodes(), to -> Apply.applyMaximal(Apply.FACTORY, to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), this); + } + + public abstract void registerClientCallback(Writes writes, Result result, BiConsumer<? super Result, Throwable> clientCallback); } diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/TxnExecute.java similarity index 68% copy from accord-core/src/main/java/accord/coordinate/Execute.java copy to accord-core/src/main/java/accord/coordinate/TxnExecute.java index edfd033a..d9c0d4cd 100644 --- a/accord-core/src/main/java/accord/coordinate/Execute.java +++ b/accord-core/src/main/java/accord/coordinate/TxnExecute.java @@ -21,6 +21,9 @@ package accord.coordinate; import java.util.Set; import java.util.function.BiConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import accord.api.Data; import accord.api.Result; import accord.local.Node; @@ -36,18 +39,19 @@ import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.Txn; -import accord.primitives.Txn.Kind; import accord.primitives.TxnId; import accord.topology.Topologies; -import accord.topology.Topology; import static accord.coordinate.ReadCoordinator.Action.Approve; import static accord.coordinate.ReadCoordinator.Action.ApprovePartial; -import static accord.messages.Commit.Kind.Maximal; -import static accord.utils.Invariants.checkArgument; -class Execute extends ReadCoordinator<ReadReply> +public class TxnExecute extends ReadCoordinator<ReadReply> implements Execute { + public static final Execute.Factory FACTORY = TxnExecute::new; + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(TxnExecute.class); + final Txn txn; final Participants<?> readScope; final FullRoute<?> route; @@ -57,7 +61,7 @@ class Execute extends ReadCoordinator<ReadReply> final BiConsumer<? super Result, Throwable> callback; private Data data; - private Execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) + private TxnExecute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) { super(node, node.topology().forEpoch(readScope, executeAt.epoch()), txnId); this.txn = txn; @@ -69,28 +73,6 @@ class Execute extends ReadCoordinator<ReadReply> this.callback = callback; } - public static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) - { - // Recovery calls execute and we would like execute to run BlockOnDeps because that will notify the agent - // of the local barrier - if (txn.kind() == Kind.SyncPoint) - { - checkArgument(txnId.equals(executeAt)); - BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, callback); - } - else if (txn.read().keys().isEmpty()) - { - Result result = txn.result(txnId, executeAt, null); - Persist.persist(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), result); - callback.accept(result, null); - } - else - { - Execute execute = new Execute(node, txnId, txn, route, txn.keys().toParticipants(), executeAt, deps, callback); - execute.start(); - } - } - @Override protected void start(Set<Id> readSet) { @@ -115,7 +97,7 @@ class Execute extends ReadCoordinator<ReadReply> if (reply.isOk()) { ReadOk ok = ((ReadOk) reply); - Data next = ok.data; + Data next = ((ReadOk) reply).data; if (next != null) data = data == null ? next : data.merge(next); @@ -131,9 +113,7 @@ class Execute extends ReadCoordinator<ReadReply> return Action.Aborted; case NotCommitted: // the replica may be missing the original commit, or the additional commit, so send everything - Topologies topology = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch()); - Topology coordinateTopology = topology.forEpoch(txnId.epoch()); - node.send(from, new Commit(Maximal, from, coordinateTopology, topology, txnId, txn, route, readScope, executeAt, deps, false)); + Commit.commitMaximal(node, from, txn, txnId, executeAt, route, deps, readScope); // also try sending a read command to another replica, in case they're ready to serve a response return Action.TryAlternative; case Invalid: @@ -142,16 +122,13 @@ class Execute extends ReadCoordinator<ReadReply> } } - @Override protected void onDone(Success success, Throwable failure) { if (failure == null) { Result result = txn.result(txnId, executeAt, data); - callback.accept(result, null); - // avoid re-calculating topologies if it is unchanged - Persist.persist(node, executes, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, data), result); + Persist.persist(node, executes, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, data), result, callback); } else { diff --git a/accord-core/src/main/java/accord/coordinate/TxnPersist.java b/accord-core/src/main/java/accord/coordinate/TxnPersist.java new file mode 100644 index 00000000..28119cf1 --- /dev/null +++ b/accord-core/src/main/java/accord/coordinate/TxnPersist.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.coordinate; + +import java.util.function.BiConsumer; + +import accord.api.Result; +import accord.local.Node; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.primitives.Writes; +import accord.topology.Topologies; + +public class TxnPersist extends Persist +{ + public static final Persist.Factory FACTORY = TxnPersist::new; + + private TxnPersist(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + { + super(node, topologies, txnId, route, txn, executeAt, deps, writes, result); + } + + @Override + public void registerClientCallback(Writes writes, Result result, BiConsumer<? super Result, Throwable> clientCallback) + { + if (clientCallback != null) + clientCallback.accept(result, null); + } +} diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java index 159f2b88..da49dc01 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java @@ -26,7 +26,7 @@ import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success; -public class AppliedTracker extends AbstractTracker<AppliedTracker.AppliedShardTracker> +public class AppliedTracker extends AbstractTracker<AppliedTracker.AppliedShardTracker> implements ResponseTracker { public static class AppliedShardTracker extends ShardTracker { diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java index b20c0a70..c771406d 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java @@ -24,7 +24,7 @@ import accord.topology.Topologies; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*; -public class QuorumTracker extends AbstractTracker<QuorumTracker.QuorumShardTracker> +public class QuorumTracker extends AbstractTracker<QuorumTracker.QuorumShardTracker> implements ResponseTracker { public static class QuorumShardTracker extends ShardTracker { diff --git a/accord-core/src/main/java/accord/api/Data.java b/accord-core/src/main/java/accord/coordinate/tracking/ResponseTracker.java similarity index 70% copy from accord-core/src/main/java/accord/api/Data.java copy to accord-core/src/main/java/accord/coordinate/tracking/ResponseTracker.java index 24945648..5b2a5f5d 100644 --- a/accord-core/src/main/java/accord/api/Data.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/ResponseTracker.java @@ -16,16 +16,12 @@ * limitations under the License. */ -package accord.api; +package accord.coordinate.tracking; -/** - * The result of some (potentially partial) {@link Read} from some {@link DataStore} - */ -public interface Data +import accord.local.Node; + +public interface ResponseTracker { - /** - * Combine the contents of the parameter with this object and return the resultant object. - * This method may modify the current object and return itself. - */ - Data merge(Data data); + RequestStatus recordSuccess(Node.Id node); + RequestStatus recordFailure(Node.Id node); } diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 6492ddf5..b7115b1b 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -39,9 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Agent; -import accord.api.ConfigurationService; - import accord.api.BarrierType; +import accord.api.ConfigurationService; import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; import accord.api.Key; @@ -51,11 +50,15 @@ import accord.api.Result; import accord.api.RoutingKey; import accord.api.Scheduler; import accord.api.TopologySorter; +import accord.coordinate.Barrier; import accord.config.LocalConfig; import accord.coordinate.CoordinateTransaction; +import accord.coordinate.Execute; import accord.coordinate.MaybeRecover; import accord.coordinate.Outcome; +import accord.coordinate.Persist; import accord.coordinate.RecoverWithRoute; +import accord.messages.Apply; import accord.messages.Callback; import accord.messages.LocalMessage; import accord.messages.Reply; @@ -68,7 +71,6 @@ import accord.primitives.FullRoute; import accord.primitives.ProgressToken; import accord.primitives.Range; import accord.primitives.Ranges; -import accord.coordinate.Barrier; import accord.primitives.Routable.Domain; import accord.primitives.Routables; import accord.primitives.Route; @@ -143,6 +145,9 @@ public class Node implements ConfigurationService.Listener, NodeTimeService private final ConfigurationService configService; private final TopologyManager topology; private final CommandStores commandStores; + private final Execute.Factory executionFactory; + private final Persist.Factory persistFactory; + private final Apply.Factory applyFactory; private final LongSupplier nowSupplier; private final ToLongFunction<TimeUnit> nowTimeUnit; @@ -160,13 +165,17 @@ public class Node implements ConfigurationService.Listener, NodeTimeService public Node(Id id, MessageSink messageSink, LocalMessage.Handler localMessageHandler, ConfigurationService configService, LongSupplier nowSupplier, ToLongFunction<TimeUnit> nowTimeUnit, Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter, - Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory, LocalConfig localConfig) + Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory, Execute.Factory executionFactory, Persist.Factory persistFactory, Apply.Factory applyFactory, + LocalConfig localConfig) { this.id = id; this.localConfig = localConfig; this.messageSink = messageSink; this.localMessageHandler = localMessageHandler; this.configService = configService; + this.executionFactory = executionFactory; + this.persistFactory = persistFactory; + this.applyFactory = applyFactory; this.topology = new TopologyManager(topologySorter, id); this.nowSupplier = nowSupplier; this.nowTimeUnit = nowTimeUnit; @@ -676,11 +685,6 @@ public class Node implements ConfigurationService.Listener, NodeTimeService } public void receive (Request request, Id from, ReplyContext replyContext) - { - receive(request, from, replyContext, 0); - } - - public void receive (Request request, Id from, ReplyContext replyContext, long delayNanos) { long knownEpoch = request.knownEpoch(); if (knownEpoch > topology.epoch()) @@ -703,10 +707,22 @@ public class Node implements ConfigurationService.Listener, NodeTimeService reply(from, replyContext, null, t); } }; - if (delayNanos > 0) - scheduler.once(processMsg, delayNanos, TimeUnit.NANOSECONDS); - else - scheduler.now(processMsg); + scheduler.now(processMsg); + } + + public Execute.Factory executionFactory() + { + return executionFactory; + } + + public Persist.Factory persistFactory() + { + return persistFactory; + } + + public Apply.Factory applyFactory() + { + return applyFactory; } public Scheduler scheduler() diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java b/accord-core/src/main/java/accord/local/SerializerSupport.java index a42888bd..abdd7f19 100644 --- a/accord-core/src/main/java/accord/local/SerializerSupport.java +++ b/accord-core/src/main/java/accord/local/SerializerSupport.java @@ -17,20 +17,28 @@ */ package accord.local; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + import accord.api.Result; import accord.api.VisibleForImplementation; import accord.local.Command.WaitingOn; import accord.local.CommonAttributes.Mutable; -import accord.messages.*; +import accord.messages.Accept; +import accord.messages.Apply; +import accord.messages.ApplyThenWaitUntilApplied; +import accord.messages.BeginRecovery; +import accord.messages.Commit; +import accord.messages.MessageType; +import accord.messages.PreAccept; +import accord.messages.Propagate; import accord.primitives.Ballot; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Timestamp; import accord.primitives.Writes; -import java.util.EnumSet; -import java.util.Set; - import static accord.messages.MessageType.APPLY_MAXIMAL_REQ; import static accord.messages.MessageType.APPLY_MINIMAL_REQ; import static accord.messages.MessageType.BEGIN_RECOVER_REQ; @@ -43,8 +51,6 @@ import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG; import static accord.primitives.PartialTxn.merge; import static accord.utils.Invariants.checkState; -import static java.util.EnumSet.of; - @VisibleForImplementation public class SerializerSupport { @@ -77,8 +83,8 @@ public class SerializerSupport } } - private static final EnumSet<MessageType> PRE_ACCEPT_TYPES = - of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG); + private static final Set<MessageType> PRE_ACCEPT_TYPES = + ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG); private static Command.PreAccepted preAccepted(Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider) { @@ -106,8 +112,8 @@ public class SerializerSupport return Command.Accepted.accepted(attrs, status, executeAt, promised, accepted); } - private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_TYPES = - of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, + private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES = + ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG); private static Command.Committed committed(Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) @@ -143,8 +149,8 @@ public class SerializerSupport return Command.Committed.committed(attrs, status, executeAt, promised, accepted, waitingOnProvider.provide(deps)); } - private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES = - of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, + private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES = + ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG, APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); @@ -175,19 +181,39 @@ public class SerializerSupport } else { - checkState(witnessed.contains(APPLY_MINIMAL_REQ)); + boolean haveApplyMinimal = witnessed.contains(APPLY_MINIMAL_REQ); + boolean haveCommitMaximal = witnessed.contains(COMMIT_MAXIMAL_REQ); - Apply apply = messageProvider.applyMinimal(); - writes = apply.writes; - result = apply.result; + Apply apply = null; + Commit commit = null; + String errorMessage = "Must have either an APPLY_MINIMAL_REQ or a COMMIT_MAXIMAL_REQ containing ApplyThenWaitUntilApplied"; + if (haveApplyMinimal) + { + apply = messageProvider.applyMinimal(); + writes = apply.writes; + result = apply.result; + } + else if (haveCommitMaximal) + { + commit = messageProvider.commitMaximal(); + checkState(commit.readData instanceof ApplyThenWaitUntilApplied, errorMessage); + ApplyThenWaitUntilApplied applyThenWaitUntilApplied = (ApplyThenWaitUntilApplied)commit.readData; + writes = applyThenWaitUntilApplied.writes; + result = applyThenWaitUntilApplied.txnResult; + } + else + { + throw new IllegalStateException(errorMessage); + } /* * NOTE: If Commit has been witnessed, we'll extract deps from there; * Apply has an expected TO-DO to stop including deps in such case. */ - if (witnessed.contains(COMMIT_MAXIMAL_REQ)) + if (haveCommitMaximal) { - Commit commit = messageProvider.commitMaximal(); + if (commit == null) + commit = messageProvider.commitMaximal(); txn = commit.partialTxn; deps = commit.partialDeps; } @@ -199,7 +225,7 @@ public class SerializerSupport } else if (witnessed.contains(COMMIT_MINIMAL_REQ)) { - Commit commit = messageProvider.commitMinimal(); + commit = messageProvider.commitMinimal(); txn = merge(apply.txn, merge(commit.partialTxn, txnFromPreAcceptOrBeginRecover(witnessed, messageProvider))); deps = commit.partialDeps; } @@ -216,8 +242,8 @@ public class SerializerSupport return Command.Executed.executed(attrs, status, executeAt, promised, accepted, waitingOnProvider.provide(deps), writes, result); } - private static final EnumSet<MessageType> APPLY_TYPES = - of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); + private static final Set<MessageType> APPLY_TYPES = + ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); private static Command.Truncated truncated(Mutable attrs, SaveStatus status, Timestamp executeAt, MessageProvider messageProvider) { diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/AbstractExecute.java similarity index 71% copy from accord-core/src/main/java/accord/messages/ReadTxnData.java copy to accord-core/src/main/java/accord/messages/AbstractExecute.java index 5dec9ba5..804c101a 100644 --- a/accord-core/src/main/java/accord/messages/ReadTxnData.java +++ b/accord-core/src/main/java/accord/messages/AbstractExecute.java @@ -18,6 +18,8 @@ package accord.messages; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +37,6 @@ import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.topology.Topologies; -import javax.annotation.Nullable; import static accord.local.SaveStatus.LocalExecution.WaitingToExecute; import static accord.local.Status.Committed; @@ -43,30 +44,18 @@ import static accord.messages.ReadData.ReadNack.NotCommitted; import static accord.messages.ReadData.ReadNack.Redundant; import static accord.utils.MapReduceConsume.forEach; -// TODO (required, efficiency): dedup - can currently have infinite pending reads that will be executed independently -public class ReadTxnData extends ReadData implements Command.TransientListener, EpochSupplier +public abstract class AbstractExecute extends ReadData implements Command.TransientListener, EpochSupplier { - private static final Logger logger = LoggerFactory.getLogger(ReadTxnData.class); - - public static class SerializerSupport - { - public static ReadTxnData create(TxnId txnId, Participants<?> scope, long executeAtEpoch, long waitForEpoch) - { - return new ReadTxnData(txnId, scope, executeAtEpoch, waitForEpoch); - } - } + private static final Logger logger = LoggerFactory.getLogger(AbstractExecute.class); class ObsoleteTracker implements Command.TransientListener { @Override public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) { - switch (safeCommand.current().status()) + switch (actionForStatus(safeCommand.current().status())) { - case PreApplied: - case Applied: - case Invalidated: - case Truncated: + case OBSOLETE: obsoleteAndSend(); safeCommand.removeListener(this); } @@ -75,7 +64,7 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, @Override public PreLoadContext listenerPreLoadContext(TxnId caller) { - return ReadTxnData.this.listenerPreLoadContext(caller); + return AbstractExecute.this.listenerPreLoadContext(caller); } } @@ -85,13 +74,13 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, final ObsoleteTracker obsoleteTracker = new ObsoleteTracker(); private transient State state = State.PENDING; // TODO (low priority, semantics): respond with the Executed result we have stored? - public ReadTxnData(Node.Id to, Topologies topologies, TxnId txnId, Participants<?> readScope, Timestamp executeAt) + public AbstractExecute(Node.Id to, Topologies topologies, TxnId txnId, Participants<?> readScope, Timestamp executeAt) { super(to, topologies, txnId, readScope); this.executeAtEpoch = executeAt.epoch(); } - protected ReadTxnData(TxnId txnId, Participants<?> readScope, long executeAtEpoch, long waitForEpoch) + public AbstractExecute(TxnId txnId, Participants<?> readScope, long waitForEpoch, long executeAtEpoch) { super(txnId, readScope, waitForEpoch); this.executeAtEpoch = executeAtEpoch; @@ -121,13 +110,34 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, return PreLoadContext.contextFor(txnId, caller, keys()); } - @Override - public synchronized void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) + protected enum Action { WAIT, EXECUTE, OBSOLETE } + + protected abstract boolean canExecutePreApplied(); + + /* + * Reading data definitely requires respecting obsoletion since it invalidates the read + * but writing data doesn't always make it necessary to fail the transaction due to preemption. + * At worst we do some duplicate work, and ignoring obsoletion means we don't have to fail the transaction at the + * original coordinator. + */ + protected boolean executeIfObsoleted() { - Command command = safeCommand.current(); - logger.trace("{}: updating as listener in response to change on {} with status {} ({})", - this, command.txnId(), command.status(), command); - switch (command.status()) + return false; + } + + // TODO (review): Is this too liberal in allowing old things to execute? + // would it be better to let things fail if coordinators compete? + Action maybeObsoleteOrExecute(Action action, Status status) + { + if (action == Action.OBSOLETE && executeIfObsoleted()) + // Just because it isn't obsolete doesn't mean it is ready to execute + return status.hasBeen(Status.ReadyToExecute) ? Action.EXECUTE : Action.WAIT; + return action; + } + + protected Action actionForStatus(Status status) + { + switch (status) { default: throw new AssertionError(); case NotDefined: @@ -136,15 +146,38 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, case AcceptedInvalidate: case PreCommitted: case Committed: - return; + return Action.WAIT; case PreApplied: + return canExecutePreApplied() ? Action.EXECUTE : maybeObsoleteOrExecute(Action.OBSOLETE, status); + + case ReadyToExecute: + return Action.EXECUTE; + case Applied: + return maybeObsoleteOrExecute(Action.OBSOLETE, status); case Invalidated: case Truncated: + return Action.OBSOLETE; + } + } + + @Override + public synchronized void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) + { + Command command = safeCommand.current(); + logger.trace("{}: updating as listener in response to change on {} with status {} ({})", + this, command.txnId(), command.status(), command); + + switch (actionForStatus(command.status())) + { + default: throw new AssertionError(); + case WAIT: + return; + case OBSOLETE: obsoleteAndSend(); return; - case ReadyToExecute: + case EXECUTE: } if (safeCommand.removeListener(this)) @@ -167,15 +200,10 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, Status status = command.status(); logger.trace("{}: setting up read with status {} on {}", txnId, status, safeStore); - switch (status) { - default: - throw new AssertionError(); - case Committed: - case NotDefined: - case PreAccepted: - case Accepted: - case AcceptedInvalidate: - case PreCommitted: + switch (actionForStatus(status)) + { + default: throw new AssertionError(); + case WAIT: waitingOn.set(safeStore.commandStore().id()); ++waitingOnCount; safeCommand.addListener(this); @@ -183,15 +211,10 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, safeStore.progressLog().waiting(safeCommand, WaitingToExecute, null, readScope); if (status == Committed) return null; else return NotCommitted; - - case PreApplied: - case Applied: - case Invalidated: - case Truncated: + case OBSOLETE: state = State.OBSOLETE; return Redundant; - - case ReadyToExecute: + case EXECUTE: waitingOn.set(safeStore.commandStore().id()); ++waitingOnCount; maybeRead(safeStore, safeCommand); @@ -231,7 +254,8 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, @Override protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable) { - // TODO (expected): we should unregister our listener, but this is quite costly today + // TODO (expected): lots of undesirable costs associated with the obsoletion tracker +// commandStore.execute(contextFor(txnId), safeStore -> safeStore.command(txnId).removeListener(obsoleteTracker)); super.readComplete(commandStore, result, unavailable); } @@ -241,42 +265,40 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, switch (state) { case RETURNED: - throw new IllegalStateException("ReadOk was sent, yet ack called again", fail); + throw new IllegalStateException("ReadOk was sent, yet ack called again"); case OBSOLETE: logger.debug("After the read completed for txn {}, the result was marked obsolete", txnId); - if (fail != null) - node.agent().onUncaughtException(fail); break; case PENDING: state = State.RETURNED; - node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail); + node.reply(replyTo, replyContext, fail == null ? constructReadOk(unavailable, data) : null, fail); break; default: throw new AssertionError("Unknown state: " + state); } } - private void removeListener(SafeCommandStore safeStore, TxnId txnId) + protected ReadOk constructReadOk(Ranges unavailable, Data data) { - safeStore.get(txnId, this, readScope).removeListener(this); + return new ReadOk(unavailable, data); } - @Override - protected void cancel() + private void removeListener(SafeCommandStore safeStore, TxnId txnId) { - node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> removeListener(in, txnId), node.agent())); + SafeCommand safeCommand = safeStore.ifInitialised(txnId); + safeCommand.removeListener(this); } @Override - public MessageType type() + protected void cancel() { - return MessageType.READ_REQ; + node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> removeListener(in, txnId), node.agent())); } @Override public String toString() { - return "ReadData{" + + return "ReadTxnData{" + "txnId:" + txnId + '}'; } diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java index 3947bff4..a2e874be 100644 --- a/accord-core/src/main/java/accord/messages/Apply.java +++ b/accord-core/src/main/java/accord/messages/Apply.java @@ -45,6 +45,7 @@ import accord.topology.Topologies; public class Apply extends TxnRequest<ApplyReply> { + public static final Factory FACTORY = Apply::new; public static class SerializationSupport { public static Apply create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, PartialTxn txn, Writes writes, Result result) @@ -53,6 +54,11 @@ public class Apply extends TxnRequest<ApplyReply> } } + public interface Factory + { + Apply create(Kind kind, Id to, Topologies participates, Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result); + } + public final Kind kind; public final Timestamp executeAt; public final Seekables<?, ?> keys; @@ -63,7 +69,7 @@ public class Apply extends TxnRequest<ApplyReply> public enum Kind { Minimal, Maximal } - private Apply(Kind kind, Id to, Topologies participates, Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + protected Apply(Kind kind, Id to, Topologies participates, Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { super(to, participates, route, txnId); Ranges slice = kind == Kind.Maximal || executes == participates ? scope.covering() : executes.computeRangesForNode(to); @@ -82,14 +88,14 @@ public class Apply extends TxnRequest<ApplyReply> { Topologies executes = executes(node, route, executeAt); Topologies participates = participates(node, route, txnId, executeAt, executes); - node.send(participates.nodes(), to -> applyMaximal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result)); + node.send(participates.nodes(), to -> applyMaximal(FACTORY, to, participates, executes, txnId, route, txn, executeAt, deps, writes, result)); } public static void sendMaximal(Node node, Id to, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { Topologies executes = executes(node, route, executeAt); Topologies participates = participates(node, route, txnId, executeAt, executes); - node.send(to, applyMaximal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result)); + node.send(to, applyMaximal(FACTORY, to, participates, executes, txnId, route, txn, executeAt, deps, writes, result)); } public static Topologies executes(Node node, Unseekables<?> route, Timestamp executeAt) @@ -102,17 +108,17 @@ public class Apply extends TxnRequest<ApplyReply> return txnId.epoch() == executeAt.epoch() ? executes : node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch()); } - public static Apply applyMinimal(Id to, Topologies sendTo, Topologies applyTo, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + public static Apply applyMinimal(Factory factory, Id to, Topologies sendTo, Topologies applyTo, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { - return new Apply(Kind.Minimal, to, sendTo, applyTo, txnId, route, txn, executeAt, deps, writes, result); + return factory.create(Kind.Minimal, to, sendTo, applyTo, txnId, route, txn, executeAt, deps, writes, result); } - public static Apply applyMaximal(Id to, Topologies participates, Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + public static Apply applyMaximal(Factory factory, Id to, Topologies participates, Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { - return new Apply(Kind.Maximal, to, participates, executes, txnId, route, txn, executeAt, deps, writes, result); + return factory.create(Kind.Maximal, to, participates, executes, txnId, route, txn, executeAt, deps, writes, result); } - private Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, Writes writes, Result result) + protected Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, Writes writes, Result result) { super(txnId, route, waitForEpoch); this.kind = kind; @@ -141,6 +147,11 @@ public class Apply extends TxnRequest<ApplyReply> public static ApplyReply apply(SafeCommandStore safeStore, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps deps, PartialRoute<?> scope, Writes writes, Result result, RoutingKey progressKey) { SafeCommand safeCommand = safeStore.get(txnId, executeAt, scope); + return apply(safeStore, safeCommand, txn, txnId, executeAt, deps, scope, writes, result, progressKey); + } + + public static ApplyReply apply(SafeCommandStore safeStore, SafeCommand safeCommand, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps deps, PartialRoute<?> scope, Writes writes, Result result, RoutingKey progressKey) + { switch (Commands.apply(safeStore, safeCommand, txnId, scope, progressKey, executeAt, deps, txn, writes, result)) { default: diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java index 79a5b35d..02b4fc19 100644 --- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java @@ -38,6 +38,8 @@ import accord.primitives.Writes; /* * Used by local and global inclusive sync points to effect the sync point at each node * Combines commit, execute (with nothing really to execute), and apply into one request/response + * + * This returns when the dependencies are Applied, but doesn't wait for this transaction to be Applied. */ public class ApplyThenWaitUntilApplied extends WaitUntilApplied { diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index 2e30e055..a1b0404b 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -50,6 +50,7 @@ import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.TriFunction; +import static accord.messages.Commit.Kind.Maximal; import static accord.utils.Invariants.checkArgument; public class Commit extends TxnRequest<ReadNack> @@ -116,7 +117,7 @@ public class Commit extends TxnRequest<ReadNack> this.readData = toExecuteFactory.apply(partialTxn != null ? partialTxn : txn, scope, partialDeps); } - Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData) + protected Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData) { super(txnId, scope, waitForEpoch); this.kind = kind; @@ -173,6 +174,14 @@ public class Commit extends TxnRequest<ReadNack> } } + public static void commitMaximal(Node node, Node.Id to, Txn txn, TxnId txnId, Timestamp executeAt, FullRoute<?> route, Deps deps, Participants<?> readScope) + { + // the replica may be missing the original commit, or the additional commit, so send everything + Topologies topology = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch()); + Topology coordinateTopology = topology.forEpoch(txnId.epoch()); + node.send(to, new Commit(Maximal, to, coordinateTopology, topology, txnId, txn, route, readScope, executeAt, deps, false)); + } + @Override public TxnId primaryTxnId() { diff --git a/accord-core/src/main/java/accord/messages/InformHomeDurable.java b/accord-core/src/main/java/accord/messages/InformHomeDurable.java index 36d1a242..6f2491db 100644 --- a/accord-core/src/main/java/accord/messages/InformHomeDurable.java +++ b/accord-core/src/main/java/accord/messages/InformHomeDurable.java @@ -20,6 +20,8 @@ package accord.messages; import java.util.Set; +import com.google.common.collect.ImmutableSet; + import accord.local.Commands; import accord.local.Node; import accord.local.Node.Id; @@ -48,7 +50,7 @@ public class InformHomeDurable implements Request this.route = route; this.executeAt = executeAt; this.durability = durability; - this.persistedOn = persistedOn; + this.persistedOn = ImmutableSet.copyOf(persistedOn); // Persisted on might be mutated later } @Override diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java index 42f1ad48..a3fe592a 100644 --- a/accord-core/src/main/java/accord/messages/MessageType.java +++ b/accord-core/src/main/java/accord/messages/MessageType.java @@ -17,59 +17,92 @@ */ package accord.messages; -import static accord.messages.MessageType.Kind.LOCAL; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.List; + +import com.google.common.collect.ImmutableList; + import static accord.messages.MessageType.Kind.REMOTE; +import static accord.messages.MessageType.Kind.LOCAL; /** * Meant to assist implementations with mapping accord messages to their own messaging systems. */ -public enum MessageType +public class MessageType { - SIMPLE_RSP (REMOTE, false), - FAILURE_RSP (REMOTE, false), - PRE_ACCEPT_REQ (REMOTE, true ), - PRE_ACCEPT_RSP (REMOTE, false), - ACCEPT_REQ (REMOTE, true ), - ACCEPT_RSP (REMOTE, false), - ACCEPT_INVALIDATE_REQ (REMOTE, true ), - GET_DEPS_REQ (REMOTE, false), - GET_DEPS_RSP (REMOTE, false), - COMMIT_MINIMAL_REQ (REMOTE, true ), - COMMIT_MAXIMAL_REQ (REMOTE, true ), - COMMIT_INVALIDATE_REQ (REMOTE, true ), - APPLY_MINIMAL_REQ (REMOTE, true ), - APPLY_MAXIMAL_REQ (REMOTE, true ), - APPLY_RSP (REMOTE, false), - READ_REQ (REMOTE, false), - READ_RSP (REMOTE, false), - BEGIN_RECOVER_REQ (REMOTE, true ), - BEGIN_RECOVER_RSP (REMOTE, false), - BEGIN_INVALIDATE_REQ (REMOTE, true ), - BEGIN_INVALIDATE_RSP (REMOTE, false), - WAIT_ON_COMMIT_REQ (REMOTE, false), - WAIT_ON_COMMIT_RSP (REMOTE, false), - WAIT_UNTIL_APPLIED_REQ (REMOTE, false), - INFORM_OF_TXN_REQ (REMOTE, true ), - INFORM_DURABLE_REQ (REMOTE, true ), - INFORM_HOME_DURABLE_REQ (REMOTE, true ), - CHECK_STATUS_REQ (REMOTE, false), - CHECK_STATUS_RSP (REMOTE, false), - FETCH_DATA_REQ (REMOTE, false), - FETCH_DATA_RSP (REMOTE, false), - SET_SHARD_DURABLE_REQ (REMOTE, true ), - SET_GLOBALLY_DURABLE_REQ (REMOTE, true ), - QUERY_DURABLE_BEFORE_REQ (REMOTE, false), - QUERY_DURABLE_BEFORE_RSP (REMOTE, false), - APPLY_AND_WAIT_UNTIL_APPLIED_REQ (REMOTE, true ), - - PROPAGATE_PRE_ACCEPT_MSG (LOCAL, true ), - PROPAGATE_COMMIT_MSG (LOCAL, true ), - PROPAGATE_APPLY_MSG (LOCAL, true ), - PROPAGATE_OTHER_MSG (LOCAL, true ), - ; + public static final MessageType SIMPLE_RSP = mt(REMOTE, false); + public static final MessageType FAILURE_RSP = mt(REMOTE, false); + public static final MessageType PRE_ACCEPT_REQ = mt(REMOTE, true ); + public static final MessageType PRE_ACCEPT_RSP = mt(REMOTE, false); + public static final MessageType ACCEPT_REQ = mt(REMOTE, true ); + public static final MessageType ACCEPT_RSP = mt(REMOTE, false); + public static final MessageType ACCEPT_INVALIDATE_REQ = mt(REMOTE, true ); + public static final MessageType GET_DEPS_REQ = mt(REMOTE, false); + public static final MessageType GET_DEPS_RSP = mt(REMOTE, false); + public static final MessageType COMMIT_MINIMAL_REQ = mt(REMOTE, true ); + public static final MessageType COMMIT_MAXIMAL_REQ = mt(REMOTE, true ); + public static final MessageType COMMIT_INVALIDATE_REQ = mt(REMOTE, true ); + public static final MessageType APPLY_MINIMAL_REQ = mt(REMOTE, true ); + public static final MessageType APPLY_MAXIMAL_REQ = mt(REMOTE, true ); + public static final MessageType APPLY_RSP = mt(REMOTE, false); + public static final MessageType READ_REQ = mt(REMOTE, false); + public static final MessageType READ_RSP = mt(REMOTE, false); + public static final MessageType BEGIN_RECOVER_REQ = mt(REMOTE, true ); + public static final MessageType BEGIN_RECOVER_RSP = mt(REMOTE, false); + public static final MessageType BEGIN_INVALIDATE_REQ = mt(REMOTE, true ); + public static final MessageType BEGIN_INVALIDATE_RSP = mt(REMOTE, false); + public static final MessageType WAIT_ON_COMMIT_REQ = mt(REMOTE, false); + public static final MessageType WAIT_ON_COMMIT_RSP = mt(REMOTE, false); + public static final MessageType WAIT_UNTIL_APPLIED_REQ = mt(REMOTE, false); + public static final MessageType INFORM_OF_TXN_REQ = mt(REMOTE, true ); + public static final MessageType INFORM_DURABLE_REQ = mt(REMOTE, true ); + public static final MessageType INFORM_HOME_DURABLE_REQ = mt(REMOTE, true ); + public static final MessageType CHECK_STATUS_REQ = mt(REMOTE, false); + public static final MessageType CHECK_STATUS_RSP = mt(REMOTE, false); + public static final MessageType FETCH_DATA_REQ = mt(REMOTE, false); + public static final MessageType FETCH_DATA_RSP = mt(REMOTE, false); + public static final MessageType SET_SHARD_DURABLE_REQ = mt(REMOTE, true ); + public static final MessageType SET_GLOBALLY_DURABLE_REQ = mt(REMOTE, true ); + public static final MessageType QUERY_DURABLE_BEFORE_REQ = mt(REMOTE, false); + public static final MessageType QUERY_DURABLE_BEFORE_RSP = mt(REMOTE, false); + public static final MessageType APPLY_AND_WAIT_UNTIL_APPLIED_REQ = mt(REMOTE, true ); + + public static final MessageType PROPAGATE_PRE_ACCEPT_MSG = mt(LOCAL, true ); + public static final MessageType PROPAGATE_COMMIT_MSG = mt(LOCAL, true ); + public static final MessageType PROPAGATE_APPLY_MSG = mt(LOCAL, true ); + public static final MessageType PROPAGATE_OTHER_MSG = mt(LOCAL, true ); + public enum Kind { LOCAL, REMOTE } + public static final List<MessageType> values; + + static + { + ImmutableList.Builder<MessageType> builder = ImmutableList.builder(); + for (Field f : MessageType.class.getDeclaredFields()) + { + if (f.getType().equals(MessageType.class) && Modifier.isStatic(f.getModifiers())) + { + try + { + builder.add((MessageType) f.get(null)); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(e); + } + } + } + values = builder.build(); + } + + protected static MessageType mt(Kind kind, boolean hasSideEffects) + { + return new MessageType(kind, hasSideEffects); + } + /** * LOCAL messages are not sent to remote nodes. */ @@ -80,7 +113,7 @@ public enum MessageType */ private final boolean hasSideEffects; - MessageType(Kind kind, boolean hasSideEffects) + protected MessageType(Kind kind, boolean hasSideEffects) { this.hasSideEffects = hasSideEffects; this.kind = kind; diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 3eacce68..889b7067 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -36,12 +36,14 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.topology.Topologies; import accord.utils.Invariants; +import accord.utils.async.AsyncChain; import static accord.messages.MessageType.READ_RSP; import static accord.messages.TxnRequest.computeWaitForEpoch; import static accord.messages.TxnRequest.latestRelevantEpochIndex; // TODO (required, efficiency): dedup - can currently have infinite pending reads that will be executed independently +// TODO (review) this is really more at its core Execute rather than read because we use it to execute all kinds of things now and we should maybe rename it? public abstract class ReadData extends AbstractEpochRequest<ReadNack> { private static final Logger logger = LoggerFactory.getLogger(ReadData.class); @@ -56,11 +58,10 @@ public abstract class ReadData extends AbstractEpochRequest<ReadNack> ReadType(int val) { - this.val = (byte)val; + this.val = (byte) val; } - @SuppressWarnings("unused") - public static ReadType fromValue(byte val) + public static ReadType valueOf(int val) { switch (val) { @@ -79,6 +80,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadNack> // TODO (expected, cleanup): should this be a Route? public final Participants<?> readScope; private final long waitForEpoch; + private Data data; transient BitSet waitingOn; transient int waitingOnCount; @@ -144,10 +146,10 @@ public abstract class ReadData extends AbstractEpochRequest<ReadNack> node.agent().onUncaughtException(failure); cancel(); } - else - { + + // Unless failed always ack to indicate setup has completed otherwise the counter never gets to -1 + if (failure == null) ack(null); - } } private void ack(@Nullable Ranges newUnavailable) @@ -180,12 +182,17 @@ public abstract class ReadData extends AbstractEpochRequest<ReadNack> ack(unavailable); } + protected AsyncChain<Data> execute(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn) + { + return txn.read(safeStore, executeAt); + } + void read(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn) { CommandStore unsafeStore = safeStore.commandStore(); Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt); - txn.read(safeStore, executeAt).begin((next, throwable) -> { + execute(safeStore, executeAt, txn).begin((next, throwable) -> { if (throwable != null) { // TODO (expected, exceptions): should send exception to client, and consistency handle/propagate locally diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java index 5dec9ba5..69e7a7ba 100644 --- a/accord-core/src/main/java/accord/messages/ReadTxnData.java +++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java @@ -18,36 +18,15 @@ package accord.messages; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import accord.api.Data; -import accord.local.Command; -import accord.local.CommandStore; import accord.local.Node; -import accord.local.PreLoadContext; -import accord.local.SafeCommand; -import accord.local.SafeCommandStore; -import accord.local.Status; -import accord.primitives.EpochSupplier; import accord.primitives.Participants; -import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.topology.Topologies; -import javax.annotation.Nullable; - -import static accord.local.SaveStatus.LocalExecution.WaitingToExecute; -import static accord.local.Status.Committed; -import static accord.messages.ReadData.ReadNack.NotCommitted; -import static accord.messages.ReadData.ReadNack.Redundant; -import static accord.utils.MapReduceConsume.forEach; // TODO (required, efficiency): dedup - can currently have infinite pending reads that will be executed independently -public class ReadTxnData extends ReadData implements Command.TransientListener, EpochSupplier +public class ReadTxnData extends AbstractExecute { - private static final Logger logger = LoggerFactory.getLogger(ReadTxnData.class); - public static class SerializerSupport { public static ReadTxnData create(TxnId txnId, Participants<?> scope, long executeAtEpoch, long waitForEpoch) @@ -56,215 +35,19 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, } } - class ObsoleteTracker implements Command.TransientListener - { - @Override - public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) - { - switch (safeCommand.current().status()) - { - case PreApplied: - case Applied: - case Invalidated: - case Truncated: - obsoleteAndSend(); - safeCommand.removeListener(this); - } - } - - @Override - public PreLoadContext listenerPreLoadContext(TxnId caller) - { - return ReadTxnData.this.listenerPreLoadContext(caller); - } - } - - private enum State { PENDING, RETURNED, OBSOLETE } - - public final long executeAtEpoch; - final ObsoleteTracker obsoleteTracker = new ObsoleteTracker(); - private transient State state = State.PENDING; // TODO (low priority, semantics): respond with the Executed result we have stored? - public ReadTxnData(Node.Id to, Topologies topologies, TxnId txnId, Participants<?> readScope, Timestamp executeAt) { - super(to, topologies, txnId, readScope); - this.executeAtEpoch = executeAt.epoch(); - } - - protected ReadTxnData(TxnId txnId, Participants<?> readScope, long executeAtEpoch, long waitForEpoch) - { - super(txnId, readScope, waitForEpoch); - this.executeAtEpoch = executeAtEpoch; - } - - @Override - public ReadType kind() - { - return ReadType.readTxnData; - } - - @Override - protected long executeAtEpoch() - { - return executeAtEpoch; - } - - @Override - public long epoch() - { - return executeAtEpoch; - } - - @Override - public PreLoadContext listenerPreLoadContext(TxnId caller) - { - return PreLoadContext.contextFor(txnId, caller, keys()); - } - - @Override - public synchronized void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) - { - Command command = safeCommand.current(); - logger.trace("{}: updating as listener in response to change on {} with status {} ({})", - this, command.txnId(), command.status(), command); - switch (command.status()) - { - default: throw new AssertionError(); - case NotDefined: - case PreAccepted: - case Accepted: - case AcceptedInvalidate: - case PreCommitted: - case Committed: - return; - - case PreApplied: - case Applied: - case Invalidated: - case Truncated: - obsoleteAndSend(); - return; - case ReadyToExecute: - } - - if (safeCommand.removeListener(this)) - maybeRead(safeStore, safeCommand); - } - - @Override - public synchronized ReadNack apply(SafeCommandStore safeStore) - { - SafeCommand safeCommand = safeStore.get(txnId, this, readScope); - return apply(safeStore, safeCommand); - } - - protected synchronized ReadNack apply(SafeCommandStore safeStore, SafeCommand safeCommand) - { - if (state != State.PENDING) - return null; - - Command command = safeCommand.current(); - Status status = command.status(); - - logger.trace("{}: setting up read with status {} on {}", txnId, status, safeStore); - switch (status) { - default: - throw new AssertionError(); - case Committed: - case NotDefined: - case PreAccepted: - case Accepted: - case AcceptedInvalidate: - case PreCommitted: - waitingOn.set(safeStore.commandStore().id()); - ++waitingOnCount; - safeCommand.addListener(this); - - safeStore.progressLog().waiting(safeCommand, WaitingToExecute, null, readScope); - if (status == Committed) return null; - else return NotCommitted; - - case PreApplied: - case Applied: - case Invalidated: - case Truncated: - state = State.OBSOLETE; - return Redundant; - - case ReadyToExecute: - waitingOn.set(safeStore.commandStore().id()); - ++waitingOnCount; - maybeRead(safeStore, safeCommand); - return null; - } + super(to, topologies, txnId, readScope, executeAt); } - synchronized void obsoleteAndSend() + public ReadTxnData(TxnId txnId, Participants<?> readScope, long waitForEpoch, long executeAtEpoch) { - if (state == State.PENDING) - { - state = State.OBSOLETE; - node.reply(replyTo, replyContext, Redundant, null); - } + super(txnId, readScope, waitForEpoch, executeAtEpoch); } - void maybeRead(SafeCommandStore safeStore, SafeCommand safeCommand) + protected boolean canExecutePreApplied() { - switch (state) - { - case PENDING: - Command command = safeCommand.current(); - logger.trace("{}: executing read", command.txnId()); - safeCommand.addListener(obsoleteTracker); - read(safeStore, command.executeAt(), command.partialTxn()); - break; - case OBSOLETE: - // nothing to see here - break; - case RETURNED: - throw new IllegalStateException("ReadOk was sent, yet ack called again"); - default: - throw new AssertionError("Unknown state: " + state); - } - } - - @Override - protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable) - { - // TODO (expected): we should unregister our listener, but this is quite costly today - super.readComplete(commandStore, result, unavailable); - } - - @Override - protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail) - { - switch (state) - { - case RETURNED: - throw new IllegalStateException("ReadOk was sent, yet ack called again", fail); - case OBSOLETE: - logger.debug("After the read completed for txn {}, the result was marked obsolete", txnId); - if (fail != null) - node.agent().onUncaughtException(fail); - break; - case PENDING: - state = State.RETURNED; - node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail); - break; - default: - throw new AssertionError("Unknown state: " + state); - } - } - - private void removeListener(SafeCommandStore safeStore, TxnId txnId) - { - safeStore.get(txnId, this, readScope).removeListener(this); - } - - @Override - protected void cancel() - { - node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> removeListener(in, txnId), node.agent())); + return false; } @Override @@ -272,12 +55,4 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, { return MessageType.READ_REQ; } - - @Override - public String toString() - { - return "ReadData{" + - "txnId:" + txnId + - '}'; - } } diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java index 730055c4..63b38810 100644 --- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java @@ -45,6 +45,9 @@ import static accord.messages.ReadData.ReadNack.NotCommitted; import static accord.messages.ReadData.ReadNack.Redundant; import static accord.utils.MapReduceConsume.forEach; +/** + * Wait until the dependencies for this transaction are Applied. Does not wait until this transaction is Applied. + */ // TODO (required, efficiency): dedup - can currently have infinite pending reads that will be executed independently public class WaitUntilApplied extends ReadData implements Command.TransientListener, EpochSupplier { @@ -234,7 +237,9 @@ public class WaitUntilApplied extends ReadData implements Command.TransientListe private void removeListener(SafeCommandStore safeStore, TxnId txnId) { - safeStore.get(txnId, this, readScope).removeListener(this); + SafeCommand safeCommand = safeStore.ifInitialised(txnId); + if (safeCommand != null) + safeCommand.removeListener(this); } @Override @@ -258,7 +263,7 @@ public class WaitUntilApplied extends ReadData implements Command.TransientListe @Override public String toString() { - return "WaitForApply{" + + return "WaitUntilApplied{" + "txnId:" + txnId + '}'; } diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java index 6c3336db..ee358d66 100644 --- a/accord-core/src/main/java/accord/primitives/Keys.java +++ b/accord-core/src/main/java/accord/primitives/Keys.java @@ -18,12 +18,17 @@ package accord.primitives; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; import java.util.function.Function; import accord.api.Key; -import accord.utils.*; +import accord.utils.ArrayBuffers; import accord.utils.ArrayBuffers.ObjectBuffers; +import accord.utils.SortedArrays; import static accord.utils.ArrayBuffers.cachedKeys; import static accord.utils.SortedArrays.isSortedUnique; @@ -41,6 +46,8 @@ public class Keys extends AbstractKeys<Key> implements Seekables<Key, Keys> public static final Keys EMPTY = new Keys(new Key[0]); + private static final Key[] EMPTY_KEYS_ARRAY = new Key[0]; + public Keys(SortedSet<? extends Key> keys) { super(keys.toArray(new Key[0])); @@ -115,7 +122,9 @@ public class Keys extends AbstractKeys<Key> implements Seekables<Key, Keys> public static Keys of(Collection<? extends Key> keys) { - return of(keys.toArray(new Key[0])); + if (keys.isEmpty()) + return Keys.EMPTY; + return of(keys.toArray(EMPTY_KEYS_ARRAY)); } public static <V> Keys of(Collection<V> input, Function<? super V, ? extends Key> transform) diff --git a/accord-core/src/main/java/accord/primitives/RoutingKeys.java b/accord-core/src/main/java/accord/primitives/RoutingKeys.java index 1f1953a6..81212ef7 100644 --- a/accord-core/src/main/java/accord/primitives/RoutingKeys.java +++ b/accord-core/src/main/java/accord/primitives/RoutingKeys.java @@ -18,6 +18,8 @@ package accord.primitives; +import java.util.Collection; + import accord.api.RoutingKey; import accord.utils.SortedArrays; @@ -38,6 +40,8 @@ public class RoutingKeys extends AbstractUnseekableKeys implements Unseekables<R public static final RoutingKeys EMPTY = new RoutingKeys(new RoutingKey[0]); + private static final RoutingKey[] EMPTY_KEYS_ARRAY = new RoutingKey[0]; + RoutingKeys(RoutingKey[] keys) { super(keys); @@ -48,6 +52,13 @@ public class RoutingKeys extends AbstractUnseekableKeys implements Unseekables<R return new RoutingKeys(toUnique(sort(keys))); } + public static RoutingKeys of(Collection<? extends RoutingKey> keys) + { + if (keys.isEmpty()) + return EMPTY; + return of(keys.toArray(EMPTY_KEYS_ARRAY)); + } + public static RoutingKeys ofSortedUnique(RoutingKey ... keys) { checkArgument(isSortedUnique(keys)); diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java index 93490869..5a792918 100644 --- a/accord-core/src/main/java/accord/primitives/Txn.java +++ b/accord-core/src/main/java/accord/primitives/Txn.java @@ -21,16 +21,18 @@ package accord.primitives; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; -import accord.api.*; +import accord.api.Data; +import accord.api.Query; +import accord.api.Read; +import accord.api.Result; +import accord.api.Update; import accord.local.SafeCommandStore; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - - public interface Txn { enum Kind @@ -268,8 +270,8 @@ public interface Txn default AsyncChain<Data> read(SafeCommandStore safeStore, Timestamp executeAt) { Ranges ranges = safeStore.ranges().safeToReadAt(executeAt); - List<AsyncChain<Data>> chains = Routables.foldlMinimal(keys(), ranges, (key, accumulate, index) -> { - AsyncChain<Data> result = read().read(key, kind(), safeStore, executeAt, safeStore.dataStore()); + List<AsyncChain<Data>> chains = Routables.foldlMinimal(read().keys(), ranges, (key, accumulate, index) -> { + AsyncChain<Data> result = read().read(key, safeStore, executeAt, safeStore.dataStore()); accumulate.add(result); return accumulate; }, new ArrayList<>()); diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java index 08fc9442..e470d074 100644 --- a/accord-core/src/main/java/accord/primitives/Writes.java +++ b/accord-core/src/main/java/accord/primitives/Writes.java @@ -18,15 +18,16 @@ package accord.primitives; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + import accord.api.Write; import accord.local.SafeCommandStore; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; public class Writes { @@ -50,7 +51,7 @@ public class Writes if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Writes writes = (Writes) o; - return executeAt.equals(writes.executeAt) && keys.equals(writes.keys) && Objects.equals(write, writes.write); + return txnId.equals(writes.txnId) && executeAt.equals(writes.executeAt) && keys.equals(writes.keys) && Objects.equals(write, writes.write); } public boolean isEmpty() @@ -61,7 +62,7 @@ public class Writes @Override public int hashCode() { - return Objects.hash(executeAt, keys, write); + return Objects.hash(txnId, executeAt, keys, write); } public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges, PartialTxn txn) @@ -83,7 +84,8 @@ public class Writes public String toString() { return "TxnWrites{" + - "executeAt:" + executeAt + + "txnId:" + txnId + + ", executeAt:" + executeAt + ", keys:" + keys + ", write:" + write + '}'; diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 27b43ef7..753933dc 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -18,17 +18,34 @@ package accord.topology; -import java.util.*; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.IntStream; +import com.google.common.annotations.VisibleForTesting; + import accord.api.RoutingKey; import accord.local.Node.Id; -import accord.primitives.*; -import accord.utils.*; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.Routables; +import accord.primitives.Unseekables; +import accord.utils.ArrayBuffers; import accord.utils.ArrayBuffers.IntBuffers; -import com.google.common.annotations.VisibleForTesting; +import accord.utils.IndexedBiFunction; +import accord.utils.IndexedConsumer; +import accord.utils.IndexedIntFunction; +import accord.utils.IndexedTriFunction; import static accord.utils.SortedArrays.Search.FLOOR; import static accord.utils.SortedArrays.exponentialSearch; @@ -148,6 +165,7 @@ public class Topology return supersetIndexes.length < shards.length; } + @VisibleForTesting public Topology withEpoch(long epoch) { return new Topology(epoch, shards, ranges, nodeLookup, subsetOfRanges, supersetIndexes); diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java index b960d8b2..cd5ba6bd 100644 --- a/accord-core/src/main/java/accord/utils/Invariants.java +++ b/accord-core/src/main/java/accord/utils/Invariants.java @@ -18,10 +18,10 @@ package accord.utils; -import net.nicoulaj.compilecommand.annotations.Inline; - -import javax.annotation.Nullable; import java.util.function.Predicate; +import javax.annotation.Nullable; + +import net.nicoulaj.compilecommand.annotations.Inline; import static java.lang.String.format; @@ -146,6 +146,13 @@ public class Invariants return param; } + public static <T> T nonNull(T param, String message) + { + if (param == null) + throw new NullPointerException(message); + return param; + } + public static <T> T nonNull(T param, String fmt, Object... args) { if (param == null) diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index 3fc3ac9d..ac36fb16 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -18,6 +18,7 @@ package accord; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -25,10 +26,10 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; -import java.time.Duration; - import accord.api.MessageSink; import accord.api.Scheduler; +import accord.coordinate.TxnExecute; +import accord.coordinate.TxnPersist; import accord.config.LocalConfig; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; @@ -41,6 +42,7 @@ import accord.impl.mock.MockStore; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import accord.messages.Apply; import accord.config.MutableLocalConfig; import accord.messages.LocalMessage; import accord.primitives.Keys; @@ -54,7 +56,6 @@ import accord.utils.DefaultRandom; import accord.utils.EpochFunction; import accord.utils.Invariants; import accord.utils.ThreadPoolScheduler; - import org.awaitility.Awaitility; import org.awaitility.core.ThrowingRunnable; @@ -112,7 +113,6 @@ public class Utils { return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY, MockStore.update(keys)); } - public static Txn writeTxn(Ranges ranges) { return new Txn.InMemory(ranges, MockStore.read(ranges), MockStore.QUERY, MockStore.update(ranges)); @@ -161,7 +161,7 @@ public class Utils scheduler, SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, - InMemoryCommandStores.Synchronized::new, + InMemoryCommandStores.Synchronized::new, TxnExecute.FACTORY, TxnPersist.FACTORY, Apply.FACTORY, localConfig); awaitUninterruptibly(node.unsafeStart()); return node; diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index b147efb7..721cf030 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -21,8 +21,8 @@ package accord.burn; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; -import java.util.EnumMap; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.TreeSet; @@ -53,13 +53,14 @@ import org.slf4j.LoggerFactory; import accord.api.Key; import accord.impl.IntHashKey; +import accord.impl.TopologyFactory; import accord.impl.basic.Cluster; +import accord.impl.basic.Cluster.Stats; +import accord.impl.basic.Packet; import accord.impl.basic.PendingRunnable; import accord.impl.basic.PropagatingPendingQueue; import accord.impl.basic.RandomDelayQueue; import accord.impl.basic.RandomDelayQueue.Factory; -import accord.impl.TopologyFactory; -import accord.impl.basic.Packet; import accord.impl.basic.SimulatedDelayedExecutorService; import accord.impl.list.ListAgent; import accord.impl.list.ListQuery; @@ -307,7 +308,7 @@ public class BurnTest } }; - EnumMap<MessageType, Cluster.Stats> messageStatsMap; + Map<MessageType, Stats> messageStatsMap; try { messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), listener, () -> queue, queue::checkFailures, diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index efeadfec..8595f7a6 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -21,7 +21,6 @@ package accord.impl.basic; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -45,6 +44,8 @@ import accord.api.MessageSink; import accord.api.Scheduler; import accord.burn.BurnTestConfigurationService; import accord.burn.TopologyUpdates; +import accord.coordinate.TxnExecute; +import accord.coordinate.TxnPersist; import accord.impl.CoordinateDurabilityScheduling; import accord.impl.IntHashKey; import accord.impl.SimpleProgressLog; @@ -56,6 +57,7 @@ import accord.local.Node; import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import accord.messages.Apply; import accord.config.MutableLocalConfig; import accord.messages.LocalMessage; import accord.messages.MessageType; @@ -84,7 +86,7 @@ public class Cluster implements Scheduler public String toString() { return Integer.toString(count); } } - EnumMap<MessageType, Stats> statsMap = new EnumMap<>(MessageType.class); + Map<MessageType, Stats> statsMap = new HashMap<>(); final RandomSource randomSource; final Function<Id, Node> lookup; @@ -216,7 +218,7 @@ public class Cluster implements Scheduler run.run(); } - public static EnumMap<MessageType, Stats> run(Id[] nodes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal) + public static Map<MessageType, Stats> run(Id[] nodes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal) { Topology topology = topologyFactory.toTopology(nodes); Map<Id, Node> lookup = new LinkedHashMap<>(); @@ -236,7 +238,8 @@ public class Cluster implements Scheduler () -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()), executor.agent(), randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending), localConfig); + SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending), TxnExecute.FACTORY, TxnPersist.FACTORY, Apply.FACTORY, + localConfig); lookup.put(id, node); CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node); // TODO (desired): randomise diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java index 3a287bf4..c26bda01 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRead.java +++ b/accord-core/src/test/java/accord/impl/list/ListRead.java @@ -21,12 +21,6 @@ package accord.impl.list; import java.util.Map; import java.util.function.Function; -import accord.local.SafeCommandStore; -import accord.primitives.Ranges; -import accord.primitives.Timestamp; -import accord.primitives.Txn; -import accord.utils.async.AsyncChain; -import accord.utils.Timestamped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +29,14 @@ import accord.api.DataStore; import accord.api.Key; import accord.api.Read; import accord.local.CommandStore; +import accord.local.SafeCommandStore; import accord.primitives.Range; +import accord.primitives.Ranges; import accord.primitives.Seekable; import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.utils.Timestamped; +import accord.utils.async.AsyncChain; import accord.utils.async.AsyncExecutor; public class ListRead implements Read @@ -62,7 +61,7 @@ public class ListRead implements Read } @Override - public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) + public AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) { ListStore s = (ListStore)store; return executor.apply(commandStore.commandStore()).submit(() -> { diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index 4b3d198d..3678da1b 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory; import accord.NetworkFilter; import accord.api.MessageSink; +import accord.coordinate.TxnExecute; +import accord.coordinate.TxnPersist; import accord.config.LocalConfig; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; @@ -46,6 +48,7 @@ import accord.local.Node; import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import accord.messages.Apply; import accord.config.MutableLocalConfig; import accord.messages.Callback; import accord.messages.LocalMessage; @@ -136,6 +139,9 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, + TxnExecute.FACTORY, + TxnPersist.FACTORY, + Apply.FACTORY, localConfig); awaitUninterruptibly(node.unsafeStart()); node.onTopologyUpdate(topology, true); diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java index bae14b57..52385666 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockStore.java +++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java @@ -19,15 +19,20 @@ package accord.impl.mock; import accord.api.Data; +import accord.api.DataStore; import accord.api.Query; import accord.api.Read; import accord.api.Result; -import accord.api.DataStore; import accord.api.Update; import accord.api.Write; import accord.local.Node; import accord.local.SafeCommandStore; -import accord.primitives.*; +import accord.primitives.Ranges; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.Writes; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResults; @@ -57,7 +62,7 @@ public class MockStore implements DataStore } @Override - public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) + public AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) { return AsyncChains.success(DATA); } diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index ef573d11..8ba38dd3 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -32,6 +32,8 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.api.Scheduler; import accord.api.TestableConfigurationService; +import accord.coordinate.TxnExecute; +import accord.coordinate.TxnPersist; import accord.config.LocalConfig; import accord.impl.InMemoryCommandStore; import accord.impl.InMemoryCommandStores; @@ -44,6 +46,7 @@ import accord.impl.mock.MockConfigurationService; import accord.impl.mock.MockStore; import accord.local.Node.Id; import accord.local.SaveStatus.LocalExecution; +import accord.messages.Apply; import accord.config.MutableLocalConfig; import accord.primitives.FullKeyRoute; import accord.primitives.Keys; @@ -113,9 +116,11 @@ public class ImmutableCommandTest MockCluster.Clock clock = new MockCluster.Clock(100); LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(id, null, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()), - clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock), - () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED, - SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, localConfig); + clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock), + () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED, + SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, + TxnExecute.FACTORY, TxnPersist.FACTORY, Apply.FACTORY, + localConfig); awaitUninterruptibly(node.unsafeStart()); node.onTopologyUpdate(storeSupport.local.get(), true); return node; diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java index fa84bea1..1afe638d 100644 --- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java +++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java @@ -18,26 +18,47 @@ package accord.messages; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + import accord.api.RoutingKey; -import accord.impl.*; +import accord.impl.AbstractSafeCommandStore; +import accord.impl.CommandTimeseries; import accord.impl.CommandTimeseries.CommandLoader; +import accord.impl.CommandsForKey; +import accord.impl.IntKey; import accord.impl.IntKey.Raw; -import accord.impl.mock.*; -import accord.local.Node.Id; +import accord.impl.TopologyFactory; import accord.impl.mock.MockCluster.Clock; -import accord.primitives.*; +import accord.impl.mock.Network; +import accord.impl.mock.RecordingMessageSink; +import accord.local.Command; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.Node.Id; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.primitives.Ballot; +import accord.primitives.FullRoute; +import accord.primitives.KeyDeps; +import accord.primitives.Keys; +import accord.primitives.PartialDeps; +import accord.primitives.Participants; +import accord.primitives.RangeDeps; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; import accord.topology.Topology; -import accord.local.*; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.function.BiFunction; -import java.util.stream.Stream; -import static accord.Utils.*; +import static accord.Utils.createNode; +import static accord.Utils.id; +import static accord.Utils.writeTxn; import static accord.impl.InMemoryCommandStore.inMemory; import static accord.impl.IntKey.range; import static accord.impl.IntKey.routing; diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java index 27c3e9bf..97b44a87 100644 --- a/accord-core/src/test/java/accord/messages/ReadDataTest.java +++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java @@ -58,6 +58,7 @@ import accord.primitives.PartialTxn; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.Routable; +import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; @@ -101,7 +102,8 @@ class ReadDataTest Read read = Mockito.mock(Read.class); Mockito.when(read.slice(any())).thenReturn(read); Mockito.when(read.merge(any())).thenReturn(read); - Mockito.when(read.read(any(), any(), any(), any(), any())).thenAnswer(new Answer<AsyncChain<Data>>() + Mockito.when(read.keys()).thenReturn((Seekables)keys); + Mockito.when(read.read(any(), any(), any(), any())).thenAnswer(new Answer<AsyncChain<Data>>() { private final boolean called = false; @Override diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 0184765d..f69e4ed4 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -43,6 +43,8 @@ import accord.api.MessageSink; import accord.api.Scheduler; import accord.config.LocalConfig; import accord.config.MutableLocalConfig; +import accord.coordinate.TxnExecute; +import accord.coordinate.TxnPersist; import accord.impl.InMemoryCommandStores; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; @@ -51,6 +53,7 @@ import accord.local.Node; import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; +import accord.messages.Apply; import accord.messages.Callback; import accord.messages.LocalMessage; import accord.messages.Reply; @@ -322,7 +325,8 @@ public class Cluster implements Scheduler MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, localConfig)); + SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, TxnExecute.FACTORY, TxnPersist.FACTORY, Apply.FACTORY, + localConfig)); } AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult(); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java index eeb9ca1a..9bb16f11 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java @@ -18,9 +18,15 @@ package accord.maelstrom; -import accord.api.*; +import accord.api.Data; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.Read; import accord.local.SafeCommandStore; -import accord.primitives.*; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.Seekable; +import accord.primitives.Timestamp; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; @@ -42,7 +48,7 @@ public class MaelstromRead implements Read } @Override - public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) + public AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) { MaelstromStore s = (MaelstromStore)store; MaelstromData result = new MaelstromData(); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 40e73ff2..5d2a459c 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -35,6 +35,8 @@ import accord.api.Scheduler; import accord.config.LocalConfig; import accord.config.MutableLocalConfig; import accord.coordinate.Timeout; +import accord.coordinate.TxnExecute; +import accord.coordinate.TxnPersist; import accord.impl.InMemoryCommandStores; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; @@ -44,6 +46,7 @@ import accord.local.Node.Id; import accord.local.NodeTimeService; import accord.local.ShardDistributor; import accord.maelstrom.Packet.Type; +import accord.messages.Apply; import accord.messages.Callback; import accord.messages.LocalMessage; import accord.messages.Reply; @@ -180,7 +183,8 @@ public class Main System::currentTimeMillis, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, localConfig); + SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, TxnExecute.FACTORY, TxnPersist.FACTORY, Apply.FACTORY, + localConfig); awaitUninterruptibly(on.unsafeStart()); err.println("Initialized node " + init.self); err.flush(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org