This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e3b6cbe CEP-15 (Accord) accord.messages.Defer rejects Recurrent retry 
of Commit
6e3b6cbe is described below

commit 6e3b6cbef849cedbae2be30fe1822045c2271dc4
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Fri Apr 7 13:59:08 2023 -0700

    CEP-15 (Accord) accord.messages.Defer rejects Recurrent retry of Commit
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18378
---
 accord-core/src/main/java/accord/messages/Commit.java |  7 ++++++-
 accord-core/src/main/java/accord/messages/Defer.java  | 12 ++++++++----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/accord-core/src/main/java/accord/messages/Commit.java 
b/accord-core/src/main/java/accord/messages/Commit.java
index ce010aef..d034cc0f 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -176,7 +176,7 @@ public class Commit extends TxnRequest<ReadNack>
     }
 
     @Override
-    public void accept(ReadNack reply, Throwable failure)
+    public synchronized void accept(ReadNack reply, Throwable failure)
     {
         if (failure != null)
         {
@@ -188,6 +188,11 @@ public class Commit extends TxnRequest<ReadNack>
             node.reply(replyTo, replyContext, reply);
         else if (read != null)
             read.process(node, replyTo, replyContext);
+        if (defer != null)
+        {
+            defer.ack();
+            defer = null;
+        }
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/Defer.java 
b/accord-core/src/main/java/accord/messages/Defer.java
index 1e93b036..8ca0a487 100644
--- a/accord-core/src/main/java/accord/messages/Defer.java
+++ b/accord-core/src/main/java/accord/messages/Defer.java
@@ -57,7 +57,7 @@ class Defer implements CommandListener
         this.request = request;
     }
 
-    void add(SafeCommandStore safeStore, SafeCommand safeCommand, CommandStore 
commandStore)
+    synchronized void add(SafeCommandStore safeStore, SafeCommand safeCommand, 
CommandStore commandStore)
     {
         if (isDone)
             throw new IllegalStateException("Recurrent retry of " + request);
@@ -68,7 +68,7 @@ class Defer implements CommandListener
     }
 
     @Override
-    public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
+    public synchronized void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
     {
         Command command = safeCommand.current();
         Ready ready = waitUntil.apply(command);
@@ -80,10 +80,14 @@ class Defer implements CommandListener
 
         int id = safeStore.commandStore().id();
         if (!waitingOn.contains(id))
-            throw new IllegalStateException();
+            throw new IllegalStateException("Not waiting on CommandStore " + 
id);
         waitingOn.remove(id);
 
-        if (0 == --waitingOnCount)
+        ack();
+    }
+
+    synchronized void ack() {
+        if (-1 == --waitingOnCount)
         {
             isDone = true;
             request.process();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to