This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 91336705 Allow exceptions to be propagated remotely
91336705 is described below

commit 91336705bde8332954e849219d73205d68fa168a
Author: aweisberg <ar...@weisberg.ws>
AuthorDate: Fri Aug 18 16:48:42 2023 -0400

    Allow exceptions to be propagated remotely
    
    https://github.com/apache/cassandra-accord/pull/56
    
    Patch by Ariel Weisberg; Reviewed by David Capwell for CASSANDRA-18779
---
 .../src/main/java/accord/api/MessageSink.java      |  1 +
 .../accord/coordinate/CoordinateShardDurable.java  |  5 ---
 .../src/main/java/accord/coordinate/Execute.java   | 17 +++++----
 .../java/accord/impl/AbstractFetchCoordinator.java |  8 ++--
 accord-core/src/main/java/accord/local/Node.java   | 26 ++++++++++---
 .../java/accord/messages/AbstractEpochRequest.java |  2 +-
 .../src/main/java/accord/messages/Accept.java      | 29 +++++++++-----
 .../src/main/java/accord/messages/Apply.java       |  4 +-
 .../main/java/accord/messages/BeginRecovery.java   | 40 ++++++++++++++------
 .../src/main/java/accord/messages/CheckStatus.java |  8 ++--
 .../src/main/java/accord/messages/Commit.java      | 44 +++++++++++++---------
 .../src/main/java/accord/messages/GetDeps.java     | 18 +++++----
 .../main/java/accord/messages/InformDurable.java   | 20 +++++-----
 .../src/main/java/accord/messages/MessageType.java |  1 +
 .../src/main/java/accord/messages/PreAccept.java   | 27 ++++++++-----
 .../src/main/java/accord/messages/ReadData.java    | 22 +++++------
 .../src/main/java/accord/messages/ReadTxnData.java | 10 +++--
 .../src/main/java/accord/messages/Reply.java       | 21 +++++++++++
 .../main/java/accord/messages/WaitOnCommit.java    | 38 +++++++++++++++----
 .../java/accord/messages/WaitUntilApplied.java     |  9 +++--
 accord-core/src/main/java/accord/utils/Utils.java  | 16 ++++++--
 .../main/java/accord/utils/async/AsyncResults.java | 17 ++++++---
 .../accord/burn/BurnTestConfigurationService.java  | 26 +++++++------
 .../src/test/java/accord/impl/basic/NodeSink.java  | 13 +++++--
 .../basic/SimulatedDelayedExecutorService.java     |  4 +-
 .../accord/impl/basic/TaskExecutorService.java     | 19 +---------
 .../src/test/java/accord/impl/list/ListAgent.java  | 19 ++++++++--
 .../test/java/accord/impl/list/ListRequest.java    | 25 ++++++------
 .../java/accord/impl/mock/SimpleMessageSink.java   | 10 ++++-
 .../src/test/java/accord/utils/MessageTask.java    | 23 +++++++----
 .../src/main/java/accord/maelstrom/Cluster.java    |  7 ++++
 .../src/main/java/accord/maelstrom/Error.java      |  2 +-
 .../main/java/accord/maelstrom/MaelstromAgent.java | 22 ++++++++---
 .../java/accord/maelstrom/MaelstromRequest.java    | 15 ++++----
 .../src/main/java/accord/maelstrom/Main.java       |  7 ++++
 35 files changed, 376 insertions(+), 199 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/MessageSink.java b/accord-core/src/main/java/accord/api/MessageSink.java
index ee4d681e..47a3fa03 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -30,4 +30,5 @@ public interface MessageSink
     void send(Id to, Request request);
     void send(Id to, Request request, AgentExecutor executor, Callback 
callback);
     void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
+    void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, 
Throwable failure);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index daaf81fe..1b5b39b1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -77,11 +77,6 @@ public class CoordinateShardDurable extends SettableResult<Void> implements Call
                     tryFailure(new RuntimeException("Unexpected reply"));
                     return;
 
-                case Error:
-                    // TODO (required): error propagation
-                    tryFailure(new RuntimeException("Unknown error"));
-                    return;
-
                 case Invalid:
                     tryFailure(new Invalidated(exclusiveSyncPoint.syncId, 
exclusiveSyncPoint.homeKey));
                     return;
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index da39a056..7a64395e 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -24,14 +24,20 @@ import java.util.function.BiConsumer;
 import accord.api.Data;
 import accord.api.Result;
 import accord.local.Node;
-import accord.messages.ReadTxnData;
+import accord.local.Node.Id;
+import accord.messages.Commit;
 import accord.messages.ReadData.ReadNack;
 import accord.messages.ReadData.ReadOk;
 import accord.messages.ReadData.ReadReply;
-import accord.primitives.*;
+import accord.messages.ReadTxnData;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import accord.local.Node.Id;
-import accord.messages.Commit;
 import accord.topology.Topology;
 
 import static accord.coordinate.ReadCoordinator.Action.Approve;
@@ -111,9 +117,6 @@ class Execute extends ReadCoordinator<ReadReply>
         switch (nack)
         {
             default: throw new IllegalStateException();
-            case Error:
-                // TODO (expected): report content of error
-                return Action.Reject;
             case Redundant:
                 callback.accept(null, new Preempted(txnId, route.homeKey()));
                 return Action.Aborted;
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index eedf19d1..aaeeb863 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -154,8 +154,6 @@ public abstract class AbstractFetchCoordinator extends FetchCoordinator
                             case Invalid:
                             case Redundant:
                                 throw new 
AssertionError(String.format("Unexpected reply: %s", reply));
-                            case Error:
-                                // TODO (required): ensure errors are 
propagated to coordinators and can be logged
                         }
                     }
                     return;
@@ -273,9 +271,11 @@ public abstract class AbstractFetchCoordinator extends FetchCoordinator
         }
 
         @Override
-        protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+        protected void reply(@Nullable Ranges unavailable, @Nullable Data 
data, @Nullable Throwable fail)
         {
-            node.reply(replyTo, replyContext, new FetchResponse(unavailable, 
data, maxApplied));
+            // TODO (review): If the fetch response actually does some 
streaming, but we send back the error
+            // it is a lot of work and data that might move and be unaccounted 
for at the coordinator
+            node.reply(replyTo, replyContext, fail == null ? new 
FetchResponse(unavailable, data, maxApplied) : null, fail);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 7cf84dfe..1916f0bb 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -34,7 +34,6 @@ import java.util.function.ToLongFunction;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.primitives.EpochSupplier;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +59,7 @@ import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.TxnRequest;
 import accord.primitives.Ballot;
+import accord.primitives.EpochSupplier;
 import accord.primitives.FullRoute;
 import accord.primitives.ProgressToken;
 import accord.primitives.Range;
@@ -475,10 +475,17 @@ public class Node implements ConfigurationService.Listener, NodeTimeService
         messageSink.send(to, send);
     }
 
-    public void reply(Id replyingToNode, ReplyContext replyContext, Reply send)
+    public void reply(Id replyingToNode, ReplyContext replyContext, Reply 
send, Throwable failure)
     {
-        // TODO (usability, now): add Throwable as an argument so the error 
check is here, every single message gets this wrong causing a NPE here
-        if (send == null)
+        if (failure != null)
+        {
+            agent.onUncaughtException(failure);
+            if (send != null)
+                agent().onUncaughtException(new 
IllegalArgumentException(String.format("fail (%s) and send (%s) are both not 
null", failure, send)));
+            messageSink.replyWithUnknownFailure(replyingToNode, replyContext, 
failure);
+            return;
+        }
+        else if (send == null)
         {
             NullPointerException e = new NullPointerException();
             agent.onUncaughtException(e);
@@ -633,7 +640,16 @@ public class Node implements ConfigurationService.Listener, NodeTimeService
                 return;
             }
         }
-        scheduler.now(() -> request.process(this, from, replyContext));
+        scheduler.now(() -> {
+            try
+            {
+                request.process(this, from, replyContext);
+            }
+            catch (Throwable t)
+            {
+                reply(from, replyContext, null, t);
+            }
+        });
     }
 
     public Scheduler scheduler()
diff --git a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
index fe1790f6..482fcf33 100644
--- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
+++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java
@@ -56,7 +56,7 @@ public abstract class AbstractEpochRequest<R extends Reply> implements PreLoadCo
     @Override
     public void accept(R reply, Throwable failure)
     {
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index a55bcf43..1554e5dd 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -18,21 +18,30 @@
 
 package accord.messages;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.api.RoutingKey;
 import accord.local.Commands;
 import accord.local.Commands.AcceptOutcome;
+import accord.local.Node.Id;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.local.Node.Id;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
 
-import accord.api.RoutingKey;
-
-import javax.annotation.Nonnull;
-
-import javax.annotation.Nullable;
-
-import static accord.local.Commands.AcceptOutcome.*;
+import static accord.local.Commands.AcceptOutcome.Redundant;
+import static accord.local.Commands.AcceptOutcome.RejectedBallot;
+import static accord.local.Commands.AcceptOutcome.Success;
+import static accord.local.Commands.AcceptOutcome.Truncated;
 
 // TODO (low priority, efficiency): use different objects for send and 
receive, so can be more efficient
 //                                  (e.g. serialize without slicing, and 
without unnecessary fields)
@@ -118,7 +127,7 @@ public class Accept extends TxnRequest.WithUnsynced<Accept.AcceptReply>
     @Override
     public void accept(AcceptReply reply, Throwable failure)
     {
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java
index da06f6b1..12d5614c 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -153,9 +153,7 @@ public class Apply extends TxnRequest<ApplyReply>
     @Override
     public void accept(ApplyReply reply, Throwable failure)
     {
-        if (failure != null)
-            node.agent().onUncaughtException(failure);
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index f1a16b89..1efd652f 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -18,25 +18,43 @@
 
 package accord.messages;
 
-import accord.api.Result;
-import accord.local.*;
-import accord.primitives.*;
-import accord.topology.Topologies;
-
 import java.util.List;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.utils.Invariants;
-
+import accord.api.Result;
+import accord.local.Command;
+import accord.local.Commands;
 import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.Status;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
 
 import static accord.local.SafeCommandStore.TestDep.WITH;
 import static accord.local.SafeCommandStore.TestDep.WITHOUT;
 import static accord.local.SafeCommandStore.TestKind.shouldHaveWitnessed;
-import static accord.local.SafeCommandStore.TestTimestamp.*;
-import static accord.local.Status.*;
+import static accord.local.SafeCommandStore.TestTimestamp.EXECUTES_AFTER;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_AFTER;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
+import static accord.local.Status.Accepted;
+import static accord.local.Status.Committed;
+import static accord.local.Status.Phase;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.PreCommitted;
 import static accord.messages.PreAccept.calculatePartialDeps;
 
 public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply>
@@ -179,7 +197,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply>
     @Override
     public void accept(RecoverReply reply, Throwable failure)
     {
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java
index a18b2c54..4c8ae207 100644
--- a/accord-core/src/main/java/accord/messages/CheckStatus.java
+++ b/accord-core/src/main/java/accord/messages/CheckStatus.java
@@ -18,6 +18,8 @@
 
 package accord.messages;
 
+import javax.annotation.Nullable;
+
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.coordinate.Infer;
@@ -46,7 +48,6 @@ import accord.primitives.Writes;
 import accord.topology.Topologies;
 import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
-import javax.annotation.Nullable;
 
 import static accord.local.Status.Committed;
 import static accord.local.Status.Definition;
@@ -175,8 +176,9 @@ public class CheckStatus extends AbstractEpochRequest<CheckStatus.CheckStatusRep
     @Override
     public void accept(CheckStatusReply ok, Throwable failure)
     {
-        if (ok == null) node.reply(replyTo, replyContext, 
CheckStatusNack.NotOwned);
-        else node.reply(replyTo, replyContext, ok);
+        if (failure != null) node.reply(replyTo, replyContext, ok, failure);
+        else if (ok == null) node.reply(replyTo, replyContext, 
CheckStatusNack.NotOwned, null);
+        else node.reply(replyTo, replyContext, ok, null);
     }
 
     private Status invalidIfNotAtLeast(SafeCommandStore safeStore)
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index 3954a8eb..b755827b 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -19,20 +19,36 @@
 package accord.messages;
 
 import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import accord.local.*;
+import accord.local.Commands;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
 import accord.messages.ReadData.ReadNack;
 import accord.messages.ReadData.ReadReply;
-import accord.primitives.*;
-import accord.local.Node.Id;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.topology.Topologies;
-import javax.annotation.Nullable;
-
-import accord.utils.Invariants;
-
 import accord.topology.Topology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import accord.utils.Invariants;
 
 import static accord.local.Status.Committed;
 import static accord.local.Status.Known.DefinitionOnly;
@@ -177,14 +193,8 @@ public class Commit extends TxnRequest<ReadNack>
     @Override
     public synchronized void accept(ReadNack reply, Throwable failure)
     {
-        if (failure != null)
-        {
-            logger.error("Unhandled exception during commit", failure);
-            node.agent().onUncaughtException(failure);
-            return;
-        }
-        if (reply != null)
-            node.reply(replyTo, replyContext, reply);
+        if (reply != null || failure != null)
+            node.reply(replyTo, replyContext, reply, failure);
         else if (read != null)
             read.process(node, replyTo, replyContext);
         if (defer != null)
diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java
index f757aa7e..083c15c8 100644
--- a/accord-core/src/main/java/accord/messages/GetDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetDeps.java
@@ -18,14 +18,19 @@
 
 package accord.messages;
 
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.utils.Invariants;
+import javax.annotation.Nonnull;
 
 import accord.local.Node.Id;
+import accord.local.SafeCommandStore;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
-
-import javax.annotation.Nonnull;
+import accord.utils.Invariants;
 
 import static accord.messages.PreAccept.calculatePartialDeps;
 
@@ -78,8 +83,7 @@ public class GetDeps extends TxnRequest.WithUnsynced<PartialDeps>
     @Override
     public void accept(PartialDeps result, Throwable failure)
     {
-        if (result == null) node.agent().onUncaughtException(failure); // TODO 
(expected): propagate failures to coordinator
-        else node.reply(replyTo, replyContext, new GetDepsOk(result));
+        node.reply(replyTo, replyContext, result != null ? new 
GetDepsOk(result) : null, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java
index 79a16121..d9f42214 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -19,10 +19,17 @@
 package accord.messages;
 
 import accord.api.ProgressLog.ProgressShard;
-import accord.local.*;
+import accord.local.Commands;
 import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.Status;
 import accord.local.Status.Durability;
-import accord.primitives.*;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialRoute;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.utils.Invariants;
 
@@ -104,14 +111,7 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext
     @Override
     public void accept(Reply reply, Throwable failure)
     {
-        // TODO: respond with failure
-        if (reply == null)
-        {
-            if (failure == null)
-                throw new IllegalStateException("Processed nothing on this 
node");
-            throw new IllegalStateException(failure);
-        }
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index 8b68436e..7909157a 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -54,6 +54,7 @@ public enum MessageType
     SET_GLOBALLY_DURABLE_REQ (true ),
     QUERY_DURABLE_BEFORE_REQ (false),
     QUERY_DURABLE_BEFORE_RSP (false),
+    FAILURE_RSP              (false),
     ;
 
     /**
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index d035e8ed..1be51ca2 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -20,19 +20,29 @@ package accord.messages;
 
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nullable;
 
-import accord.local.*;
-import accord.local.SafeCommandStore.TestKind;
-
+import accord.local.Command;
+import accord.local.Commands;
 import accord.local.Node.Id;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestKind;
 import accord.messages.TxnRequest.WithUnsynced;
+import accord.primitives.Deps;
+import accord.primitives.EpochSupplier;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 import accord.topology.Shard;
 import accord.topology.Topologies;
 
-import javax.annotation.Nullable;
-
-import accord.primitives.*;
-
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
@@ -147,8 +157,7 @@ public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> implements
     @Override
     public void accept(PreAcceptReply reply, Throwable failure)
     {
-        // TODO (required, error handling): communicate back the failure
-        node.reply(replyTo, replyContext, reply);
+        node.reply(replyTo, replyContext, reply, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 618e1479..66b52347 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -21,21 +21,20 @@ package accord.messages;
 import java.util.BitSet;
 import javax.annotation.Nullable;
 
-import accord.api.Data;
-import accord.primitives.Participants;
-import accord.topology.Topologies;
-import accord.utils.Invariants;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Data;
 import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.Invariants;
 
 import static accord.messages.MessageType.READ_RSP;
 import static accord.messages.TxnRequest.computeWaitForEpoch;
@@ -71,7 +70,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
 
     protected abstract void cancel();
     protected abstract long executeAtEpoch();
-    protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data 
data);
+    protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data 
data, @Nullable Throwable fail);
 
     @Override
     public long waitForEpoch()
@@ -99,12 +98,12 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
     {
         if (reply != null)
         {
-            node.reply(replyTo, replyContext, reply);
+            node.reply(replyTo, replyContext, reply, failure);
         }
         else if (failure != null)
         {
             // TODO (expected, testing): test
-            node.reply(replyTo, replyContext, ReadNack.Error);
+            node.reply(replyTo, replyContext, null, failure);
             data = null;
             // TODO (expected, exceptions): probably a better way to handle 
this, as might not be uncaught
             node.agent().onUncaughtException(failure);
@@ -129,7 +128,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
         // and prevents races where we respond before dispatching all the 
required reads (if the reads are
         // completing faster than the reads can be setup on all required 
shards)
         if (-1 == --waitingOnCount)
-            reply(this.unavailable, data);
+            reply(this.unavailable, data, null);
     }
 
     protected synchronized void readComplete(CommandStore commandStore, 
@Nullable Data result, @Nullable Ranges unavailable)
@@ -153,7 +152,8 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
             {
                 // TODO (expected, exceptions): should send exception to 
client, and consistency handle/propagate locally
                 logger.trace("{}: read failed for {}: {}", txnId, unsafeStore, 
throwable);
-                node.reply(replyTo, replyContext, ReadNack.Error);
+                node.reply(replyTo, replyContext, null, throwable);
+                cancel();
             }
             else
                 readComplete(unsafeStore, next, unavailable);
@@ -173,7 +173,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.ReadNack>
 
     public enum ReadNack implements ReadReply
     {
-        Invalid, NotCommitted, Redundant, Error;
+        Invalid, NotCommitted, Redundant;
 
         @Override
         public String toString()
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java b/accord-core/src/main/java/accord/messages/ReadTxnData.java
index b34e9aeb..4e31910c 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java
@@ -199,7 +199,7 @@ public class ReadTxnData extends ReadData implements Command.TransientListener,
         if (state == State.PENDING)
         {
             state = State.OBSOLETE;
-            node.reply(replyTo, replyContext, Redundant);
+            node.reply(replyTo, replyContext, Redundant, null);
         }
     }
 
@@ -231,18 +231,20 @@ public class ReadTxnData extends ReadData implements Command.TransientListener,
     }
 
     @Override
-    protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+    protected void reply(@Nullable Ranges unavailable, @Nullable Data data, 
@Nullable Throwable fail)
     {
         switch (state)
         {
             case RETURNED:
-                throw new IllegalStateException("ReadOk was sent, yet ack 
called again");
+                throw new IllegalStateException("ReadOk was sent, yet ack 
called again", fail);
             case OBSOLETE:
                 logger.debug("After the read completed for txn {}, the result 
was marked obsolete", txnId);
+                if (fail != null)
+                    node.agent().onUncaughtException(fail);
                 break;
             case PENDING:
                 state = State.RETURNED;
-                node.reply(replyTo, replyContext, new ReadOk(unavailable, 
data));
+                node.reply(replyTo, replyContext, fail == null ? new 
ReadOk(unavailable, data) : null, fail);
                 break;
             default:
                 throw new AssertionError("Unknown state: " + state);
diff --git a/accord-core/src/main/java/accord/messages/Reply.java b/accord-core/src/main/java/accord/messages/Reply.java
index e82c4f99..e1fc7e07 100644
--- a/accord-core/src/main/java/accord/messages/Reply.java
+++ b/accord-core/src/main/java/accord/messages/Reply.java
@@ -18,7 +18,28 @@
 
 package accord.messages;
 
+import javax.annotation.Nonnull;
+
+import static accord.messages.MessageType.FAILURE_RSP;
+
 public interface Reply extends Message
 {
     default boolean isFinal() { return true; }
+
+    class FailureReply implements Reply
+    {
+        @Nonnull
+        public final Throwable failure;
+
+        public FailureReply(@Nonnull Throwable failure)
+        {
+            this.failure = failure;
+        }
+
+        @Override
+        public MessageType type()
+        {
+            return FAILURE_RSP;
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index 5ec41e0b..ca52b693 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -20,16 +20,21 @@ package accord.messages;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import accord.local.*;
-import accord.local.Node.Id;
-import accord.primitives.*;
-import accord.utils.MapReduceConsume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
-
+import accord.local.Command;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Participants;
+import accord.primitives.TxnId;
 import accord.topology.Topology;
+import accord.utils.MapReduceConsume;
+
+import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
 
 public class WaitOnCommit implements Request, 
MapReduceConsume<SafeCommandStore, Void>, PreLoadContext, 
Command.TransientListener
 {
@@ -141,13 +146,30 @@ public class WaitOnCommit implements Request, 
MapReduceConsume<SafeCommandStore,
     @Override
     public void accept(Void result, Throwable failure)
     {
-        ack();
+        if (failure != null)
+        {
+            while (true)
+            {
+                int initialValue = waitingOnUpdater.get(this);
+                if (initialValue == -1)
+                {
+                    node.agent().onUncaughtException(new IllegalStateException("Had error in WaitOnCommit, but already replied so can't send failure response", failure));
+                    break;
+                }
+                if (waitingOnUpdater.compareAndSet(this, initialValue, -1))
+                    node.reply(replyTo, replyContext, null, failure);
+            }
+        }
+        else
+        {
+            ack();
+        }
     }
 
     private void ack()
     {
         if (waitingOnUpdater.decrementAndGet(this) == -1)
-            node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE);
+            node.reply(replyTo, replyContext, WaitOnCommitOk.INSTANCE, null);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index 06905dda..682c33b4 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -193,7 +193,7 @@ public class WaitUntilApplied extends ReadData implements 
Command.TransientListe
             return;
 
         isInvalid = true;
-        node.reply(replyTo, replyContext, Invalid);
+        node.reply(replyTo, replyContext, Invalid, null);
     }
 
     synchronized void sendTruncated()
@@ -202,7 +202,7 @@ public class WaitUntilApplied extends ReadData implements 
Command.TransientListe
             return;
 
         isInvalid = true;
-        node.reply(replyTo, replyContext, Redundant);
+        node.reply(replyTo, replyContext, Redundant, null);
     }
 
     void applied(SafeCommandStore safeStore, SafeCommand safeCommand)
@@ -223,12 +223,13 @@ public class WaitUntilApplied extends ReadData implements 
Command.TransientListe
     }
 
     @Override
-    protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+    protected void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail)
     {
         if (isInvalid)
             return;
 
-        node.reply(replyTo, replyContext, new ReadOk(unavailable, data));
+        // data can be null so send the failure response if a failure is present
+        node.reply(replyTo, replyContext, fail == null ? new ReadOk(unavailable, data) : null, fail);
     }
 
     private void removeListener(SafeCommandStore safeStore, TxnId txnId)
diff --git a/accord-core/src/main/java/accord/utils/Utils.java 
b/accord-core/src/main/java/accord/utils/Utils.java
index 3ce0cf1e..696edb94 100644
--- a/accord-core/src/main/java/accord/utils/Utils.java
+++ b/accord-core/src/main/java/accord/utils/Utils.java
@@ -18,13 +18,23 @@
 
 package accord.utils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.IntFunction;
+
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
 
-import java.util.*;
-import java.util.function.IntFunction;
-
 // TODO (low priority): remove when jdk8 support is dropped
 public class Utils
 {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java 
b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index dfa7fe08..5773e685 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -18,9 +18,6 @@
 
 package accord.utils.async;
 
-import accord.api.VisibleForImplementation;
-import accord.utils.Invariants;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -28,6 +25,9 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import accord.api.VisibleForImplementation;
+import accord.utils.Invariants;
+
 public class AsyncResults
 {
     public static final AsyncResult<Void> SUCCESS_VOID = success(null);
@@ -104,7 +104,7 @@ public class AsyncResults
             }
         }
 
-        boolean trySetResult(V result, Throwable failure)
+        protected boolean trySetResult(V result, Throwable failure)
         {
             return trySetResult(new Result<>(result, failure));
         }
@@ -323,7 +323,7 @@ public class AsyncResults
     @VisibleForImplementation
     public static class RunnableResult<V> extends AbstractResult<V> implements 
Runnable
     {
-        private final Callable<V> callable;
+        protected final Callable<V> callable;
 
         public RunnableResult(Callable<V> callable)
         {
@@ -333,14 +333,19 @@ public class AsyncResults
         @Override
         public void run()
         {
+            // There are two different type of exceptions: user function throws, listener throws.  To make sure this is clear,
+            // make sure to catch the exception from the user function and set as failed, and let the listener exceptions bubble up.
+            V call;
             try
             {
-                trySetResult(callable.call(), null);
+                call = callable.call();
             }
             catch (Throwable t)
             {
                 trySetResult(null, t);
+                return;
             }
+            trySetResult(call, null);
         }
     }
 
diff --git 
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java 
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 4c9ce6c0..01ea96d9 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -18,16 +18,6 @@
 
 package accord.burn;
 
-import accord.api.TestableConfigurationService;
-import accord.local.AgentExecutor;
-import accord.impl.AbstractConfigurationService;
-import accord.primitives.Ranges;
-import accord.utils.RandomSource;
-import accord.local.Node;
-import accord.messages.*;
-import accord.topology.Topology;
-import accord.utils.async.AsyncResults;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -35,6 +25,20 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import accord.api.TestableConfigurationService;
+import accord.impl.AbstractConfigurationService;
+import accord.local.AgentExecutor;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.primitives.Ranges;
+import accord.topology.Topology;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncResults;
+
 public class BurnTestConfigurationService extends 
AbstractConfigurationService.Minimal implements TestableConfigurationService
 {
     private final AgentExecutor executor;
@@ -66,7 +70,7 @@ public class BurnTestConfigurationService extends 
AbstractConfigurationService.M
         public void process(Node on, Node.Id from, ReplyContext replyContext)
         {
             Topology topology = on.configService().getTopologyForEpoch(epoch);
-            on.reply(from, replyContext, new FetchTopologyReply(topology));
+            on.reply(from, replyContext, new FetchTopologyReply(topology), 
null);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java 
b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index 20f1705e..e14fb523 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -23,16 +23,17 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import accord.api.MessageSink;
 import accord.local.AgentExecutor;
-import accord.messages.SafeCallback;
-import accord.utils.RandomSource;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.api.MessageSink;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
+import accord.messages.SafeCallback;
+import accord.utils.RandomSource;
 
 import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
 
@@ -82,4 +83,10 @@ public class NodeSink implements MessageSink
     {
         parent.add(self, replyToNode, Packet.getMessageId(replyContext), 
reply);
     }
+
+    @Override
+    public void replyWithUnknownFailure(Id replyingToNode, ReplyContext 
replyContext, Throwable failure)
+    {
+        reply(replyingToNode, replyContext, new FailureReply(failure));
+    }
 }
diff --git 
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
 
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 63fbae83..2f3bd1b1 100644
--- 
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++ 
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -98,7 +98,7 @@ public class SimulatedDelayedExecutorService extends 
TaskExecutorService impleme
                 // run without setting the result
                 try
                 {
-                    fn.call();
+                    callable.call();
                     long nowMillis = pending.nowInMillis();
                     if (periodMillis > 0)
                     {
@@ -117,7 +117,7 @@ public class SimulatedDelayedExecutorService extends 
TaskExecutorService impleme
                 }
                 catch (Throwable t)
                 {
-                    setFailure(t);
+                    trySetResult(null, t);
                 }
             }
         }
diff --git 
a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java 
b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
index 059bc635..0e733095 100644
--- a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
@@ -32,26 +32,11 @@ import accord.utils.async.AsyncResults;
 
 public abstract class TaskExecutorService extends AbstractExecutorService 
implements AgentExecutor
 {
-    public static class Task<T> extends AsyncResults.SettableResult<T> 
implements Pending, RunnableFuture<T>
+    public static class Task<T> extends AsyncResults.RunnableResult<T> 
implements Pending, RunnableFuture<T>
     {
-        protected final Callable<T> fn;
-
         public Task(Callable<T> fn)
         {
-            this.fn = fn;
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                setSuccess(fn.call());
-            }
-            catch (Throwable t)
-            {
-                setFailure(t);
-            }
+            super(fn);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java 
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index a0b55d35..5fb3924c 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -20,14 +20,20 @@ package accord.impl.list;
 
 import java.util.function.Consumer;
 
+import accord.api.Agent;
+import accord.api.Result;
 import accord.impl.mock.Network;
 import accord.local.Command;
 import accord.local.Node;
-import accord.api.Agent;
-import accord.api.Result;
-import accord.primitives.*;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 
 import static accord.local.Node.Id.NONE;
+import static accord.utils.Invariants.checkState;
 import static com.google.common.base.Functions.identity;
 
 public class ListAgent implements Agent
@@ -46,11 +52,16 @@ public class ListAgent implements Agent
     @Override
     public void onRecover(Node node, Result success, Throwable fail)
     {
+        if (fail != null)
+        {
+            checkState(success == null, "fail (%s) and success (%s) are both 
not null", fail, success);
+            // We don't really process errors for Recover here even though it 
is provided in the interface
+        }
         if (success != null)
         {
             ListResult result = (ListResult) success;
             if (result.requestId > Integer.MIN_VALUE)
-                node.reply(result.client, 
Network.replyCtxFor(result.requestId), result);
+                node.reply(result.client, 
Network.replyCtxFor(result.requestId), result, null);
         }
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java 
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index b0874c61..c6127a30 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -35,9 +35,9 @@ import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.IncludeInfo;
 import accord.messages.MessageType;
 import accord.messages.ReplyContext;
+import accord.messages.Request;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Txn;
-import accord.messages.Request;
 import accord.primitives.TxnId;
 
 import static accord.local.Status.Phase.Cleanup;
@@ -107,41 +107,36 @@ public class ListRequest implements Request
         @Override
         public void accept(Result success, Throwable fail)
         {
-            // TODO (desired, testing): error handling
-            if (success != null)
-            {
-                node.reply(client, replyContext, (ListResult) success);
-            }
-            else if (fail instanceof CoordinationFailed)
+            if (fail instanceof CoordinationFailed)
             {
                 RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
                 TxnId txnId = ((CoordinationFailed) fail).txnId();
                 if (fail instanceof Invalidated)
                 {
-                    node.reply(client, replyContext, new ListResult(client, 
((Packet) replyContext).requestId, txnId, null, null, null, null));
+                    node.reply(client, replyContext, new ListResult(client, 
((Packet) replyContext).requestId, txnId, null, null, null, null), null);
                     return;
                 }
 
-                node.reply(client, replyContext, new ListResult(client, 
((Packet)replyContext).requestId, txnId, null, null, new int[0][], null));
+                node.reply(client, replyContext, new ListResult(client, 
((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null);
                 ((Cluster)node.scheduler()).onDone(() -> {
                     node.commandStores()
                         .select(homeKey)
                         .execute(() -> CheckOnResult.checkOnResult(node, 
txnId, homeKey, (s, f) -> {
                             if (f != null)
                             {
-                                node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f 
instanceof Truncated ? new int[2][] : new int[3][], null));
+                                node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f 
instanceof Truncated ? new int[2][] : new int[3][], null), null);
                                 return;
                             }
                             switch (s)
                             {
                                 case Truncated:
-                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new 
int[2][], null));
+                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new 
int[2][], null), null);
                                     break;
                                 case Invalidated:
-                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, 
null));
+                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, 
null), null);
                                     break;
                                 case Lost:
-                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new 
int[1][], null));
+                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new 
int[1][], null), null);
                                     break;
                                 case Other:
                                     // currently caught elsewhere in response 
tracking, but might help to throw an exception here
@@ -149,6 +144,10 @@ public class ListRequest implements Request
                         }));
                 });
             }
+            else
+            {
+                node.reply(client, replyContext, (ListResult) success, fail);
+            }
         }
     }
 
diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java 
b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
index e33314d2..ba0502ef 100644
--- a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
@@ -18,11 +18,13 @@
 
 package accord.impl.mock;
 
+import accord.api.MessageSink;
 import accord.local.AgentExecutor;
 import accord.local.Node;
-import accord.api.MessageSink;
+import accord.local.Node.Id;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 
@@ -54,4 +56,10 @@ public class SimpleMessageSink implements MessageSink
     {
         network.reply(node, replyingToNode, 
Network.getMessageId(replyContext), reply);
     }
+
+    @Override
+    public void replyWithUnknownFailure(Id replyingToNode, ReplyContext 
replyContext, Throwable failure)
+    {
+        network.reply(node, replyingToNode, 
Network.getMessageId(replyContext), new FailureReply(failure));
+    }
 }
diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java 
b/accord-core/src/test/java/accord/utils/MessageTask.java
index d1064e31..528a6e4b 100644
--- a/accord-core/src/test/java/accord/utils/MessageTask.java
+++ b/accord-core/src/test/java/accord/utils/MessageTask.java
@@ -18,15 +18,24 @@
 
 package accord.utils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+
 import accord.local.AgentExecutor;
 import accord.local.Node;
-import accord.messages.*;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
 import accord.utils.async.AsyncResults;
-import com.google.common.collect.ImmutableList;
-
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 
 /**
  * Message task that will continue sending messages to a set of nodes until all
@@ -88,7 +97,7 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
         @Override
         public void process(Node on, Node.Id from, ReplyContext replyContext)
         {
-            process.process(on, from, success -> on.reply(from, replyContext, 
success ? SUCCESS : FAILURE));
+            process.process(on, from, success -> on.reply(from, replyContext, 
success ? SUCCESS : FAILURE, null));
         }
 
         @Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 51da1b3c..cd7741be 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -51,6 +51,7 @@ import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
@@ -120,6 +121,12 @@ public class Cluster implements Scheduler
             long replyToMessage = ((Packet) replyContext).body.msg_id;
             parent.add(self, replyToNode, replyToMessage, reply);
         }
+
+        @Override
+        public void replyWithUnknownFailure(Id replyingToNode, ReplyContext 
replyContext, Throwable failure)
+        {
+            reply(replyingToNode, replyContext, new FailureReply(failure));
+        }
     }
 
     final Function<Id, Node> lookup;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
index d0691180..dad08b0d 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 
 import accord.maelstrom.Packet.Type;
 import accord.messages.MessageType;
-import com.google.gson.stream.JsonWriter;
 import accord.messages.Reply;
+import com.google.gson.stream.JsonWriter;
 
 public class Error extends Body implements Reply
 {
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 34f34d1f..30f2a0ff 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -18,13 +18,20 @@
 
 package accord.maelstrom;
 
-import accord.local.Command;
-import accord.local.Node;
+import java.util.concurrent.TimeUnit;
+
 import accord.api.Agent;
 import accord.api.Result;
-import accord.primitives.*;
+import accord.local.Command;
+import accord.local.Node;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 
-import java.util.concurrent.TimeUnit;
+import static accord.utils.Invariants.checkState;
 
 public class MaelstromAgent implements Agent
 {
@@ -33,10 +40,15 @@ public class MaelstromAgent implements Agent
     @Override
     public void onRecover(Node node, Result success, Throwable fail)
     {
+        if (fail != null)
+        {
+            checkState(success == null, "fail (%s) and success (%s) are both 
not null", fail, success);
+            // We don't really process errors for Recover here even though it 
is provided in the interface
+        }
         if (success != null)
         {
             MaelstromResult result = (MaelstromResult) success;
-            node.reply(result.client, 
MaelstromReplyContext.contextFor(result.requestId), new 
MaelstromReply(result.requestId, result));
+            node.reply(result.client, 
MaelstromReplyContext.contextFor(result.requestId), new 
MaelstromReply(result.requestId, result), null);
         }
     }
 
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
index e81f539b..f9aad092 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -23,17 +23,18 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 
 import accord.api.Key;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.maelstrom.Packet.Type;
 import accord.messages.MessageType;
+import accord.messages.Reply;
 import accord.messages.ReplyContext;
+import accord.messages.Request;
 import accord.primitives.Keys;
+import accord.primitives.Txn;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonToken;
 import com.google.gson.stream.JsonWriter;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.primitives.Txn;
-import accord.maelstrom.Packet.Type;
-import accord.messages.Request;
 
 public class MaelstromRequest extends Body implements Request
 {
@@ -49,8 +50,8 @@ public class MaelstromRequest extends Body implements Request
     public void process(Node node, Id client, ReplyContext replyContext)
     {
         node.coordinate(txn).addCallback((success, fail) -> {
-            if (success != null) node.reply(client, replyContext, new 
MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), 
(MaelstromResult) success));
-//            else node.reply(client, messageId, new Error(messageId, 13, 
fail.getMessage()));
+            Reply reply = success != null ? new 
MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), 
(MaelstromResult) success) : null;
+            node.reply(client, replyContext, reply, fail);
         });
     }
 
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 0d96058e..1600e03a 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -44,6 +44,7 @@ import accord.local.ShardDistributor;
 import accord.maelstrom.Packet.Type;
 import accord.messages.Callback;
 import accord.messages.Reply;
+import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.topology.Topology;
@@ -129,6 +130,12 @@ public class Main
         {
             send(new Packet(self, replyToNode, 
MaelstromReplyContext.messageIdFor(replyContext), reply));
         }
+
+        @Override
+        public void replyWithUnknownFailure(Id replyingToNode, ReplyContext 
replyContext, Throwable failure)
+        {
+            reply(replyingToNode, replyContext, new FailureReply(failure));
+        }
     }
 
     public static void listen(TopologyFactory topologyFactory, InputStream 
stdin, PrintStream out, PrintStream err) throws IOException


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to