This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a2cdcda807 IGNITE-21348 Trigger the lease negotiation retry in case 
when the lease candidate is no more contained in assignments (#3401)
a2cdcda807 is described below

commit a2cdcda807d509739ae97a1076396c99023aa7a7
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Mar 15 15:51:36 2024 +0300

    IGNITE-21348 Trigger the lease negotiation retry in case when the lease 
candidate is no more contained in assignments (#3401)
---
 .../placementdriver/AssignmentsTracker.java        |  28 +-
 .../internal/placementdriver/LeaseUpdater.java     |  12 +-
 .../internal/placementdriver/TopologyTracker.java  |  13 +-
 .../placementdriver/leases/LeaseTracker.java       |   2 +-
 .../negotiation/LeaseAgreement.java                |  53 +++-
 .../negotiation/LeaseNegotiator.java               |  41 +--
 .../placementdriver/LeaseNegotiationTest.java      | 291 +++++++++++++++++++++
 .../internal/placementdriver/LeaseTrackerTest.java |   2 +-
 .../apache/ignite/internal/replicator/Replica.java |   2 +-
 9 files changed, 382 insertions(+), 62 deletions(-)

diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 9783715fab..11e4da4de6 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.Cursor;
 
 /**
@@ -142,28 +141,20 @@ public class AssignmentsTracker {
                                 .collect(Collectors.joining(",")));
             }
 
-            boolean leaseRenewalRequired = false;
-
             for (EntryEvent evt : event.entryEvents()) {
+                Entry entry = evt.newEntry();
+
                 var replicationGrpId = TablePartitionId.fromString(
-                        new String(evt.newEntry().key(), 
StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
+                        new String(entry.key(), 
StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
 
-                if (evt.newEntry().tombstone()) {
+                if (entry.tombstone()) {
                     groupAssignments.remove(replicationGrpId);
                 } else {
-                    Set<Assignment> newAssignments = 
Assignments.fromBytes(evt.newEntry().value()).nodes();
-                    Set<Assignment> prevAssignment = 
groupAssignments.put(replicationGrpId, newAssignments);
-
-                    if (CollectionUtils.nullOrEmpty(prevAssignment)) {
-                        leaseRenewalRequired = true;
-                    }
+                    Set<Assignment> newAssignments = 
Assignments.fromBytes(entry.value()).nodes();
+                    groupAssignments.put(replicationGrpId, newAssignments);
                 }
             }
 
-            if (leaseRenewalRequired) {
-                triggerToRenewLeases();
-            }
-
             return nullCompletedFuture();
         }
 
@@ -171,11 +162,4 @@ public class AssignmentsTracker {
         public void onError(Throwable e) {
         }
     }
-
-    /**
-     * Triggers to renew leases forcibly. The method wakes up the monitor of 
{@link LeaseUpdater}.
-     */
-    private void triggerToRenewLeases() {
-        // TODO: IGNITE-18879 Implement lease maintenance.
-    }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index d77e4304d9..59407a49f3 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -340,6 +340,7 @@ public class LeaseUpdater {
 
             for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
currentAssignments.entrySet()) {
                 ReplicationGroupId grpId = entry.getKey();
+                Set<Assignment> assignments = entry.getValue();
 
                 Lease lease = leaseTracker.getLease(grpId);
 
@@ -348,7 +349,9 @@ public class LeaseUpdater {
                 }
 
                 if (!lease.isAccepted()) {
-                    LeaseAgreement agreement = 
leaseNegotiator.negotiated(grpId);
+                    LeaseAgreement agreement = 
leaseNegotiator.getAndRemoveIfReady(grpId);
+
+                    agreement.checkValid(grpId, 
topologyTracker.currentTopologySnapshot(), assignments);
 
                     if (agreement.isAccepted()) {
                         publishLease(grpId, lease, renewedLeases);
@@ -356,7 +359,7 @@ public class LeaseUpdater {
                         continue;
                     } else if (agreement.ready()) {
                         // Here we initiate negotiations for 
UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
-                        ClusterNode candidate = 
nextLeaseHolder(entry.getValue(), agreement.getRedirectTo());
+                        ClusterNode candidate = nextLeaseHolder(assignments, 
agreement.getRedirectTo());
 
                         if (candidate == null) {
                             leaseUpdateStatistics.onLeaseWithoutCandidate();
@@ -373,10 +376,7 @@ public class LeaseUpdater {
 
                 // The lease is expired or close to this.
                 if (lease.getExpirationTime().getPhysical() < 
outdatedLeaseThreshold) {
-                    ClusterNode candidate = nextLeaseHolder(
-                            entry.getValue(),
-                            lease.isProlongable() ? lease.getLeaseholder() : 
null
-                    );
+                    ClusterNode candidate = nextLeaseHolder(assignments, 
lease.isProlongable() ? lease.getLeaseholder() : null);
 
                     if (candidate == null) {
                         leaseUpdateStatistics.onLeaseWithoutCandidate();
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
index 15a7e48ad2..d8d3908bbb 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
@@ -106,6 +106,10 @@ public class TopologyTracker {
         return null;
     }
 
+    LogicalTopologySnapshot currentTopologySnapshot() {
+        return topologySnapRef.get();
+    }
+
     /**
      * Topology listener.
      */
@@ -142,15 +146,6 @@ public class TopologyTracker {
             } while (!topologySnapRef.compareAndSet(logicalTopologySnap0, 
topologySnap));
 
             LOG.debug("Logical topology updated for placement driver 
[topologySnap={}]", topologySnap);
-
-            triggerToRenewLeases();
         }
     }
-
-    /**
-     * Triggers to renew leases forcibly. The method wakes up the monitor of 
{@link LeaseUpdater}.
-     */
-    private void triggerToRenewLeases() {
-        // TODO: IGNITE-18879 Implement lease maintenance.
-    }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 1ba16aec7e..62154f2fb1 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -116,7 +116,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
      */
     public void startTrack(long recoveryRevision) {
         inBusyLock(busyLock, () -> {
-            msManager.registerPrefixWatch(PLACEMENTDRIVER_LEASES_KEY, 
updateListener);
+            msManager.registerExactWatch(PLACEMENTDRIVER_LEASES_KEY, 
updateListener);
 
             loadLeasesBusyAsync(recoveryRevision);
         });
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
index 3033777efb..61fc719798 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -17,16 +17,30 @@
 
 package org.apache.ignite.internal.placementdriver.negotiation;
 
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.findAny;
 
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.leases.Lease;
 import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * The agreement is formed from {@link LeaseGrantedMessageResponse}.
  */
 public class LeaseAgreement {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseAgreement.class);
+
     /**
      * The agreement, which has not try negotiating yet. We assume that it is 
{@link #ready()} and not {@link #isAccepted()}
      * which allows both initiation and retries of negotiation.
@@ -47,7 +61,7 @@ public class LeaseAgreement {
      */
     public LeaseAgreement(Lease lease, 
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
         this.lease = lease;
-        this.responseFut = remoteNodeResponseFuture;
+        this.responseFut = requireNonNull(remoteNodeResponseFuture);
     }
 
     /**
@@ -84,6 +98,7 @@ public class LeaseAgreement {
      *
      * @return Node id to propose a lease.
      */
+    @Nullable
     public String getRedirectTo() {
         assert ready() : "The method should be invoked only after the 
agreement is ready";
 
@@ -98,6 +113,40 @@ public class LeaseAgreement {
      * @return True if a response of the agreement has been received, false 
otherwise.
      */
     public boolean ready() {
-        return responseFut != null && responseFut.isDone();
+        return responseFut.isDone();
+    }
+
+    /**
+     * Check the validity of the agreement in the current logical topology and 
group assignments. If the suggested leaseholder
+     * has left topology or not included into the current assignments, the 
agreement is broken.
+     *
+     * @param groupId Group id.
+     * @param currentTopologySnapshot Current topology snapshot.
+     * @param assignments Assignments.
+     */
+    public void checkValid(
+            ReplicationGroupId groupId,
+            @Nullable LogicalTopologySnapshot currentTopologySnapshot,
+            Set<Assignment> assignments
+    ) {
+        if (ready()) {
+            return;
+        }
+
+        if (findAny(assignments, a -> 
a.consistentId().equals(lease.getLeaseholder())).isEmpty()) {
+            LOG.info("Lease was not negotiated because the node is not 
included into the group assignments anymore [node={}, group={}, "
+                    + "assignments={}].", lease.getLeaseholder(), lease, 
assignments);
+
+            responseFut.complete(null);
+        } else if (currentTopologySnapshot != null) {
+            Set<String> nodeIds = 
currentTopologySnapshot.nodes().stream().map(LogicalNode::id).collect(toSet());
+
+            if (!nodeIds.contains(lease.getLeaseholderId())) {
+                LOG.info("Lease was not negotiated because the node has left 
the logical topology [node={}, nodeId={}, group={}]",
+                        lease.getLeaseholder(), lease.getLeaseholderId(), 
groupId);
+
+                responseFut.complete(null);
+            }
+        }
     }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index fecf46563f..ca3b719888 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.placementdriver.LeaseUpdater;
 import org.apache.ignite.internal.placementdriver.leases.Lease;
 import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
@@ -42,7 +41,7 @@ public class LeaseNegotiator {
 
     private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
-    /** Leases ready to accept. */
+    /** Lease agreements which are in progress of negotiation. */
     private final Map<ReplicationGroupId, LeaseAgreement> leaseToNegotiate;
 
     /** Cluster service. */
@@ -61,7 +60,7 @@ public class LeaseNegotiator {
 
     /**
      * Tries negotiating a lease with its leaseholder.
-     * The negotiation will achieve after the method is invoked. Use {@link 
#negotiated(ReplicationGroupId)} to check a result.
+     * The negotiation will achieve after the method is invoked. Use {@link 
#getAndRemoveIfReady(ReplicationGroupId)} to check a result.
      *
      * @param lease Lease to negotiate.
      * @param force If the flag is true, the process tries to insist of apply 
the lease.
@@ -88,28 +87,37 @@ public class LeaseNegotiator {
                     if (throwable == null) {
                         assert msg instanceof LeaseGrantedMessageResponse : 
"Message type is unexpected [type="
                                 + msg.getClass().getSimpleName() + ']';
-                    } else if (!(unwrapCause(throwable) instanceof 
NodeStoppingException)) {
-                        LOG.warn("Lease was not negotiated due to exception 
[lease={}]", throwable, lease);
-                    }
 
-                    LeaseGrantedMessageResponse response = 
(LeaseGrantedMessageResponse) msg;
+                        LeaseGrantedMessageResponse response = 
(LeaseGrantedMessageResponse) msg;
 
-                    fut.complete(response);
+                        fut.complete(response);
+                    } else {
+                        if (!(unwrapCause(throwable) instanceof 
NodeStoppingException)) {
+                            LOG.warn("Lease was not negotiated due to 
exception [lease={}]", throwable, lease);
+                        }
 
-                    triggerToRenewLeases();
+                        fut.complete(null);
+                    }
                 });
     }
 
     /**
-     * Gets a lease agreement or {@code null} if the agreement has not formed 
yet.
+     * Gets a lease agreement or {@link LeaseAgreement#UNDEFINED_AGREEMENT} if 
the process of agreement is not started yet. Removes
+     * the agreement from the map if it is ready.
      *
      * @param groupId Replication group id.
      * @return Lease agreement.
      */
-    public LeaseAgreement negotiated(ReplicationGroupId groupId) {
-        LeaseAgreement agreement = leaseToNegotiate.getOrDefault(groupId, 
UNDEFINED_AGREEMENT);
+    public LeaseAgreement getAndRemoveIfReady(ReplicationGroupId groupId) {
+        LeaseAgreement[] res = new LeaseAgreement[1];
+
+        leaseToNegotiate.compute(groupId, (k, v) -> {
+            res[0] = v;
+
+            return v != null && v.ready() ? null : v;
+        });
 
-        return agreement;
+        return res[0] == null ? UNDEFINED_AGREEMENT : res[0];
     }
 
     /**
@@ -120,11 +128,4 @@ public class LeaseNegotiator {
     public void onLeaseRemoved(ReplicationGroupId groupId) {
         leaseToNegotiate.remove(groupId);
     }
-
-    /**
-     * Triggers to renew leases forcibly. The method wakes up the monitor of 
{@link LeaseUpdater}.
-     */
-    private void triggerToRenewLeases() {
-        // TODO: IGNITE-18879 Implement lease maintenance.
-    }
 }
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
new file mode 100644
index 0000000000..04466876a2
--- /dev/null
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.leases.LeaseBatch;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test checking exceptional situations on lease negotiation.
+ */
+public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
+    private static final PlacementDriverMessagesFactory MSG_FACTORY = new 
PlacementDriverMessagesFactory();
+
+    private static final TablePartitionId GROUP_ID = new TablePartitionId(0, 
0);
+
+    private static final String NODE_0_NAME = "node0";
+    private static final LogicalNode CLUSTER_NODE_0 = new 
LogicalNode(randomUUID().toString(), NODE_0_NAME, mock(NetworkAddress.class));
+
+    private static final String NODE_1_NAME = "node1";
+
+    private static final LogicalNode CLUSTER_NODE_1 = new 
LogicalNode(randomUUID().toString(), NODE_1_NAME, mock(NetworkAddress.class));
+
+    private LeaseUpdater leaseUpdater;
+
+    private MetaStorageManager metaStorageManager;
+
+    private ClusterService pdClusterService;
+
+    private MessagingService pdMessagingService;
+
+    private LogicalTopologyService pdLogicalTopologyService;
+
+    private LogicalTopologyEventListener pdLogicalTopologyEventListener;
+
+    private BiFunction<String, LeaseGrantedMessage, 
LeaseGrantedMessageResponse> leaseGrantedMessageHandler;
+
+    @BeforeEach
+    public void setUp() {
+        metaStorageManager = StandaloneMetaStorageManager.create();
+        metaStorageManager.start();
+        metaStorageManager.deployWatches();
+
+        pdLogicalTopologyService = mock(LogicalTopologyService.class);
+        when(pdLogicalTopologyService.logicalTopologyOnLeader())
+                .thenAnswer(inv -> completedFuture(new 
LogicalTopologySnapshot(0, Set.of(CLUSTER_NODE_0, CLUSTER_NODE_1))));
+        doAnswer(inv -> {
+            pdLogicalTopologyEventListener = inv.getArgument(0);
+
+            return null;
+        }).when(pdLogicalTopologyService).addEventListener(any());
+
+        leaseUpdater = createLeaseUpdater();
+
+        leaseUpdater.init();
+
+        leaseUpdater.activate();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        leaseUpdater.deactivate();
+    }
+
+    private LeaseUpdater createLeaseUpdater() {
+        TopologyService pdTopologyService = mock(TopologyService.class);
+        when(pdTopologyService.getById(anyString())).thenAnswer(inv -> 
CLUSTER_NODE_0);
+
+        pdMessagingService = mock(MessagingService.class);
+        when(pdMessagingService.invoke(anyString(), any(), 
anyLong())).thenAnswer(inv -> {
+            String nodeId = inv.getArgument(0);
+
+            LeaseGrantedMessage leaseGrantedMessage = inv.getArgument(1);
+
+            if (leaseGrantedMessageHandler != null) {
+                return CompletableFuture.supplyAsync(() -> 
leaseGrantedMessageHandler.apply(nodeId, leaseGrantedMessage));
+            } else {
+                return 
completedFuture(createLeaseGrantedMessageResponse(true));
+            }
+        });
+
+        pdClusterService = mock(ClusterService.class);
+        when(pdClusterService.messagingService()).thenAnswer(inv -> 
pdMessagingService);
+        when(pdClusterService.topologyService()).thenAnswer(inv -> 
pdTopologyService);
+
+        LeaseTracker leaseTracker = new LeaseTracker(metaStorageManager, 
pdClusterService.topologyService());
+
+        leaseTracker.startTrack(0L);
+
+        return new LeaseUpdater(
+                NODE_0_NAME,
+                pdClusterService,
+                metaStorageManager,
+                pdLogicalTopologyService,
+                leaseTracker,
+                new HybridClockImpl()
+        );
+    }
+
+    private static LeaseGrantedMessageResponse 
createLeaseGrantedMessageResponse(boolean accept) {
+        return 
MSG_FACTORY.leaseGrantedMessageResponse().accepted(accept).build();
+    }
+
+    @Test
+    public void testAssignmentChangeOnNegotiation() throws 
InterruptedException {
+        var lgmReceived = new CompletableFuture<>();
+        var lgmProcessed = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (n.equals(NODE_0_NAME)) {
+                lgmReceived.complete(null);
+
+                lgmProcessed.join();
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), 
Assignments.toBytes(Set.of(forPeer(NODE_0_NAME))));
+
+        assertThat(lgmReceived, willCompleteSuccessfully());
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), 
Assignments.toBytes(Set.of(forPeer(NODE_1_NAME))));
+
+        waitForAcceptedLease();
+
+        assertLeaseCorrect(CLUSTER_NODE_1.id());
+
+        lgmProcessed.complete(null);
+    }
+
+    @Test
+    public void testAssignmentChangeOnNegotiationAndReplicaRejectsLease() 
throws InterruptedException {
+        var lgmReceived = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (n.equals(NODE_0_NAME) && !lgmReceived.isDone()) {
+                lgmReceived.complete(null);
+
+                return createLeaseGrantedMessageResponse(false);
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), 
Assignments.toBytes(Set.of(forPeer(NODE_0_NAME))));
+
+        assertThat(lgmReceived, willCompleteSuccessfully());
+
+        waitForAcceptedLease();
+
+        assertLeaseCorrect(CLUSTER_NODE_0.id());
+    }
+
+    @Test
+    public void testAssignmentChangeOnNegotiationNodeLeftTopology() throws 
InterruptedException {
+        var lgmReceived = new CompletableFuture<>();
+        var lgmProcessed = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (n.equals(NODE_0_NAME)) {
+                lgmReceived.complete(null);
+
+                lgmProcessed.join();
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), 
Assignments.toBytes(Set.of(forPeer(NODE_0_NAME), forPeer(NODE_1_NAME))));
+
+        assertThat(lgmReceived, willCompleteSuccessfully());
+
+        pdLogicalTopologyEventListener.onNodeLeft(CLUSTER_NODE_0, new 
LogicalTopologySnapshot(1L, Set.of(CLUSTER_NODE_1)));
+
+        waitForAcceptedLease();
+
+        assertLeaseCorrect(CLUSTER_NODE_1.id());
+
+        lgmProcessed.complete(null);
+    }
+
+    @Test
+    public void testNetworkExceptionOnNegotiation() throws 
InterruptedException {
+        var lgmReceived = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (!lgmReceived.isDone()) {
+                lgmReceived.complete(null);
+
+                throw new RuntimeException("test");
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), 
Assignments.toBytes(Set.of(forPeer(NODE_0_NAME))));
+
+        assertThat(lgmReceived, willCompleteSuccessfully());
+
+        waitForAcceptedLease();
+
+        assertLeaseCorrect(CLUSTER_NODE_0.id());
+    }
+
+    private Lease getLeaseFromMs() {
+        CompletableFuture<Entry> f = 
metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
+
+        assertThat(f, willSucceedFast());
+
+        Entry e = f.join();
+
+        LeaseBatch leases = 
LeaseBatch.fromBytes(ByteBuffer.wrap(e.value()).order(ByteOrder.LITTLE_ENDIAN));
+
+        return leases.leases().stream().findFirst().orElseThrow();
+    }
+
+    private void waitForAcceptedLease() throws InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            Lease lease = getLeaseFromMs();
+
+            return lease.isAccepted();
+        }, 10_000));
+    }
+
+    private void assertLeaseCorrect(String leaseholderId) {
+        Lease lease = getLeaseFromMs();
+
+        assertTrue(lease.isAccepted());
+        assertEquals(leaseholderId, lease.getLeaseholderId());
+    }
+}
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
index 8304f787ee..34bdb6cf42 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
@@ -62,7 +62,7 @@ public class LeaseTrackerTest extends BaseIgniteAbstractTest {
                     listenerRef.set(lsnr);
                     return null;
                 }
-        ).when(msManager).registerPrefixWatch(any(), any());
+        ).when(msManager).registerExactWatch(any(), any());
 
         Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes());
 
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index ad70129007..cb51d47cd0 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -182,7 +182,7 @@ public class Replica {
      * @param msg Message to process.
      * @return Future that contains a result.
      */
-    public CompletableFuture<LeaseGrantedMessageResponse> 
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
+    private CompletableFuture<LeaseGrantedMessageResponse> 
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
         LOG.info("Received LeaseGrantedMessage for replica belonging to 
group=" + groupId() + ", force=" + msg.force());
 
         return 
placementDriver.previousPrimaryExpired(groupId()).thenCompose(unused -> 
leaderFuture().thenCompose(leader -> {

Reply via email to