This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 1e508d34 CEP-15: Simplify handling of Insufficient replies from Commit and Apply 1e508d34 is described below commit 1e508d340935fef496f58606a14717bed59e8af4 Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Fri Oct 13 15:47:32 2023 +0100 CEP-15: Simplify handling of Insufficient replies from Commit and Apply patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-18928 --- .../src/main/java/accord/coordinate/Persist.java | 22 ++--- .../src/main/java/accord/messages/Commit.java | 15 --- .../src/main/java/accord/messages/Defer.java | 107 --------------------- 3 files changed, 11 insertions(+), 133 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java index 0607722f..9a132683 100644 --- a/accord-core/src/main/java/accord/coordinate/Persist.java +++ b/accord-core/src/main/java/accord/coordinate/Persist.java @@ -28,7 +28,6 @@ import accord.local.Node.Id; import accord.messages.Apply; import accord.messages.Apply.ApplyReply; import accord.messages.Callback; -import accord.messages.Commit; import accord.messages.InformDurable; import accord.primitives.*; import accord.topology.Topologies; @@ -37,7 +36,6 @@ import static accord.coordinate.tracking.RequestStatus.Success; import static accord.local.Status.Durability.Majority; import static accord.messages.Apply.executes; import static accord.messages.Apply.participates; -import static accord.messages.Commit.Kind.Maximal; public class Persist implements Callback<ApplyReply> { @@ -47,6 +45,8 @@ public class Persist implements Callback<ApplyReply> final Txn txn; final Timestamp executeAt; final Deps deps; + final Writes writes; + final Result result; final QuorumTracker tracker; final Set<Id> persistedOn; boolean isDone; @@ -60,7 +60,7 @@ public class Persist implements Callback<ApplyReply> public static void persist(Node node, Topologies executes, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { Topologies participates = participates(node, route, txnId, executeAt, executes); - Persist persist = new Persist(node, executes, txnId, route, txn, executeAt, deps); + Persist persist = new Persist(node, executes, txnId, route, txn, executeAt, deps, writes, result); node.send(participates.nodes(), to -> Apply.applyMinimal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); } @@ -68,7 +68,7 @@ public class Persist implements Callback<ApplyReply> { Topologies executes = executes(node, route, executeAt); Topologies participates = participates(node, route, txnId, executeAt, executes); - Persist persist = new Persist(node, participates, txnId, route, txn, executeAt, deps); + Persist persist = new Persist(node, participates, txnId, route, txn, executeAt, deps, writes, result); node.send(participates.nodes(), to -> Apply.applyMaximal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); } @@ -76,19 +76,21 @@ public class Persist implements Callback<ApplyReply> { Topologies executes = executes(node, sendTo, executeAt); Topologies participates = participates(node, sendTo, txnId, executeAt, executes); - Persist persist = new Persist(node, participates, txnId, route, txn, executeAt, deps); + Persist persist = new Persist(node, participates, txnId, route, txn, executeAt, deps, writes, result); node.send(participates.nodes(), to -> Apply.applyMaximal(to, participates, executes, txnId, route, txn, executeAt, deps, writes, result), persist); } - private Persist(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps) + private Persist(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { this.node = node; this.txnId = txnId; + this.route = route; this.txn = txn; + this.executeAt = executeAt; this.deps = deps; - this.route = route; + this.writes = writes; + this.result = result; this.tracker = new QuorumTracker(topologies); - this.executeAt = executeAt; this.persistedOn = new HashSet<>(); } @@ -112,9 +114,7 @@ public class Persist implements Callback<ApplyReply> } break; case Insufficient: - Topologies topologies = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch()); - // TODO (easy, cleanup): use static method in Commit - node.send(from, new Commit(Maximal, from, topologies.forEpoch(txnId.epoch()), topologies, txnId, txn, route, null, executeAt, deps, false)); + Apply.sendMaximal(node, from, txnId, route, txn, executeAt, deps, writes, result); } } diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index 9d692e6a..97e67cc8 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -50,9 +50,6 @@ import accord.topology.Topologies; import accord.topology.Topology; import accord.utils.Invariants; -import static accord.local.Status.Committed; -import static accord.local.Status.Known.DefinitionOnly; - public class Commit extends TxnRequest<ReadNack> { private static final Logger logger = LoggerFactory.getLogger(Commit.class); @@ -72,8 +69,6 @@ public class Commit extends TxnRequest<ReadNack> public final @Nullable FullRoute<?> route; public final ReadTxnData read; - private transient Defer defer; - public enum Kind { Minimal, Maximal } // TODO (low priority, clarity): cleanup passing of topologies here - maybe fetch them afresh from Node? @@ -177,12 +172,7 @@ public class Commit extends TxnRequest<ReadNack> case Success: case Redundant: return null; - case Insufficient: - Invariants.checkState(!safeCommand.current().known().isDefinitionKnown()); - if (defer == null) - defer = new Defer(DefinitionOnly, Committed.minKnown, Commit.this); - defer.add(safeStore, safeCommand, safeStore.commandStore()); return ReadNack.NotCommitted; } } @@ -200,11 +190,6 @@ public class Commit extends TxnRequest<ReadNack> node.reply(replyTo, replyContext, reply, failure); else if (read != null) read.process(node, replyTo, replyContext); - if (defer != null) - { - defer.ack(); - defer = null; - } } @Override diff --git a/accord-core/src/main/java/accord/messages/Defer.java b/accord-core/src/main/java/accord/messages/Defer.java deleted file mode 100644 index acb80fe0..00000000 --- a/accord-core/src/main/java/accord/messages/Defer.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.messages; - -import java.util.function.Function; - -import accord.local.*; -import accord.local.Status.Known; -import accord.primitives.TxnId; -import accord.utils.Invariants; -import org.agrona.collections.IntHashSet; - -import static accord.messages.Defer.Ready.Expired; -import static accord.messages.Defer.Ready.No; -import static accord.messages.Defer.Ready.Yes; - -class Defer implements Command.TransientListener -{ - public enum Ready { No, Yes, Expired } - - final Function<Command, Ready> waitUntil; - final TxnRequest<?> request; - final IntHashSet waitingOn = new IntHashSet(); - int waitingOnCount; - boolean isDone; - - Defer(Known waitUntil, Known expireAt, TxnRequest<?> request) - { - this(command -> { - if (!waitUntil.isSatisfiedBy(command.known())) - return No; - if (expireAt.isSatisfiedBy(command.known())) - return Expired; - return Yes; - }, request); - } - - Defer(Function<Command, Ready> waitUntil, TxnRequest<?> request) - { - this.waitUntil = waitUntil; - this.request = request; - } - - synchronized void add(SafeCommandStore safeStore, SafeCommand safeCommand, CommandStore commandStore) - { - if (isDone) - throw new IllegalStateException("Recurrent retry of " + request); - - waitingOn.add(commandStore.id()); - ++waitingOnCount; - safeCommand.addListener(this); - } - - @Override - public synchronized void onChange(SafeCommandStore safeStore, SafeCommand safeCommand) - { - Command command = safeCommand.current(); - Ready ready = waitUntil.apply(command); - if (ready == No) return; - - if (!safeCommand.removeListener(this)) - return; - - if (ready == Expired) return; - - int id = safeStore.commandStore().id(); - // TODO (desired): it would be nice at least for transient listener lists to annotate that they are notifying a listener, to avoid redundant invocations - // we can then impose this as an invariant check rather than an early abort - Invariants.checkState(waitingOn.contains(id)); - waitingOn.remove(id); - - ack(); - } - - synchronized void ack() - { - if (-1 == --waitingOnCount) - { - isDone = true; - request.process(); - } - } - - @Override - public PreLoadContext listenerPreLoadContext(TxnId caller) - { - Invariants.checkState(caller.equals(request.txnId)); - return request; - } -} - --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org