ifesdjeen commented on code in PR #263:
URL: https://github.com/apache/cassandra-accord/pull/263#discussion_r2619565595
##########
accord-core/src/main/java/accord/topology/TopologyMismatch.java:
##########
@@ -45,31 +47,19 @@ public TopologyMismatch rethrowable()
private enum Mismatch { NOT_KNOWN, PENDING_REMOVAL }
@Nullable
- public static TopologyMismatch checkForMismatch(long epoch, Routables<?>
keysOrRanges, ActiveEpochs active, Txn.Kind kind) throws TopologyException
+ public static TopologyMismatch checkForMismatch(long epoch, ActiveEpochs
active, Routables<?> keysOrRanges, Txn.Kind kind) throws TopologyException
{
- Topology topology = active.globalForEpoch(epoch);
- Mismatch result = topology.foldlWithDefault(keysOrRanges, (shard, k,
v, i) -> {
- if (shard == null)
- return Mismatch.NOT_KNOWN;
- if (shard.is(Shard.Flag.PENDING_REMOVAL) && !k.isSyncPoint())
- return Mismatch.PENDING_REMOVAL;
- return v;
- }, null, kind, null);
-
- if (result == null)
+ ActiveEpoch e = active.get(epoch);
+ Topology topology = e.get(kind.isSyncPoint() ? ALL : LIVE);
+ if (topology.ranges.containsAll(keysOrRanges))
return null;
String message;
- switch (result)
- {
- default: throw new UnhandledEnum(result);
- case PENDING_REMOVAL:
- message = String.format("Txn attempted to access keys or
ranges that are being removed in epoch %d (%s)", topology.epoch(),
keysOrRanges);
- break;
- case NOT_KNOWN:
- message = String.format("Txn attempted to access keys or
ranges that are not known in the epoch %d (%s)", topology.epoch(),
keysOrRanges);
- break;
- }
+ if (kind.isSyncPoint() || !e.all.ranges.containsAll(keysOrRanges))
+ message = String.format("Txn attempted to access keys or ranges
that are being removed in epoch %d (%s)", topology.epoch(),
keysOrRanges.without(e.all.ranges));
Review Comment:
(discussed off-thread)
Here, it seems like ranges aren't present in active, so the message might
have to be "not known" (i.e. below clause)
##########
accord-core/src/main/java/accord/topology/PendingEpoch.java:
##########
@@ -171,12 +177,35 @@ public int stripe()
return (int) epoch;
}
- static class WaitingForEpoch extends AsyncResults.SettableResult<Void>
+ static class WaitingForEpoch
{
- final long deadlineMicros;
+ private static final WaitingForEpoch DONE = new WaitingForEpoch(0);
+ static { DONE.result.setSuccess(null); }
+
+ private final AsyncResults.SettableResult<Void> result = new
AsyncResults.SettableResult<>();
+ private final long deadlineMicros;
+
+ private volatile int waiting;
+ private static final AtomicIntegerFieldUpdater<WaitingForEpoch>
waitingUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitingForEpoch.class,
"waiting");
+
WaitingForEpoch(long deadlineMicros)
{
this.deadlineMicros = deadlineMicros;
}
+
+ AsyncChain<Void> chainImmediatelyElse(@Nullable AsyncExecutor executor)
+ {
+ AsyncChain<Void> chain = result.chain();
+ if (result.isDone())
+ return chain;
+
+ waitingUpdater.incrementAndGet(this);
Review Comment:
Do we also want to decrement on completion?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]