ifesdjeen commented on code in PR #244:
URL: https://github.com/apache/cassandra-accord/pull/244#discussion_r2293335962
##########
accord-core/src/test/java/accord/impl/TestAgent.java:
##########
@@ -158,6 +158,12 @@ public long slowCoordinatorDelay(Node node,
SafeCommandStore safeStore, TxnId tx
return units.convert(1L, SECONDS);
}
+ @Override
+ public boolean isSlowCoordinator(long elapsed, TimeUnit units, TxnId
txnId, int attempt)
+ {
+ return units.toSeconds(elapsed) > 1;
Review Comment:
Won't this round down too much? `toSeconds` truncates, so `> 1` only fires
once at least 2 full seconds have elapsed. Should we maybe use `>=`? It may not
matter that much for the test agent, though.
##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -417,6 +417,9 @@ public void syncComplete(Id node, long epoch)
*/
public Ranges epochClosed(Ranges ranges, long epoch)
{
+ if (epochs.length == 0)
Review Comment:
Did you see any test failures related to this? I am concerned that we now have
problems with startup order, since normally we should not report closed
epochs while `epochs` is still empty. I thought I had precluded this, but maybe
not everywhere? Another potential problem could be that we won't report the
epoch as closed again.
##########
accord-core/src/main/java/accord/coordinate/AbstractCoordination.java:
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.coordinate.tracking.AbstractTracker;
+import accord.local.Node;
+import accord.local.SequentialAsyncExecutor;
+import accord.messages.Callback;
+import accord.messages.Request;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.utils.DebugMap;
+import accord.utils.Invariants;
+import accord.utils.SimpleBitSet;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.SortedList;
+import accord.utils.SortedListMap;
+import accord.utils.SortedListSet;
+import accord.utils.WrappableException;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.Cancellable;
+
+// TODO (expected): move message sending here, so we can invalidate any
pending callbacks when we advance the state machine
+public abstract class AbstractCoordination<Result, Reply extends
accord.messages.Reply, Ok> extends AbstractSimpleCoordination implements
Callback<Reply>
+{
+ private BiConsumer<? super Result, Throwable> callback;
+ private final SortedArrayList<Node.Id> nodes;
+ private final SimpleBitSet expectingReply;
+ private Object[] replyState;
+ private final @Nullable DebugMap debug;
+ private int replyCount;
+
+ protected AbstractCoordination(Node node, SequentialAsyncExecutor
executor, TxnId txnId, SortedArrayList<Node.Id> nodes, BiConsumer<? super
Result, Throwable> callback)
+ {
+ super(node, executor, txnId);
+ this.nodes = nodes;
+ this.callback = Invariants.nonNull(callback);
+ this.replyState = new Object[nodes.size()];
+ this.expectingReply = SimpleBitSet.allocate(nodes.size());
+ this.debug = Invariants.debug() ? new DebugMap(nodes) : null;
+ }
+
+ abstract void onSuccessInternal(Node.Id from, int fromIndex, Reply reply);
+ abstract void onFailureInternal(Node.Id from, int fromIndex, Throwable
fail);
+ void onSlowResponseInternal(Node.Id from) {}
+
+ void recordOk(int fromIndex, Ok ok)
+ {
+ Invariants.require(replyState[fromIndex] == null, "%s", this);
+ replyState[fromIndex] = ok;
+ replyCount++;
+ }
+
+ SortedListMap<Node.Id, Ok> finishOks()
+ {
+ Invariants.require(replyState != null, "%s", this);
+ setDoneWithReplies();
+ for (int i = expectingReply.nextSetBit(0) ; i >= 0 ; i =
expectingReply.nextSetBit(i + 1))
+ {
+ Object cancel = replyState[i];
+ if (cancel != null)
+ {
+ ((Cancellable)cancel).cancel();
+ replyState[i] = null;
+ }
+ }
+ expectingReply.clear();
+ SortedListMap<Node.Id, Ok> result = new SortedListMap<>(nodes,
replyState, replyCount);
+ replyState = null;
+ return result;
+ }
+
+ <V> V foldlOks(BiFunction<Ok, V, V> foldl, V zero)
+ {
+ V result = zero;
+ for (int i = 0; i < replyState.length ; ++i)
+ {
+ if (replyState[i] != null && !expectingReply.get(i))
+ result = foldl.apply((Ok)replyState[i], result);
+ }
+ return result;
+ }
+
+ void finishWithSuccess(Result success)
+ {
+ finishAndInvokeCallback(success, null);
+ }
+
+ void finishWithFailureOverride(Throwable failure)
+ {
+ finishAndInvokeCallback(null, FailureAccumulator.append(failure,
this.failure()));
+ }
+
+ void finishOnExaustion()
+ {
+ finishOnFailure();
+ }
+
+ void finishOnFailure()
+ {
+ finishAndInvokeCallback(null, FailureAccumulator.fail(node.agent(),
this.failure(), txnId, Route.tryCastToRoute(scope())));
+ }
+
+ void awaitEpochExactToFinish(long epoch, Runnable runnable)
+ {
+ setFinishing();
+ node.withEpochExact(epoch, executor, (ignore, failure) ->
finishWithFailureOverride(failure), WrappableException::wrap, () -> {
+ runnable.run();
+ Invariants.require(isDone(), "%s", this);
+ });
+ }
+
+ void awaitEpochAtLeastToFinish(long epoch, Runnable runnable)
+ {
+ setFinishing();
+ node.withEpochAtLeast(epoch, executor, (ignore, failure) ->
finishWithFailureOverride(failure), WrappableException::wrap, () -> {
+ runnable.run();
+ Invariants.require(isDone(), "%s", this);
+ });
+ }
+
+ void awaitToFinish(AsyncChain<?> await)
+ {
+ setFinishing();
+ await.begin((success, fail) -> {
+ if (fail != null) finishWithFailureOverride(fail);
+ else Invariants.require(isDone(), "%s", this);
+ });
+ }
+
+ void markSelfContacted()
+ {
+ expectingReply.set(nodes.find(node.id()));
+ }
+
+ void contact(Function<Node.Id, Request> request)
+ {
+ contact(request, null);
+ }
+
+ void contact(Function<Node.Id, Request> request, @Nullable
Predicate<Node.Id> include)
+ {
+ for (int i = 0; i < nodes.size() ; ++i)
+ {
+ Node.Id to = nodes.get(i);
+ if (include == null || include.test(to))
+ {
+ expectingReply.set(i);
+ replyState[i] = node.send(to, request.apply(to), executor,
this);
+ }
+ }
+ }
+
+ @Override
+ public void onSuccess(Node.Id from, Reply reply)
+ {
+ int fromIndex = onReply(from, reply, reply.isFinal());
+ if (fromIndex < 0)
+ return;
+
+ onSuccessInternal(from, fromIndex, reply);
+ Invariants.require(!expectingReply.isEmpty() || isFinishing() ||
isDone(), "%s", this);
+ }
+
+ @Override
+ public final void onSlowResponse(Node.Id from)
+ {
+ if (!isDoneWithReplies())
+ onSlowResponseInternal(from);
+ }
+
+ @Override
+ public void onFailure(Node.Id from, @Nullable Throwable failure)
+ {
+ int fromIndex = onReply(from, failure, true);
+ if (fromIndex < 0)
+ return;
+
+ onFailureInternal(from, fromIndex, failure);
+ Invariants.require(!expectingReply.isEmpty() || isFinishing() ||
isDone(), "%s", this);
+ }
+
+ private int onReply(Node.Id from, Object reply, boolean isFinal)
+ {
+ int fromIndex = nodes.find(from);
+ if (isDoneWithReplies())
+ {
+ if (isFinal && replyState != null)
+ replyState[fromIndex] = null;
+ return -1;
+ }
+
+ if (debug != null)
+ debug.debug(from, reply);
+
+ if (isFinal)
+ {
+ boolean expecting = expectingReply.unset(fromIndex);
+ Invariants.require(expecting, "%s", this);
+ replyState[fromIndex] = null;
+ }
+ else
+ {
+ boolean expecting = expectingReply.get(fromIndex);
+ Invariants.require(expecting, "%s", this);
+ }
+ return fromIndex;
+ }
+
+ @Override
+ public boolean onCallbackFailure(Node.Id from, Throwable failure)
+ {
+ if (isDone())
+ return false;
+
+ setDone();
+ BiConsumer<?, Throwable> callback = tryTakeCallback();
+ if (callback != null) callback.accept(null, failure);
+ else node.agent().onUncaughtException(failure);
+ return true;
+ }
+
+ void finishAndInvokeCallback(Result success, Throwable failure)
+ {
+ finishAndTakeCallback().accept(success, failure);
+ }
+
+ BiConsumer<? super Result, Throwable> finishAndTakeCallback()
+ {
+ setDone();
+ return takeCallback();
+ }
+
+ private BiConsumer<? super Result, Throwable> takeCallback()
+ {
+ BiConsumer<? super Result, Throwable> callback = this.callback;
+ this.callback = null;
+ Invariants.require(callback != null, "%s", this);
+ return callback;
+ }
+
+ private BiConsumer<? super Result, Throwable> tryTakeCallback()
+ {
+ if (callback == null)
+ return null;
+ return takeCallback();
+ }
+
+ @Override
+ public boolean abort()
+ {
+ if (isDone())
+ return false;
+
+ finishWithFailureOverride(Aborted.aborted(txnId,
Route.tryCastToRoute(scope())));
+ return true;
+ }
+
+ @Override
+ public final SortedListMap<Node.Id, ?> replies()
+ {
+ Object[] replyState = this.replyState;
+ if (replyState == null)
+ return null;
+
+ Object[] replies = replyState.clone();
+ for (int i = 0 ; i < replies.length ; ++i)
+ {
+ if (replies[i] != null && !(replies[i] instanceof
accord.messages.Reply))
+ replies[i] = null;
+ }
+ return new SortedListMap<Node.Id, Ok>(nodes, replies, replyCount);
Review Comment:
nit: I think this can even be `new SortedListMap<>`, but the reply type is
erased anyway. Do you want to specify it here?
##########
accord-core/src/main/java/accord/coordinate/ReadCoordinator.java:
##########
@@ -301,4 +308,74 @@ protected RequestStatus trySendMore()
contact.forEach(this::contact);
return status;
}
+
+ @Override
+ public SortedList<Id> inflight()
+ {
+ Node.Id[] ids = new Node.Id[inflight.size()];
Review Comment:
Should we just use int arrays here, especially if we're only using this for
debug output?
##########
accord-core/src/main/java/accord/impl/RequestCallbacks.java:
##########
@@ -110,7 +112,7 @@ public void onExpire(long nowMicros)
@Override
public void onExpire(long nowMicros)
{
- safeInvoke(RegisteredCallback::unsafeOnFailure, new
accord.coordinate.Timeout(null, null));
+ safeInvoke(RegisteredCallback::unsafeOnFailure,
(Throwable)null);
Review Comment:
Should we maybe use a shared static `Timeout` instance instead of `null`, if
the idea here was to avoid additional allocations?
##########
accord-core/src/main/java/accord/coordinate/AbstractCoordination.java:
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.coordinate.tracking.AbstractTracker;
+import accord.local.Node;
+import accord.local.SequentialAsyncExecutor;
+import accord.messages.Callback;
+import accord.messages.Request;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.utils.DebugMap;
+import accord.utils.Invariants;
+import accord.utils.SimpleBitSet;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.SortedList;
+import accord.utils.SortedListMap;
+import accord.utils.SortedListSet;
+import accord.utils.WrappableException;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.Cancellable;
+
+// TODO (expected): move message sending here, so we can invalidate any
pending callbacks when we advance the state machine
+public abstract class AbstractCoordination<Result, Reply extends
accord.messages.Reply, Ok> extends AbstractSimpleCoordination implements
Callback<Reply>
+{
+ private BiConsumer<? super Result, Throwable> callback;
+ private final SortedArrayList<Node.Id> nodes;
+ private final SimpleBitSet expectingReply;
+ private Object[] replyState;
Review Comment:
It is unclear what `Ok` is, whether `Ok` is always cancellable, and what this
`Object` can safely be cast to. It seems to hold either the `Cancellable`
returned by `node.send` or a recorded `Ok`.
##########
accord-core/src/main/java/accord/local/Node.java:
##########
@@ -998,6 +970,39 @@ public final long currentStamp()
return stamp;
}
+ public long nextCoordinationId()
+ {
+ long startedAtNanos = time.elapsed(NANOSECONDS);
+ long nextId = nextCoordinationId.get();
+ if (startedAtNanos >= nextId &&
nextCoordinationId.compareAndSet(nextId, startedAtNanos))
+ return startedAtNanos;
+ return nextCoordinationId.incrementAndGet();
+ }
+
+ public void register(Coordination coordination)
+ {
+ coordinations.register(coordination);
Review Comment:
Could you wrap the calls to `coordinations.register`/`unregister` in a
try/catch? That would prevent txns from failing if something goes wrong in
`coordinations`.
##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -244,17 +212,57 @@ public void onSuccess(Id from, RecoverReply reply)
recover();
}
+ BiConsumer<? super Outcome, Throwable> finishAndTakeCallback()
+ {
+ return finishAndWrapCallback();
+ }
+
+ private BiConsumer<? super Outcome, Throwable> finishAndWrapCallback()
+ {
+ BiConsumer<? super Outcome, Throwable> callback =
super.finishAndTakeCallback();
+ return (success, failure) -> {
+ callback.accept(success, failure);
+ node.agent().coordinatorEvents().onRecoveryStopped(node, txnId,
ballot, null, failure);
+ };
+ }
+
+ private BiConsumer<? super Outcome, Throwable> finishAndUnwrapCallback()
+ {
+ return super.finishAndTakeCallback();
+ }
+
+ private BiConsumer<Result, Throwable> finishAndTakeResultCallback()
+ {
+ BiConsumer<? super Outcome, Throwable> callback =
finishAndUnwrapCallback();
+ return (success, failure) -> {
+ if (failure == null)
+ {
+ callback.accept(ProgressToken.APPLIED, null);
+ node.agent().coordinatorEvents().onRecoveryStopped(node,
txnId, ballot, success, failure);
+ }
+ else if (failure instanceof Redundant)
+ {
+ Timestamp committedExecuteAt = ((Redundant)
failure).committedExecuteAt;
+ if (tracing != null)
+ tracing.trace(null, "Recover found Redundant; retrying
with known committedExecuteAt " + committedExecuteAt);
+ retry(committedExecuteAt, callback);
Review Comment:
Could you add an invariant that the current recovery is done before calling
`retry`? It may be superfluous.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]