sashapolo commented on code in PR #6198:
URL: https://github.com/apache/ignite-3/pull/6198#discussion_r2189546404
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -255,63 +257,81 @@ private void enqueuePrimaryReplicaEvents(
}
}
- private void awaitPrimaryReplica(
- ReplicationGroupId groupId,
- HybridTimestamp timestamp,
- CompletableFuture<ReplicaMeta> resultFuture
- ) {
- inBusyLockAsync(busyLock, () ->
getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp)
- .thenAccept(replicaMeta -> {
- ClusterNode leaseholderNode =
clusterNodeResolver.getById(replicaMeta.getLeaseholderId());
-
- if (leaseholderNode == null && !resultFuture.isDone()) {
- awaitPrimaryReplica(
- groupId,
- replicaMeta.getExpirationTime().tick(),
- resultFuture
- );
- } else {
- resultFuture.complete(replicaMeta);
- }
- })
- );
- }
-
@Override
public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
ReplicationGroupId groupId,
HybridTimestamp timestamp,
long timeout,
TimeUnit unit
) {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
- try {
+ return inBusyLockAsync(busyLock, () -> {
ReplicaMeta currentMeta = getCurrentPrimaryReplica(groupId,
timestamp);
if (currentMeta != null &&
clusterNodeResolver.getById(currentMeta.getLeaseholderId()) != null) {
return completedFuture(currentMeta);
}
- } finally {
- busyLock.leaveBusy();
- }
- CompletableFuture<ReplicaMeta> future = new CompletableFuture<>();
+ return awaitPrimaryReplicaImpl(groupId, timestamp, timeout, unit);
+ });
+ }
+
+ private CompletableFuture<ReplicaMeta> awaitPrimaryReplicaImpl(
+ ReplicationGroupId groupId,
+ HybridTimestamp timestamp,
+ long timeout,
+ TimeUnit unit
+ ) {
+ var resultFuture = new
CompletableFuture<ReplicaMeta>().orTimeout(timeout, unit);
- awaitPrimaryReplica(groupId, timestamp, future);
+ awaitPrimaryReplicaImpl(groupId, timestamp, resultFuture)
+ .whenComplete(copyStateTo(resultFuture));
- return future
- .orTimeout(timeout, unit)
+ return resultFuture
.exceptionally(e -> {
if (e instanceof TimeoutException) {
throw new PrimaryReplicaAwaitTimeoutException(groupId,
timestamp, leases.leaseByGroupId().get(groupId), e);
+ } else if (hasCause(e, TrackerClosedException.class)) {
+ // TrackerClosedException is thrown when trackers are
closed on node stop.
+ throw new CompletionException(new
NodeStoppingException(e));
+ } else {
+ throw new PrimaryReplicaAwaitException(groupId,
timestamp, e);
}
-
- throw new PrimaryReplicaAwaitException(groupId, timestamp,
e);
});
}
+ /**
+ * Returns a future that completes when the target primary replica appears.
+ *
+ * <p>{@code timeoutFuture} here is not used for storing the operation
result, but rather works in conjunction with the
+ * {@link #awaitPrimaryReplicaImpl(ReplicationGroupId, HybridTimestamp,
long, TimeUnit)} method which passes a future that timeouts
+ * after a configurable amount of time. This future is therefore used to
stop waiting if the timeout has been reached.
+ */
+ private CompletableFuture<ReplicaMeta> awaitPrimaryReplicaImpl(
+ ReplicationGroupId groupId,
+ HybridTimestamp timestamp,
+ CompletableFuture<?> timeoutFuture
Review Comment:
I don't like it either; that's why I added a big Javadoc. But I think I have
an idea...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]