denis-chudov commented on code in PR #4329:
URL: https://github.com/apache/ignite-3/pull/4329#discussion_r1756565974


##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -370,38 +397,32 @@ private void updateLeaseBatchInternal() {
                 if (!lease.isAccepted()) {
                     LeaseAgreement agreement = 
leaseNegotiator.getAndRemoveIfReady(grpId);
 
-                    agreement.checkValid(grpId, 
topologyTracker.currentTopologySnapshot(), assignments);
+                    if (lease.isProlongable()) {
+                        agreement.checkValid(grpId, 
topologyTracker.currentTopologySnapshot(), assignments);
 
-                    if (agreement.isAccepted()) {
-                        Lease negotiatedLease = agreement.getLease();
+                        if (agreement.isAccepted()) {
+                            Lease negotiatedLease = agreement.getLease();
 
-                        // Lease information is taken from lease tracker, 
where it appears on meta storage watch updates, so it can contain
-                        // stale leases, if watch processing was delayed for 
some reason. It is ok: negotiated lease is guaranteed to be
-                        // already written to meta storage before negotiation 
begins, and in this case its start time would be
-                        // greater than lease's.
-                        assert negotiatedLease.getStartTime().longValue() >= 
lease.getStartTime().longValue()
-                                : format("Can't publish the lease that was not 
negotiated [groupId={}, startTime={}, "
+                            // Lease information is taken from lease tracker, 
where it appears on meta storage watch updates, so it can
+                            // contain stale leases, if watch processing was 
delayed for some reason. It is ok: negotiated lease is
+                            // guaranteed to be already written to meta 
storage before negotiation begins, and in this case its start time
+                            // would be greater than lease's.
+                            assert negotiatedLease.getStartTime().longValue() 
>= lease.getStartTime().longValue()
+                                    : format("Can't publish the lease that was 
not negotiated [groupId={}, startTime={}, "
                                     + "agreementLeaseStartTime={}].", grpId, 
lease.getStartTime(), agreement.getLease().getStartTime());
 
-                        publishLease(grpId, negotiatedLease, renewedLeases);
-
-                        continue;
-                    } else if (agreement.isDeclined()) {
-                        // Here we initiate negotiations for 
UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
-                        ClusterNode candidate = nextLeaseHolder(assignments, 
grpId, agreement.getRedirectTo());
+                            publishLease(grpId, negotiatedLease, 
renewedLeases);
 
-                        if (candidate == null) {
-                            leaseUpdateStatistics.onLeaseWithoutCandidate();
+                            continue;
+                        } else if (agreement.isDeclined()) {
+                            // Here we initiate negotiations for 
UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
+                            chooseCandidateAndCreateNewLease(grpId, lease, 
agreement, assignments, renewedLeases, toBeNegotiated);
 
                             continue;
                         }
-
-                        // New lease is granted.
-                        writeNewLease(grpId, candidate, renewedLeases);
-
-                        boolean force = Objects.equals(lease.getLeaseholder(), 
candidate.name());
-
-                        toBeNegotiated.put(grpId, force);
+                    } else {
+                        // If the lease was denied, create the new one.
+                        chooseCandidateAndCreateNewLease(grpId, lease, 
agreement, assignments, renewedLeases, toBeNegotiated);

Review Comment:
   refactored



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -546,23 +554,76 @@ private void 
onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
      *
      * @param groupId Replication group id.
      * @param redirectNodeId Node consistent id to redirect.
+     * @return Future that is completed when the lease is denied to prolong, 
containing the expiration time of this lease.
      */
-    private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable 
String redirectNodeId) {
-        LOG.info("The replica does not meet the requirements for the 
leaseholder [groupId={}, redirectNodeId={}]", groupId, redirectNodeId);
-
-        msNodes.thenAccept(nodeIds -> {
-            for (String nodeId : nodeIds) {
-                ClusterNode node = 
clusterNetSvc.topologyService().getByConsistentId(nodeId);
-
-                if (node != null) {
-                    // TODO: IGNITE-19441 Stop lease prolongation message 
might be sent several
-                    clusterNetSvc.messagingService().send(node, 
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
-                            .groupId(groupId)
-                            .redirectProposal(redirectNodeId)
-                            .build());
+    private CompletableFuture<HybridTimestamp> stopLeaseProlongation(
+            ReplicationGroupId groupId,
+            @Nullable String redirectNodeId
+    ) {
+        long retriesTimeout = 60_000;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to