This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 6e3b6cbe CEP-15 (Accord) accord.messages.Defer rejects Recurrent retry of Commit 6e3b6cbe is described below commit 6e3b6cbef849cedbae2be30fe1822045c2271dc4 Author: David Capwell <dcapw...@gmail.com> AuthorDate: Fri Apr 7 13:59:08 2023 -0700 CEP-15 (Accord) accord.messages.Defer rejects Recurrent retry of Commit patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18378 --- accord-core/src/main/java/accord/messages/Commit.java | 7 ++++++- accord-core/src/main/java/accord/messages/Defer.java | 12 ++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index ce010aef..d034cc0f 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -176,7 +176,7 @@ public class Commit extends TxnRequest<ReadNack> } @Override - public void accept(ReadNack reply, Throwable failure) + public synchronized void accept(ReadNack reply, Throwable failure) { if (failure != null) { @@ -188,6 +188,11 @@ public class Commit extends TxnRequest<ReadNack> node.reply(replyTo, replyContext, reply); else if (read != null) read.process(node, replyTo, replyContext); + if (defer != null) + { + defer.ack(); + defer = null; + } } @Override diff --git a/accord-core/src/main/java/accord/messages/Defer.java b/accord-core/src/main/java/accord/messages/Defer.java index 1e93b036..8ca0a487 100644 --- a/accord-core/src/main/java/accord/messages/Defer.java +++ b/accord-core/src/main/java/accord/messages/Defer.java @@ -57,7 +57,7 @@ class Defer implements CommandListener this.request = request; } - void add(SafeCommandStore safeStore, SafeCommand safeCommand, CommandStore commandStore) + synchronized void add(SafeCommandStore safeStore, SafeCommand safeCommand, CommandStore commandStore) { if (isDone) throw new IllegalStateException("Recurrent retry of " + request); @@ -68,7 +68,7 @@ class Defer implements CommandListener } @Override - public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) + public synchronized void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) { Command command = safeCommand.current(); Ready ready = waitUntil.apply(command); @@ -80,10 +80,14 @@ class Defer implements CommandListener int id = safeStore.commandStore().id(); if (!waitingOn.contains(id)) - throw new IllegalStateException(); + throw new IllegalStateException("Not waiting on CommandStore " + id); waitingOn.remove(id); - if (0 == --waitingOnCount) + ack(); + } + + synchronized void ack() { + if (-1 == --waitingOnCount) { isDone = true; request.process(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org