This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 91336705 Allow exceptions to be propagated remotely 91336705 is described below commit 91336705bde8332954e849219d73205d68fa168a Author: aweisberg <ar...@weisberg.ws> AuthorDate: Fri Aug 18 16:48:42 2023 -0400 Allow exceptions to be propagated remotely https://github.com/apache/cassandra-accord/pull/56 Patch by Ariel Weisberg; Reviewed by David Capwell for CASSANDRA-18779 --- .../src/main/java/accord/api/MessageSink.java | 1 + .../accord/coordinate/CoordinateShardDurable.java | 5 --- .../src/main/java/accord/coordinate/Execute.java | 17 +++++---- .../java/accord/impl/AbstractFetchCoordinator.java | 8 ++-- accord-core/src/main/java/accord/local/Node.java | 26 ++++++++++--- .../java/accord/messages/AbstractEpochRequest.java | 2 +- .../src/main/java/accord/messages/Accept.java | 29 +++++++++----- .../src/main/java/accord/messages/Apply.java | 4 +- .../main/java/accord/messages/BeginRecovery.java | 40 ++++++++++++++------ .../src/main/java/accord/messages/CheckStatus.java | 8 ++-- .../src/main/java/accord/messages/Commit.java | 44 +++++++++++++--------- .../src/main/java/accord/messages/GetDeps.java | 18 +++++---- .../main/java/accord/messages/InformDurable.java | 20 +++++----- .../src/main/java/accord/messages/MessageType.java | 1 + .../src/main/java/accord/messages/PreAccept.java | 27 ++++++++----- .../src/main/java/accord/messages/ReadData.java | 22 +++++------ .../src/main/java/accord/messages/ReadTxnData.java | 10 +++-- .../src/main/java/accord/messages/Reply.java | 21 +++++++++++ .../main/java/accord/messages/WaitOnCommit.java | 38 +++++++++++++++---- .../java/accord/messages/WaitUntilApplied.java | 9 +++-- accord-core/src/main/java/accord/utils/Utils.java | 16 ++++++-- .../main/java/accord/utils/async/AsyncResults.java | 17 ++++++--- .../accord/burn/BurnTestConfigurationService.java | 26 +++++++------ .../src/test/java/accord/impl/basic/NodeSink.java | 13 +++++-- .../basic/SimulatedDelayedExecutorService.java | 4 +- .../accord/impl/basic/TaskExecutorService.java | 19 +--------- .../src/test/java/accord/impl/list/ListAgent.java | 19 ++++++++-- .../test/java/accord/impl/list/ListRequest.java | 25 ++++++------ .../java/accord/impl/mock/SimpleMessageSink.java | 10 ++++- .../src/test/java/accord/utils/MessageTask.java | 23 +++++++---- .../src/main/java/accord/maelstrom/Cluster.java | 7 ++++ .../src/main/java/accord/maelstrom/Error.java | 2 +- .../main/java/accord/maelstrom/MaelstromAgent.java | 22 ++++++++--- .../java/accord/maelstrom/MaelstromRequest.java | 15 ++++---- .../src/main/java/accord/maelstrom/Main.java | 7 ++++ 35 files changed, 376 insertions(+), 199 deletions(-) diff --git a/accord-core/src/main/java/accord/api/MessageSink.java b/accord-core/src/main/java/accord/api/MessageSink.java index ee4d681e..47a3fa03 100644 --- a/accord-core/src/main/java/accord/api/MessageSink.java +++ b/accord-core/src/main/java/accord/api/MessageSink.java @@ -30,4 +30,5 @@ public interface MessageSink void send(Id to, Request request); void send(Id to, Request request, AgentExecutor executor, Callback callback); void reply(Id replyingToNode, ReplyContext replyContext, Reply reply); + void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure); } diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java index daaf81fe..1b5b39b1 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java @@ -77,11 +77,6 @@ public class CoordinateShardDurable extends SettableResult<Void> implements Call tryFailure(new RuntimeException("Unexpected reply")); return; - case Error: - // TODO (required): error propagation - tryFailure(new RuntimeException("Unknown error")); - return; - case Invalid: tryFailure(new Invalidated(exclusiveSyncPoint.syncId, exclusiveSyncPoint.homeKey)); return; diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java index da39a056..7a64395e 100644 --- a/accord-core/src/main/java/accord/coordinate/Execute.java +++ b/accord-core/src/main/java/accord/coordinate/Execute.java @@ -24,14 +24,20 @@ import java.util.function.BiConsumer; import accord.api.Data; import accord.api.Result; import accord.local.Node; -import accord.messages.ReadTxnData; +import accord.local.Node.Id; +import accord.messages.Commit; import accord.messages.ReadData.ReadNack; import accord.messages.ReadData.ReadOk; import accord.messages.ReadData.ReadReply; -import accord.primitives.*; +import accord.messages.ReadTxnData; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; import accord.topology.Topologies; -import accord.local.Node.Id; -import accord.messages.Commit; import accord.topology.Topology; import static accord.coordinate.ReadCoordinator.Action.Approve; @@ -111,9 +117,6 @@ class Execute extends ReadCoordinator<ReadReply> switch (nack) { default: throw new IllegalStateException(); - case Error: - // TODO (expected): report content of error - return Action.Reject; case Redundant: callback.accept(null, new Preempted(txnId, route.homeKey())); return Action.Aborted; diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java index eedf19d1..aaeeb863 100644 --- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java +++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java @@ -154,8 +154,6 @@ public abstract class AbstractFetchCoordinator extends FetchCoordinator case Invalid: case Redundant: throw new AssertionError(String.format("Unexpected reply: %s", reply)); - case Error: - // TODO (required): ensure errors are propagated to coordinators and can be logged } } return; @@ -273,9 +271,11 @@ public abstract class AbstractFetchCoordinator extends FetchCoordinator } @Override - protected void reply(@Nullable Ranges unavailable, @Nullable Data data) + protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail) { - node.reply(replyTo, replyContext, new FetchResponse(unavailable, data, maxApplied)); + // TODO (review): If the fetch response actually does some streaming, but we send back the error + // it is a lot of work and data that might move and be unaccounted for at the coordinator + node.reply(replyTo, replyContext, fail == null ? new FetchResponse(unavailable, data, maxApplied) : null, fail); } @Override diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 7cf84dfe..1916f0bb 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -34,7 +34,6 @@ import java.util.function.ToLongFunction; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.primitives.EpochSupplier; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,7 @@ import accord.messages.ReplyContext; import accord.messages.Request; import accord.messages.TxnRequest; import accord.primitives.Ballot; +import accord.primitives.EpochSupplier; import accord.primitives.FullRoute; import accord.primitives.ProgressToken; import accord.primitives.Range; @@ -475,10 +475,17 @@ public class Node implements ConfigurationService.Listener, NodeTimeService messageSink.send(to, send); } - public void reply(Id replyingToNode, ReplyContext replyContext, Reply send) + public void reply(Id replyingToNode, ReplyContext replyContext, Reply send, Throwable failure) { - // TODO (usability, now): add Throwable as an argument so the error check is here, every single message gets this wrong causing a NPE here - if (send == null) + if (failure != null) + { + agent.onUncaughtException(failure); + if (send != null) + agent().onUncaughtException(new IllegalArgumentException(String.format("fail (%s) and send (%s) are both not null", failure, send))); + messageSink.replyWithUnknownFailure(replyingToNode, replyContext, failure); + return; + } + else if (send == null) { NullPointerException e = new NullPointerException(); agent.onUncaughtException(e); @@ -633,7 +640,16 @@ public class Node implements ConfigurationService.Listener, NodeTimeService return; } } - scheduler.now(() -> request.process(this, from, replyContext)); + scheduler.now(() -> { + try + { + request.process(this, from, replyContext); + } + catch (Throwable t) + { + reply(from, replyContext, null, t); + } + }); } public Scheduler scheduler() diff --git a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java index fe1790f6..482fcf33 100644 --- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java +++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java @@ -56,7 +56,7 @@ public abstract class AbstractEpochRequest<R extends Reply> implements PreLoadCo @Override public void accept(R reply, Throwable failure) { - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java index a55bcf43..1554e5dd 100644 --- a/accord-core/src/main/java/accord/messages/Accept.java +++ b/accord-core/src/main/java/accord/messages/Accept.java @@ -18,21 +18,30 @@ package accord.messages; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import accord.api.RoutingKey; import accord.local.Commands; import accord.local.Commands.AcceptOutcome; +import accord.local.Node.Id; import accord.local.SafeCommand; import accord.local.SafeCommandStore; -import accord.primitives.*; -import accord.local.Node.Id; +import accord.primitives.Ballot; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.PartialDeps; +import accord.primitives.PartialRoute; +import accord.primitives.Ranges; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.topology.Topologies; -import accord.api.RoutingKey; - -import javax.annotation.Nonnull; - -import javax.annotation.Nullable; - -import static accord.local.Commands.AcceptOutcome.*; +import static accord.local.Commands.AcceptOutcome.Redundant; +import static accord.local.Commands.AcceptOutcome.RejectedBallot; +import static accord.local.Commands.AcceptOutcome.Success; +import static accord.local.Commands.AcceptOutcome.Truncated; // TODO (low priority, efficiency): use different objects for send and receive, so can be more efficient // (e.g. serialize without slicing, and without unnecessary fields) @@ -118,7 +127,7 @@ public class Accept extends TxnRequest.WithUnsynced<Accept.AcceptReply> @Override public void accept(AcceptReply reply, Throwable failure) { - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java index da06f6b1..12d5614c 100644 --- a/accord-core/src/main/java/accord/messages/Apply.java +++ b/accord-core/src/main/java/accord/messages/Apply.java @@ -153,9 +153,7 @@ public class Apply extends TxnRequest<ApplyReply> @Override public void accept(ApplyReply reply, Throwable failure) { - if (failure != null) - node.agent().onUncaughtException(failure); - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index f1a16b89..1efd652f 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -18,25 +18,43 @@ package accord.messages; -import accord.api.Result; -import accord.local.*; -import accord.primitives.*; -import accord.topology.Topologies; - import java.util.List; - import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.utils.Invariants; - +import accord.api.Result; +import accord.local.Command; +import accord.local.Commands; import accord.local.Node.Id; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.local.Status; +import accord.primitives.Ballot; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.PartialDeps; +import accord.primitives.PartialRoute; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.primitives.Writes; +import accord.topology.Topologies; +import accord.utils.Invariants; import static accord.local.SafeCommandStore.TestDep.WITH; import static accord.local.SafeCommandStore.TestDep.WITHOUT; import static accord.local.SafeCommandStore.TestKind.shouldHaveWitnessed; -import static accord.local.SafeCommandStore.TestTimestamp.*; -import static accord.local.Status.*; +import static accord.local.SafeCommandStore.TestTimestamp.EXECUTES_AFTER; +import static accord.local.SafeCommandStore.TestTimestamp.STARTED_AFTER; +import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE; +import static accord.local.Status.Accepted; +import static accord.local.Status.Committed; +import static accord.local.Status.Phase; +import static accord.local.Status.PreAccepted; +import static accord.local.Status.PreCommitted; import static accord.messages.PreAccept.calculatePartialDeps; public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> @@ -179,7 +197,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> @Override public void accept(RecoverReply reply, Throwable failure) { - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java index a18b2c54..4c8ae207 100644 --- a/accord-core/src/main/java/accord/messages/CheckStatus.java +++ b/accord-core/src/main/java/accord/messages/CheckStatus.java @@ -18,6 +18,8 @@ package accord.messages; +import javax.annotation.Nullable; + import accord.api.Result; import accord.api.RoutingKey; import accord.coordinate.Infer; @@ -46,7 +48,6 @@ import accord.primitives.Writes; import accord.topology.Topologies; import accord.utils.Invariants; import accord.utils.MapReduceConsume; -import javax.annotation.Nullable; import static accord.local.Status.Committed; import static accord.local.Status.Definition; @@ -175,8 +176,9 @@ public class CheckStatus extends AbstractEpochRequest<CheckStatus.CheckStatusRep @Override public void accept(CheckStatusReply ok, Throwable failure) { - if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned); - else node.reply(replyTo, replyContext, ok); + if (failure != null) node.reply(replyTo, replyContext, ok, failure); + else if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned, null); + else node.reply(replyTo, replyContext, ok, null); } private Status invalidIfNotAtLeast(SafeCommandStore safeStore) diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index 3954a8eb..b755827b 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -19,20 +19,36 @@ package accord.messages; import java.util.Set; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import accord.local.*; +import accord.local.Commands; +import accord.local.Node; +import accord.local.Node.Id; +import accord.local.PreLoadContext; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; import accord.messages.ReadData.ReadNack; import accord.messages.ReadData.ReadReply; -import accord.primitives.*; -import accord.local.Node.Id; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.Keys; +import accord.primitives.PartialDeps; +import accord.primitives.PartialRoute; +import accord.primitives.PartialTxn; +import accord.primitives.Participants; +import accord.primitives.Ranges; +import accord.primitives.Route; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; import accord.topology.Topologies; -import javax.annotation.Nullable; - -import accord.utils.Invariants; - import accord.topology.Topology; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import accord.utils.Invariants; import static accord.local.Status.Committed; import static accord.local.Status.Known.DefinitionOnly; @@ -177,14 +193,8 @@ public class Commit extends TxnRequest<ReadNack> @Override public synchronized void accept(ReadNack reply, Throwable failure) { - if (failure != null) - { - logger.error("Unhandled exception during commit", failure); - node.agent().onUncaughtException(failure); - return; - } - if (reply != null) - node.reply(replyTo, replyContext, reply); + if (reply != null || failure != null) + node.reply(replyTo, replyContext, reply, failure); else if (read != null) read.process(node, replyTo, replyContext); if (defer != null) diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java index f757aa7e..083c15c8 100644 --- a/accord-core/src/main/java/accord/messages/GetDeps.java +++ b/accord-core/src/main/java/accord/messages/GetDeps.java @@ -18,14 +18,19 @@ package accord.messages; -import accord.local.SafeCommandStore; -import accord.primitives.*; -import accord.utils.Invariants; +import javax.annotation.Nonnull; import accord.local.Node.Id; +import accord.local.SafeCommandStore; +import accord.primitives.FullRoute; +import accord.primitives.PartialDeps; +import accord.primitives.PartialRoute; +import accord.primitives.Ranges; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.topology.Topologies; - -import javax.annotation.Nonnull; +import accord.utils.Invariants; import static accord.messages.PreAccept.calculatePartialDeps; @@ -78,8 +83,7 @@ public class GetDeps extends TxnRequest.WithUnsynced<PartialDeps> @Override public void accept(PartialDeps result, Throwable failure) { - if (result == null) node.agent().onUncaughtException(failure); // TODO (expected): propagate failures to coordinator - else node.reply(replyTo, replyContext, new GetDepsOk(result)); + node.reply(replyTo, replyContext, result != null ? new GetDepsOk(result) : null, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java index 79a16121..d9f42214 100644 --- a/accord-core/src/main/java/accord/messages/InformDurable.java +++ b/accord-core/src/main/java/accord/messages/InformDurable.java @@ -19,10 +19,17 @@ package accord.messages; import accord.api.ProgressLog.ProgressShard; -import accord.local.*; +import accord.local.Commands; import accord.local.Node.Id; +import accord.local.PreLoadContext; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.local.Status; import accord.local.Status.Durability; -import accord.primitives.*; +import accord.primitives.FullRoute; +import accord.primitives.PartialRoute; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.topology.Topologies; import accord.utils.Invariants; @@ -104,14 +111,7 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext @Override public void accept(Reply reply, Throwable failure) { - // TODO: respond with failure - if (reply == null) - { - if (failure == null) - throw new IllegalStateException("Processed nothing on this node"); - throw new IllegalStateException(failure); - } - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java index 8b68436e..7909157a 100644 --- a/accord-core/src/main/java/accord/messages/MessageType.java +++ b/accord-core/src/main/java/accord/messages/MessageType.java @@ -54,6 +54,7 @@ public enum MessageType SET_GLOBALLY_DURABLE_REQ (true ), QUERY_DURABLE_BEFORE_REQ (false), QUERY_DURABLE_BEFORE_RSP (false), + FAILURE_RSP (false), ; /** diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java index d035e8ed..1be51ca2 100644 --- a/accord-core/src/main/java/accord/messages/PreAccept.java +++ b/accord-core/src/main/java/accord/messages/PreAccept.java @@ -20,19 +20,29 @@ package accord.messages; import java.util.List; import java.util.Objects; +import javax.annotation.Nullable; -import accord.local.*; -import accord.local.SafeCommandStore.TestKind; - +import accord.local.Command; +import accord.local.Commands; import accord.local.Node.Id; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.local.SafeCommandStore.TestKind; import accord.messages.TxnRequest.WithUnsynced; +import accord.primitives.Deps; +import accord.primitives.EpochSupplier; +import accord.primitives.FullRoute; +import accord.primitives.PartialDeps; +import accord.primitives.PartialRoute; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; import accord.topology.Shard; import accord.topology.Topologies; -import javax.annotation.Nullable; - -import accord.primitives.*; - import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; @@ -147,8 +157,7 @@ public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> implements @Override public void accept(PreAcceptReply reply, Throwable failure) { - // TODO (required, error handling): communicate back the failure - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } @Override diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 618e1479..66b52347 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -21,21 +21,20 @@ package accord.messages; import java.util.BitSet; import javax.annotation.Nullable; -import accord.api.Data; -import accord.primitives.Participants; -import accord.topology.Topologies; -import accord.utils.Invariants; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.Data; import accord.local.CommandStore; import accord.local.Node; import accord.local.SafeCommandStore; import accord.primitives.PartialTxn; +import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.topology.Topologies; +import accord.utils.Invariants; import static accord.messages.MessageType.READ_RSP; import static accord.messages.TxnRequest.computeWaitForEpoch; @@ -71,7 +70,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack> protected abstract void cancel(); protected abstract long executeAtEpoch(); - protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data); + protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail); @Override public long waitForEpoch() @@ -99,12 +98,12 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack> { if (reply != null) { - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, reply, failure); } else if (failure != null) { // TODO (expected, testing): test - node.reply(replyTo, replyContext, ReadNack.Error); + node.reply(replyTo, replyContext, null, failure); data = null; // TODO (expected, exceptions): probably a better way to handle this, as might not be uncaught node.agent().onUncaughtException(failure); @@ -129,7 +128,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack> // and prevents races where we respond before dispatching all the required reads (if the reads are // completing faster than the reads can be setup on all required shards) if (-1 == --waitingOnCount) - reply(this.unavailable, data); + reply(this.unavailable, data, null); } protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable) @@ -153,7 +152,8 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack> { // TODO (expected, exceptions): should send exception to client, and consistency handle/propagate locally logger.trace("{}: read failed for {}: {}", txnId, unsafeStore, throwable); - node.reply(replyTo, replyContext, ReadNack.Error); + node.reply(replyTo, replyContext, null, throwable); + cancel(); } else readComplete(unsafeStore, next, unavailable); @@ -173,7 +173,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack> public enum ReadNack implements ReadReply { - Invalid, NotCommitted, Redundant, Error; + Invalid, NotCommitted, Redundant; @Override public String toString() diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java index b34e9aeb..4e31910c 100644 --- a/accord-core/src/main/java/accord/messages/ReadTxnData.java +++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java @@ -199,7 +199,7 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, if (state == State.PENDING) { state = State.OBSOLETE; - node.reply(replyTo, replyContext, Redundant); + node.reply(replyTo, replyContext, Redundant, null); } } @@ -231,18 +231,20 @@ public class ReadTxnData extends ReadData implements Command.TransientListener, } @Override - protected void reply(@Nullable Ranges unavailable, @Nullable Data data) + protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail) { switch (state) { case RETURNED: - throw new IllegalStateException("ReadOk was sent, yet ack called again"); + throw new IllegalStateException("ReadOk was sent, yet ack called again", fail); case OBSOLETE: logger.debug("After the read completed for txn {}, the result was marked obsolete", txnId); + if (fail != null) + node.agent().onUncaughtException(fail); break; case PENDING: state = State.RETURNED; - node.reply(replyTo, replyContext, new ReadOk(unavailable, data)); + node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail); break; default: throw new AssertionError("Unknown state: " + state); diff --git a/accord-core/src/main/java/accord/messages/Reply.java b/accord-core/src/main/java/accord/messages/Reply.java index e82c4f99..e1fc7e07 100644 --- a/accord-core/src/main/java/accord/messages/Reply.java +++ b/accord-core/src/main/java/accord/messages/Reply.java @@ -18,7 +18,28 @@ package accord.messages; +import javax.annotation.Nonnull; + +import static accord.messages.MessageType.FAILURE_RSP; + public interface Reply extends Message { default boolean isFinal() { return true; } + + class FailureReply implements Reply + { + @Nonnull + public final Throwable failure; + + public FailureReply(@Nonnull Throwable failure) + { + this.failure = failure; + } + + @Override + public MessageType type() + { + return FAILURE_RSP; + } + } } diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java index 5ec41e0b..ca52b693 100644 --- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java +++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java @@ -20,16 +20,21 @@ package accord.messages; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import accord.local.*; -import accord.local.Node.Id; -import accord.primitives.*; -import accord.utils.MapReduceConsume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static accord.local.SaveStatus.LocalExecution.WaitingToExecute; - +import accord.local.Command; +import accord.local.Node; +import accord.local.Node.Id; +import accord.local.PreLoadContext; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.primitives.Participants; +import accord.primitives.TxnId; import accord.topology.Topology; +import accord.utils.MapReduceConsume; + +import static accord.local.SaveStatus.LocalExecution.WaitingToExecute; public class WaitOnCommit implements Request, MapReduceConsume<SafeCommandStore, Void>, PreLoadContext, Command.TransientListener { @@ -141,13 +146,30 @@ public class WaitOnCommit implements Request, MapReduceConsume<SafeCommandStore, @Override public void accept(Void result, Throwable failure) { - ack(); + if (failure != null) + { + while (true) + { + int initialValue = waitingOnUpdater.get(this); + if (initialValue == -1) + { + node.agent().onUncaughtException(new IllegalStateException("Had error in WaitOnCommit, but already replied so can't send failure response", failure)); + break; + } + if (waitingOnUpdater.compareAndSet(this, initialValue, -1)) + node.reply(replyTo, replyContext, null, failure); + } + } + else + { + ack(); + } } private void ack() { if (waitingOnUpdater.decrementAndGet(this) == -1) - node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE); + node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE, null); } @Override diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java index 06905dda..682c33b4 100644 --- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java @@ -193,7 +193,7 @@ public class WaitUntilApplied extends ReadData implements Command.TransientListe return; isInvalid = true; - node.reply(replyTo, replyContext, Invalid); + node.reply(replyTo, replyContext, Invalid, null); } synchronized void sendTruncated() @@ -202,7 +202,7 @@ public class WaitUntilApplied extends ReadData implements Command.TransientListe return; isInvalid = true; - node.reply(replyTo, replyContext, Redundant); + node.reply(replyTo, replyContext, Redundant, null); } void applied(SafeCommandStore safeStore, SafeCommand safeCommand) @@ -223,12 +223,13 @@ public class WaitUntilApplied extends ReadData implements Command.TransientListe } @Override - protected void reply(@Nullable Ranges unavailable, @Nullable Data data) + protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail) { if (isInvalid) return; - node.reply(replyTo, replyContext, new ReadOk(unavailable, data)); + // data can be null so send the failure response if a failure is present + node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail); } private void removeListener(SafeCommandStore safeStore, TxnId txnId) diff --git a/accord-core/src/main/java/accord/utils/Utils.java b/accord-core/src/main/java/accord/utils/Utils.java index 3ce0cf1e..696edb94 100644 --- a/accord-core/src/main/java/accord/utils/Utils.java +++ b/accord-core/src/main/java/accord/utils/Utils.java @@ -18,13 +18,23 @@ package accord.utils; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.IntFunction; + import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; -import java.util.*; -import java.util.function.IntFunction; - // TODO (low priority): remove when jdk8 support is dropped public class Utils { diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java index dfa7fe08..5773e685 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java @@ -18,9 +18,6 @@ package accord.utils.async; -import accord.api.VisibleForImplementation; -import accord.utils.Invariants; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -28,6 +25,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiConsumer; import java.util.function.Function; +import accord.api.VisibleForImplementation; +import accord.utils.Invariants; + public class AsyncResults { public static final AsyncResult<Void> SUCCESS_VOID = success(null); @@ -104,7 +104,7 @@ public class AsyncResults } } - boolean trySetResult(V result, Throwable failure) + protected boolean trySetResult(V result, Throwable failure) { return trySetResult(new Result<>(result, failure)); } @@ -323,7 +323,7 @@ public class AsyncResults @VisibleForImplementation public static class RunnableResult<V> extends AbstractResult<V> implements Runnable { - private final Callable<V> callable; + protected final Callable<V> callable; public RunnableResult(Callable<V> callable) { @@ -333,14 +333,19 @@ public class AsyncResults @Override public void run() { + // There are two different type of exceptions: user function throws, listener throws. To make sure this is clear, + // make sure to catch the exception from the user function and set as failed, and let the listener exceptions bubble up. + V call; try { - trySetResult(callable.call(), null); + call = callable.call(); } catch (Throwable t) { trySetResult(null, t); + return; } + trySetResult(call, null); } } diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index 4c9ce6c0..01ea96d9 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -18,16 +18,6 @@ package accord.burn; -import accord.api.TestableConfigurationService; -import accord.local.AgentExecutor; -import accord.impl.AbstractConfigurationService; -import accord.primitives.Ranges; -import accord.utils.RandomSource; -import accord.local.Node; -import accord.messages.*; -import accord.topology.Topology; -import accord.utils.async.AsyncResults; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -35,6 +25,20 @@ import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; +import accord.api.TestableConfigurationService; +import accord.impl.AbstractConfigurationService; +import accord.local.AgentExecutor; +import accord.local.Node; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.Reply; +import accord.messages.ReplyContext; +import accord.messages.Request; +import accord.primitives.Ranges; +import accord.topology.Topology; +import accord.utils.RandomSource; +import accord.utils.async.AsyncResults; + public class BurnTestConfigurationService extends AbstractConfigurationService.Minimal implements TestableConfigurationService { private final AgentExecutor executor; @@ -66,7 +70,7 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M public void process(Node on, Node.Id from, ReplyContext replyContext) { Topology topology = on.configService().getTopologyForEpoch(epoch); - on.reply(from, replyContext, new FetchTopologyReply(topology)); + on.reply(from, replyContext, new FetchTopologyReply(topology), null); } @Override diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java index 20f1705e..e14fb523 100644 --- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java +++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java @@ -23,16 +23,17 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import accord.api.MessageSink; import accord.local.AgentExecutor; -import accord.messages.SafeCallback; -import accord.utils.RandomSource; import accord.local.Node; import accord.local.Node.Id; -import accord.api.MessageSink; import accord.messages.Callback; import accord.messages.Reply; +import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; import accord.messages.Request; +import accord.messages.SafeCallback; +import accord.utils.RandomSource; import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID; @@ -82,4 +83,10 @@ public class NodeSink implements MessageSink { parent.add(self, replyToNode, Packet.getMessageId(replyContext), reply); } + + @Override + public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure) + { + reply(replyingToNode, replyContext, new FailureReply(failure)); + } } diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java index 63fbae83..2f3bd1b1 100644 --- a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java +++ b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java @@ -98,7 +98,7 @@ public class SimulatedDelayedExecutorService extends TaskExecutorService impleme // run without setting the result try { - fn.call(); + callable.call(); long nowMillis = pending.nowInMillis(); if (periodMillis > 0) { @@ -117,7 +117,7 @@ public class SimulatedDelayedExecutorService extends TaskExecutorService impleme } catch (Throwable t) { - setFailure(t); + trySetResult(null, t); } } } diff --git a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java index 059bc635..0e733095 100644 --- a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java +++ b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java @@ -32,26 +32,11 @@ import accord.utils.async.AsyncResults; public abstract class TaskExecutorService extends AbstractExecutorService implements AgentExecutor { - public static class Task<T> extends AsyncResults.SettableResult<T> implements Pending, RunnableFuture<T> + public static class Task<T> extends AsyncResults.RunnableResult<T> implements Pending, RunnableFuture<T> { - protected final Callable<T> fn; - public Task(Callable<T> fn) { - this.fn = fn; - } - - @Override - public void run() - { - try - { - setSuccess(fn.call()); - } - catch (Throwable t) - { - setFailure(t); - } + super(fn); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index a0b55d35..5fb3924c 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -20,14 +20,20 @@ package accord.impl.list; import java.util.function.Consumer; +import accord.api.Agent; +import accord.api.Result; import accord.impl.mock.Network; import accord.local.Command; import accord.local.Node; -import accord.api.Agent; -import accord.api.Result; -import accord.primitives.*; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; import static accord.local.Node.Id.NONE; +import static accord.utils.Invariants.checkState; import static com.google.common.base.Functions.identity; public class ListAgent implements Agent @@ -46,11 +52,16 @@ public class ListAgent implements Agent @Override public void onRecover(Node node, Result success, Throwable fail) { + if (fail != null) + { + checkState(success == null, "fail (%s) and success (%s) are both not null", fail, success); + // We don't really process errors for Recover here even though it is provided in the interface + } if (success != null) { ListResult result = (ListResult) success; if (result.requestId > Integer.MIN_VALUE) - node.reply(result.client, Network.replyCtxFor(result.requestId), result); + node.reply(result.client, Network.replyCtxFor(result.requestId), result, null); } } diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java index b0874c61..c6127a30 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRequest.java +++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java @@ -35,9 +35,9 @@ import accord.messages.CheckStatus.CheckStatusOk; import accord.messages.CheckStatus.IncludeInfo; import accord.messages.MessageType; import accord.messages.ReplyContext; +import accord.messages.Request; import accord.primitives.RoutingKeys; import accord.primitives.Txn; -import accord.messages.Request; import accord.primitives.TxnId; import static accord.local.Status.Phase.Cleanup; @@ -107,41 +107,36 @@ public class ListRequest implements Request @Override public void accept(Result success, Throwable fail) { - // TODO (desired, testing): error handling - if (success != null) - { - node.reply(client, replyContext, (ListResult) success); - } - else if (fail instanceof CoordinationFailed) + if (fail instanceof CoordinationFailed) { RoutingKey homeKey = ((CoordinationFailed) fail).homeKey(); TxnId txnId = ((CoordinationFailed) fail).txnId(); if (fail instanceof Invalidated) { - node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null)); + node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null); return; } - node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null)); + node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null); ((Cluster)node.scheduler()).onDone(() -> { node.commandStores() .select(homeKey) .execute(() -> CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) -> { if (f != null) { - node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null)); + node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f instanceof Truncated ? new int[2][] : new int[3][], null), null); return; } switch (s) { case Truncated: - node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null)); + node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[2][], null), null); break; case Invalidated: - node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null)); + node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, null), null); break; case Lost: - node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null)); + node.reply(client, replyContext, new ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new int[1][], null), null); break; case Other: // currently caught elsewhere in response tracking, but might help to throw an exception here @@ -149,6 +144,10 @@ public class ListRequest implements Request })); }); } + else + { + node.reply(client, replyContext, (ListResult) success, fail); + } } } diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java index e33314d2..ba0502ef 100644 --- a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java +++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java @@ -18,11 +18,13 @@ package accord.impl.mock; +import accord.api.MessageSink; import accord.local.AgentExecutor; import accord.local.Node; -import accord.api.MessageSink; +import accord.local.Node.Id; import accord.messages.Callback; import accord.messages.Reply; +import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; import accord.messages.Request; @@ -54,4 +56,10 @@ public class SimpleMessageSink implements MessageSink { network.reply(node, replyingToNode, Network.getMessageId(replyContext), reply); } + + @Override + public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure) + { + network.reply(node, replyingToNode, Network.getMessageId(replyContext), new FailureReply(failure)); + } } diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/test/java/accord/utils/MessageTask.java index d1064e31..528a6e4b 100644 --- a/accord-core/src/test/java/accord/utils/MessageTask.java +++ b/accord-core/src/test/java/accord/utils/MessageTask.java @@ -18,15 +18,24 @@ package accord.utils; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import com.google.common.collect.ImmutableList; + import accord.local.AgentExecutor; import accord.local.Node; -import accord.messages.*; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.Reply; +import accord.messages.ReplyContext; +import accord.messages.Request; import accord.utils.async.AsyncResults; -import com.google.common.collect.ImmutableList; - -import java.util.*; -import java.util.function.BiConsumer; -import java.util.function.Consumer; /** * Message task that will continue sending messages to a set of nodes until all @@ -88,7 +97,7 @@ public class MessageTask extends AsyncResults.SettableResult<Void> implements Ru @Override public void process(Node on, Node.Id from, ReplyContext replyContext) { - process.process(on, from, success -> on.reply(from, replyContext, success ? SUCCESS : FAILURE)); + process.process(on, from, success -> on.reply(from, replyContext, success ? SUCCESS : FAILURE, null)); } @Override diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 51da1b3c..cd7741be 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -51,6 +51,7 @@ import accord.local.NodeTimeService; import accord.local.ShardDistributor; import accord.messages.Callback; import accord.messages.Reply; +import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; import accord.messages.Request; import accord.messages.SafeCallback; @@ -120,6 +121,12 @@ public class Cluster implements Scheduler long replyToMessage = ((Packet) replyContext).body.msg_id; parent.add(self, replyToNode, replyToMessage, reply); } + + @Override + public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure) + { + reply(replyingToNode, replyContext, new FailureReply(failure)); + } } final Function<Id, Node> lookup; diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java index d0691180..dad08b0d 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java @@ -22,8 +22,8 @@ import java.io.IOException; import accord.maelstrom.Packet.Type; import accord.messages.MessageType; -import com.google.gson.stream.JsonWriter; import accord.messages.Reply; +import com.google.gson.stream.JsonWriter; public class Error extends Body implements Reply { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java index 34f34d1f..30f2a0ff 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java @@ -18,13 +18,20 @@ package accord.maelstrom; -import accord.local.Command; -import accord.local.Node; +import java.util.concurrent.TimeUnit; + import accord.api.Agent; import accord.api.Result; -import accord.primitives.*; +import accord.local.Command; +import accord.local.Node; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; -import java.util.concurrent.TimeUnit; +import static accord.utils.Invariants.checkState; public class MaelstromAgent implements Agent { @@ -33,10 +40,15 @@ public class MaelstromAgent implements Agent @Override public void onRecover(Node node, Result success, Throwable fail) { + if (fail != null) + { + checkState(success == null, "fail (%s) and success (%s) are both not null", fail, success); + // We don't really process errors for Recover here even though it is provided in the interface + } if (success != null) { MaelstromResult result = (MaelstromResult) success; - node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result)); + node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result), null); } } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java index e81f539b..f9aad092 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java @@ -23,17 +23,18 @@ import java.util.NavigableSet; import java.util.TreeSet; import accord.api.Key; +import accord.local.Node; +import accord.local.Node.Id; +import accord.maelstrom.Packet.Type; import accord.messages.MessageType; +import accord.messages.Reply; import accord.messages.ReplyContext; +import accord.messages.Request; import accord.primitives.Keys; +import accord.primitives.Txn; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonToken; import com.google.gson.stream.JsonWriter; -import accord.local.Node; -import accord.local.Node.Id; -import accord.primitives.Txn; -import accord.maelstrom.Packet.Type; -import accord.messages.Request; public class MaelstromRequest extends Body implements Request { @@ -49,8 +50,8 @@ public class MaelstromRequest extends Body implements Request public void process(Node node, Id client, ReplyContext replyContext) { node.coordinate(txn).addCallback((success, fail) -> { - if (success != null) node.reply(client, replyContext, new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success)); -// else node.reply(client, messageId, new Error(messageId, 13, fail.getMessage())); + Reply reply = success != null ? new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success) : null; + node.reply(client, replyContext, reply, fail); }); } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 0d96058e..1600e03a 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -44,6 +44,7 @@ import accord.local.ShardDistributor; import accord.maelstrom.Packet.Type; import accord.messages.Callback; import accord.messages.Reply; +import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; import accord.messages.Request; import accord.topology.Topology; @@ -129,6 +130,12 @@ public class Main { send(new Packet(self, replyToNode, MaelstromReplyContext.messageIdFor(replyContext), reply)); } + + @Override + public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure) + { + reply(replyingToNode, replyContext, new FailureReply(failure)); + } } public static void listen(TopologyFactory topologyFactory, InputStream stdin, PrintStream out, PrintStream err) throws IOException --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org