ifesdjeen commented on code in PR #214:
URL: https://github.com/apache/cassandra-accord/pull/214#discussion_r2175595366
##########
accord-core/src/main/java/accord/local/durability/DurabilityService.java:
##########
@@ -73,64 +74,97 @@ public GlobalDurability global()
return global;
}
- public synchronized void start()
+ public void start()
{
- Invariants.require(!started);
- started = true;
+ synchronized (this)
+ {
+ Invariants.require(!started);
+ started = true;
+ }
Topology current = node.topology().current();
shards.updateTopology(current);
global.updateTopology(current);
shards.start();
global.start();
}
- public synchronized void stop()
+ public void stop()
{
shards.stop();
global.stop();
}
- public AsyncResult<Void> close(String requestedBy, Ranges ranges)
+ public AsyncResult<Void> close(String requestedBy, Ranges ranges, long
timeoutDelay, TimeUnit timeoutUnits)
{
- return close(requestedBy, TxnId.NONE, ranges);
+ return close(requestedBy, TxnId.NONE, ranges, timeoutDelay,
timeoutUnits);
}
- public AsyncResult<Void> close(Object requestedBy, Timestamp minBound,
Ranges ranges)
+ public AsyncResult<Void> close(Object requestedBy, Timestamp minBound,
Ranges ranges, long timeoutDelay, TimeUnit timeoutUnits)
{
- return submit(new DurabilityRequest(requestedBy, minBound, ranges,
SyncLocal.NoLocal, SyncRemote.NoRemote, null,
node.elapsed(MICROSECONDS))).result;
+ long startedAt = node.elapsed(MICROSECONDS);
+ long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+ return submit(new DurabilityRequest(requestedBy, minBound, ranges,
SyncLocal.NoLocal, SyncRemote.NoRemote, null, startedAt, timeoutAt)).result;
}
- public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, SyncLocal
local, SyncRemote remote)
+ public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, SyncLocal
local, SyncRemote remote, long timeoutDelay, TimeUnit timeoutUnits)
{
- return sync(requestedBy, TxnId.NONE, ranges, local, remote);
+ return sync(requestedBy, TxnId.NONE, ranges, local, remote,
timeoutDelay, timeoutUnits);
}
- public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound,
Ranges ranges, SyncLocal local, SyncRemote remote)
+ public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound,
Ranges ranges, SyncLocal local, SyncRemote remote, long timeoutDelay, TimeUnit
timeoutUnits)
{
- return submit(new DurabilityRequest(requestedBy, minBound, ranges,
local, remote, null, node.elapsed(MICROSECONDS))).result;
+ long startedAt = node.elapsed(MICROSECONDS);
+ long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+ return submit(new DurabilityRequest(requestedBy, minBound, ranges,
local, remote, null, startedAt, timeoutAt)).result;
}
- public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, @Nullable
Collection<Node.Id> include, SyncLocal local, SyncRemote remote)
+ public AsyncResult<Void> sync(Object requestedBy, Ranges ranges, @Nullable
Collection<Node.Id> include, SyncLocal local, SyncRemote remote, long
timeoutDelay, TimeUnit timeoutUnits)
{
- return sync(requestedBy, TxnId.NONE, ranges, include, local, remote);
+ return sync(requestedBy, TxnId.NONE, ranges, include, local, remote,
timeoutDelay, timeoutUnits);
}
- public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound,
Ranges ranges, @Nullable Collection<Node.Id> include, SyncLocal local,
SyncRemote remote)
+ public AsyncResult<Void> sync(Object requestedBy, Timestamp minBound,
Ranges ranges, @Nullable Collection<Node.Id> include, SyncLocal local,
SyncRemote remote, long timeoutDelay, TimeUnit timeoutUnits)
{
- return submit(new DurabilityRequest(requestedBy, minBound, ranges,
local, remote, include, node.elapsed(MICROSECONDS))).result;
+ long startedAt = node.elapsed(MICROSECONDS);
+ long timeoutAt = startedAt + timeoutUnits.toMicros(timeoutDelay);
+ return submit(new DurabilityRequest(requestedBy, minBound, ranges,
local, remote, include, startedAt, timeoutAt)).result;
}
private DurabilityRequest submit(DurabilityRequest request)
{
- synchronized (this)
- {
- requests.add(request);
- }
+ register(request);
logger.info("Requesting durability {}", request);
shards.request(request);
return request;
}
+ void register(DurabilityRequest request)
+ {
+ request.timeout = node.timeouts().registerAt(new Timeout()
+ {
+ @Override public int stripe() { return request.ranges.hashCode(); }
+ @Override public void timeout()
+ {
+ request.timeout();
+ unregister(request);
+ }
+ }, request.timeoutAt, MICROSECONDS);
+
+ synchronized (this)
+ {
+ if (!request.isDone()) // guard against unlikely scenario of
timeout firing before we register here
Review Comment:
If my understanding is correct, the timeout could still be triggered between
the isDone check and adding the request. Should we instead check _after_
adding, and unregister at that point?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]