This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a2cdcda807 IGNITE-21348 Trigger the lease negotiation retry in case
when the lease candidate is no more contained in assignments (#3401)
a2cdcda807 is described below
commit a2cdcda807d509739ae97a1076396c99023aa7a7
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Mar 15 15:51:36 2024 +0300
IGNITE-21348 Trigger the lease negotiation retry in case when the lease
candidate is no more contained in assignments (#3401)
---
.../placementdriver/AssignmentsTracker.java | 28 +-
.../internal/placementdriver/LeaseUpdater.java | 12 +-
.../internal/placementdriver/TopologyTracker.java | 13 +-
.../placementdriver/leases/LeaseTracker.java | 2 +-
.../negotiation/LeaseAgreement.java | 53 +++-
.../negotiation/LeaseNegotiator.java | 41 +--
.../placementdriver/LeaseNegotiationTest.java | 291 +++++++++++++++++++++
.../internal/placementdriver/LeaseTrackerTest.java | 2 +-
.../apache/ignite/internal/replicator/Replica.java | 2 +-
9 files changed, 382 insertions(+), 62 deletions(-)
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 9783715fab..11e4da4de6 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Cursor;
/**
@@ -142,28 +141,20 @@ public class AssignmentsTracker {
.collect(Collectors.joining(",")));
}
- boolean leaseRenewalRequired = false;
-
for (EntryEvent evt : event.entryEvents()) {
+ Entry entry = evt.newEntry();
+
var replicationGrpId = TablePartitionId.fromString(
- new String(evt.newEntry().key(),
StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
+ new String(entry.key(),
StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
- if (evt.newEntry().tombstone()) {
+ if (entry.tombstone()) {
groupAssignments.remove(replicationGrpId);
} else {
- Set<Assignment> newAssignments =
Assignments.fromBytes(evt.newEntry().value()).nodes();
- Set<Assignment> prevAssignment =
groupAssignments.put(replicationGrpId, newAssignments);
-
- if (CollectionUtils.nullOrEmpty(prevAssignment)) {
- leaseRenewalRequired = true;
- }
+ Set<Assignment> newAssignments =
Assignments.fromBytes(entry.value()).nodes();
+ groupAssignments.put(replicationGrpId, newAssignments);
}
}
- if (leaseRenewalRequired) {
- triggerToRenewLeases();
- }
-
return nullCompletedFuture();
}
@@ -171,11 +162,4 @@ public class AssignmentsTracker {
public void onError(Throwable e) {
}
}
-
- /**
- * Triggers to renew leases forcibly. The method wakes up the monitor of
{@link LeaseUpdater}.
- */
- private void triggerToRenewLeases() {
- // TODO: IGNITE-18879 Implement lease maintenance.
- }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index d77e4304d9..59407a49f3 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -340,6 +340,7 @@ public class LeaseUpdater {
for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry :
currentAssignments.entrySet()) {
ReplicationGroupId grpId = entry.getKey();
+ Set<Assignment> assignments = entry.getValue();
Lease lease = leaseTracker.getLease(grpId);
@@ -348,7 +349,9 @@ public class LeaseUpdater {
}
if (!lease.isAccepted()) {
- LeaseAgreement agreement =
leaseNegotiator.negotiated(grpId);
+ LeaseAgreement agreement =
leaseNegotiator.getAndRemoveIfReady(grpId);
+
+ agreement.checkValid(grpId,
topologyTracker.currentTopologySnapshot(), assignments);
if (agreement.isAccepted()) {
publishLease(grpId, lease, renewedLeases);
@@ -356,7 +359,7 @@ public class LeaseUpdater {
continue;
} else if (agreement.ready()) {
// Here we initiate negotiations for
UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
- ClusterNode candidate =
nextLeaseHolder(entry.getValue(), agreement.getRedirectTo());
+ ClusterNode candidate = nextLeaseHolder(assignments,
agreement.getRedirectTo());
if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
@@ -373,10 +376,7 @@ public class LeaseUpdater {
// The lease is expired or close to this.
if (lease.getExpirationTime().getPhysical() <
outdatedLeaseThreshold) {
- ClusterNode candidate = nextLeaseHolder(
- entry.getValue(),
- lease.isProlongable() ? lease.getLeaseholder() :
null
- );
+ ClusterNode candidate = nextLeaseHolder(assignments,
lease.isProlongable() ? lease.getLeaseholder() : null);
if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
index 15a7e48ad2..d8d3908bbb 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java
@@ -106,6 +106,10 @@ public class TopologyTracker {
return null;
}
+ /**
+ * Returns the value currently held by {@code topologySnapRef}, i.e. the logical topology snapshot stored by this tracker.
+ * NOTE(review): presumably this is {@code null} until a first snapshot has been stored - confirm against the initialization code.
+ *
+ * @return Logical topology snapshot read from {@code topologySnapRef}.
+ */
+ LogicalTopologySnapshot currentTopologySnapshot() {
+ return topologySnapRef.get();
+ }
+
/**
* Topology listener.
*/
@@ -142,15 +146,6 @@ public class TopologyTracker {
} while (!topologySnapRef.compareAndSet(logicalTopologySnap0,
topologySnap));
LOG.debug("Logical topology updated for placement driver
[topologySnap={}]", topologySnap);
-
- triggerToRenewLeases();
}
}
-
- /**
- * Triggers to renew leases forcibly. The method wakes up the monitor of
{@link LeaseUpdater}.
- */
- private void triggerToRenewLeases() {
- // TODO: IGNITE-18879 Implement lease maintenance.
- }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 1ba16aec7e..62154f2fb1 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -116,7 +116,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
*/
public void startTrack(long recoveryRevision) {
inBusyLock(busyLock, () -> {
- msManager.registerPrefixWatch(PLACEMENTDRIVER_LEASES_KEY,
updateListener);
+ msManager.registerExactWatch(PLACEMENTDRIVER_LEASES_KEY,
updateListener);
loadLeasesBusyAsync(recoveryRevision);
});
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
index 3033777efb..61fc719798 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -17,16 +17,30 @@
package org.apache.ignite.internal.placementdriver.negotiation;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.findAny;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
/**
* The agreement is formed from {@link LeaseGrantedMessageResponse}.
*/
public class LeaseAgreement {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(LeaseAgreement.class);
+
/**
* The agreement, which has not try negotiating yet. We assume that it is
{@link #ready()} and not {@link #isAccepted()}
* which allows both initiation and retries of negotiation.
@@ -47,7 +61,7 @@ public class LeaseAgreement {
*/
public LeaseAgreement(Lease lease,
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
this.lease = lease;
- this.responseFut = remoteNodeResponseFuture;
+ this.responseFut = requireNonNull(remoteNodeResponseFuture);
}
/**
@@ -84,6 +98,7 @@ public class LeaseAgreement {
*
* @return Node id to propose a lease.
*/
+ @Nullable
public String getRedirectTo() {
assert ready() : "The method should be invoked only after the
agreement is ready";
@@ -98,6 +113,40 @@ public class LeaseAgreement {
* @return True if a response of the agreement has been received, false
otherwise.
*/
public boolean ready() {
- return responseFut != null && responseFut.isDone();
+ return responseFut.isDone();
+ }
+
+    /**
+     * Checks whether this agreement is still valid against the current logical topology and group assignments. If the proposed
+     * leaseholder is not included into the current assignments or has left the logical topology, the agreement is broken: the
+     * pending response future is completed with {@code null}, so the agreement becomes {@link #ready()} without a response.
+     * Does nothing if the agreement is already {@link #ready()}.
+     *
+     * @param groupId Group id.
+     * @param currentTopologySnapshot Current topology snapshot; the topology check is skipped when it is {@code null}.
+     * @param assignments Assignments.
+     */
+    public void checkValid(
+            ReplicationGroupId groupId,
+            @Nullable LogicalTopologySnapshot currentTopologySnapshot,
+            Set<Assignment> assignments
+    ) {
+        if (ready()) {
+            return;
+        }
+
+        if (findAny(assignments, a -> a.consistentId().equals(lease.getLeaseholder())).isEmpty()) {
+            // The "group=" placeholder takes the replication group id (as in the topology branch below), not the lease object.
+            LOG.info("Lease was not negotiated because the node is not included into the group assignments anymore [node={}, group={}, "
+                    + "assignments={}].", lease.getLeaseholder(), groupId, assignments);
+
+            responseFut.complete(null);
+        } else if (currentTopologySnapshot != null) {
+            Set<String> nodeIds = currentTopologySnapshot.nodes().stream().map(LogicalNode::id).collect(toSet());
+
+            if (!nodeIds.contains(lease.getLeaseholderId())) {
+                LOG.info("Lease was not negotiated because the node has left the logical topology [node={}, nodeId={}, group={}]",
+                        lease.getLeaseholder(), lease.getLeaseholderId(), groupId);
+
+                responseFut.complete(null);
+            }
+        }
+    }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index fecf46563f..ca3b719888 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.placementdriver.LeaseUpdater;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
@@ -42,7 +41,7 @@ public class LeaseNegotiator {
private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
- /** Leases ready to accept. */
+ /** Lease agreements which are in progress of negotiation. */
private final Map<ReplicationGroupId, LeaseAgreement> leaseToNegotiate;
/** Cluster service. */
@@ -61,7 +60,7 @@ public class LeaseNegotiator {
/**
* Tries negotiating a lease with its leaseholder.
- * The negotiation will achieve after the method is invoked. Use {@link
#negotiated(ReplicationGroupId)} to check a result.
+ * The negotiation will achieve after the method is invoked. Use {@link
#getAndRemoveIfReady(ReplicationGroupId)} to check a result.
*
* @param lease Lease to negotiate.
* @param force If the flag is true, the process tries to insist of apply
the lease.
@@ -88,28 +87,37 @@ public class LeaseNegotiator {
if (throwable == null) {
assert msg instanceof LeaseGrantedMessageResponse :
"Message type is unexpected [type="
+ msg.getClass().getSimpleName() + ']';
- } else if (!(unwrapCause(throwable) instanceof
NodeStoppingException)) {
- LOG.warn("Lease was not negotiated due to exception
[lease={}]", throwable, lease);
- }
- LeaseGrantedMessageResponse response =
(LeaseGrantedMessageResponse) msg;
+ LeaseGrantedMessageResponse response =
(LeaseGrantedMessageResponse) msg;
- fut.complete(response);
+ fut.complete(response);
+ } else {
+ if (!(unwrapCause(throwable) instanceof
NodeStoppingException)) {
+ LOG.warn("Lease was not negotiated due to
exception [lease={}]", throwable, lease);
+ }
- triggerToRenewLeases();
+ fut.complete(null);
+ }
});
}
/**
- * Gets a lease agreement or {@code null} if the agreement has not formed
yet.
+ * Gets the lease agreement stored for the group, or {@link LeaseAgreement#UNDEFINED_AGREEMENT} if none is stored
+ * (the process of agreement is not started yet). The lookup is done inside a single {@code compute} call on the map:
+ * an agreement that is {@link LeaseAgreement#ready()} is removed from the map, an agreement that is not ready stays there.
+ * In both cases the agreement that was found is the one returned.
*
* @param groupId Replication group id.
- * @return Lease agreement.
+ * @return The stored lease agreement, or {@link LeaseAgreement#UNDEFINED_AGREEMENT} if the map holds none for the group.
*/
- public LeaseAgreement negotiated(ReplicationGroupId groupId) {
- LeaseAgreement agreement = leaseToNegotiate.getOrDefault(groupId,
UNDEFINED_AGREEMENT);
+ public LeaseAgreement getAndRemoveIfReady(ReplicationGroupId groupId) {
+ LeaseAgreement[] res = new LeaseAgreement[1];
+
+ leaseToNegotiate.compute(groupId, (k, v) -> {
+ res[0] = v;
+
+ return v != null && v.ready() ? null : v;
+ });
- return agreement;
+ return res[0] == null ? UNDEFINED_AGREEMENT : res[0];
}
/**
@@ -120,11 +128,4 @@ public class LeaseNegotiator {
public void onLeaseRemoved(ReplicationGroupId groupId) {
leaseToNegotiate.remove(groupId);
}
-
- /**
- * Triggers to renew leases forcibly. The method wakes up the monitor of
{@link LeaseUpdater}.
- */
- private void triggerToRenewLeases() {
- // TODO: IGNITE-18879 Implement lease maintenance.
- }
}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
new file mode 100644
index 0000000000..04466876a2
--- /dev/null
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.leases.LeaseBatch;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test checking exceptional situations on lease negotiation.
+ *
+ * <p>Each test installs a {@link #leaseGrantedMessageHandler} that controls how the mocked messaging service answers
+ * {@link LeaseGrantedMessage}s, writes stable assignments for {@link #GROUP_ID} into the meta storage and then waits until
+ * an accepted lease with the expected leaseholder can be read from the meta storage.
+ */
+public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
+    private static final PlacementDriverMessagesFactory MSG_FACTORY = new PlacementDriverMessagesFactory();
+
+    private static final TablePartitionId GROUP_ID = new TablePartitionId(0, 0);
+
+    private static final String NODE_0_NAME = "node0";
+
+    private static final LogicalNode CLUSTER_NODE_0 = new LogicalNode(randomUUID().toString(), NODE_0_NAME, mock(NetworkAddress.class));
+
+    private static final String NODE_1_NAME = "node1";
+
+    private static final LogicalNode CLUSTER_NODE_1 = new LogicalNode(randomUUID().toString(), NODE_1_NAME, mock(NetworkAddress.class));
+
+    private LeaseUpdater leaseUpdater;
+
+    private MetaStorageManager metaStorageManager;
+
+    private ClusterService pdClusterService;
+
+    private MessagingService pdMessagingService;
+
+    private LogicalTopologyService pdLogicalTopologyService;
+
+    /** Listener captured from {@code addEventListener} on the mocked logical topology service; used to emulate topology events. */
+    private LogicalTopologyEventListener pdLogicalTopologyEventListener;
+
+    /**
+     * Handler producing the response for a lease granted message; receives the first argument of the mocked {@code invoke}
+     * call and the message. When it is {@code null}, the mocked messaging service answers with an accepted response.
+     */
+    private BiFunction<String, LeaseGrantedMessage, LeaseGrantedMessageResponse> leaseGrantedMessageHandler;
+
+    /**
+     * Starts a standalone meta storage, mocks the logical topology service (both nodes are in the topology) and creates,
+     * initializes and activates the lease updater.
+     */
+    @BeforeEach
+    public void setUp() {
+        metaStorageManager = StandaloneMetaStorageManager.create();
+        metaStorageManager.start();
+        metaStorageManager.deployWatches();
+
+        pdLogicalTopologyService = mock(LogicalTopologyService.class);
+        when(pdLogicalTopologyService.logicalTopologyOnLeader())
+                .thenAnswer(inv -> completedFuture(new LogicalTopologySnapshot(0, Set.of(CLUSTER_NODE_0, CLUSTER_NODE_1))));
+        doAnswer(inv -> {
+            pdLogicalTopologyEventListener = inv.getArgument(0);
+
+            return null;
+        }).when(pdLogicalTopologyService).addEventListener(any());
+
+        leaseUpdater = createLeaseUpdater();
+
+        leaseUpdater.init();
+
+        leaseUpdater.activate();
+    }
+
+    /** Deactivates the lease updater. */
+    @AfterEach
+    public void tearDown() {
+        leaseUpdater.deactivate();
+    }
+
+    /**
+     * Creates a lease updater on top of mocked cluster services. The mocked messaging service delegates to
+     * {@link #leaseGrantedMessageHandler} through {@code CompletableFuture.supplyAsync} when the handler is set and
+     * answers with an accepted response otherwise.
+     *
+     * @return Lease updater.
+     */
+    private LeaseUpdater createLeaseUpdater() {
+        TopologyService pdTopologyService = mock(TopologyService.class);
+        when(pdTopologyService.getById(anyString())).thenAnswer(inv -> CLUSTER_NODE_0);
+
+        pdMessagingService = mock(MessagingService.class);
+        when(pdMessagingService.invoke(anyString(), any(), anyLong())).thenAnswer(inv -> {
+            String nodeId = inv.getArgument(0);
+
+            LeaseGrantedMessage leaseGrantedMessage = inv.getArgument(1);
+
+            if (leaseGrantedMessageHandler != null) {
+                return CompletableFuture.supplyAsync(() -> leaseGrantedMessageHandler.apply(nodeId, leaseGrantedMessage));
+            } else {
+                return completedFuture(createLeaseGrantedMessageResponse(true));
+            }
+        });
+
+        pdClusterService = mock(ClusterService.class);
+        when(pdClusterService.messagingService()).thenAnswer(inv -> pdMessagingService);
+        when(pdClusterService.topologyService()).thenAnswer(inv -> pdTopologyService);
+
+        LeaseTracker leaseTracker = new LeaseTracker(metaStorageManager, pdClusterService.topologyService());
+
+        leaseTracker.startTrack(0L);
+
+        return new LeaseUpdater(
+                NODE_0_NAME,
+                pdClusterService,
+                metaStorageManager,
+                pdLogicalTopologyService,
+                leaseTracker,
+                new HybridClockImpl()
+        );
+    }
+
+    private static LeaseGrantedMessageResponse createLeaseGrantedMessageResponse(boolean accept) {
+        return MSG_FACTORY.leaseGrantedMessageResponse().accepted(accept).build();
+    }
+
+    /**
+     * The response of node0 is held back while the assignments are changed from node0 to node1; the accepted lease must
+     * end up on node1.
+     */
+    @Test
+    public void testAssignmentChangeOnNegotiation() throws InterruptedException {
+        CompletableFuture<Void> lgmReceived = new CompletableFuture<>();
+        CompletableFuture<Void> lgmProcessed = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (n.equals(NODE_0_NAME)) {
+                lgmReceived.complete(null);
+
+                lgmProcessed.join();
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        try {
+            metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_0_NAME))));
+
+            assertThat(lgmReceived, willCompleteSuccessfully());
+
+            metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_1_NAME))));
+
+            waitForAcceptedLease();
+
+            assertLeaseCorrect(CLUSTER_NODE_1.id());
+        } finally {
+            // The handler blocks in join() on this future: release it even if an assertion above fails,
+            // otherwise the thread running the handler stays blocked.
+            lgmProcessed.complete(null);
+        }
+    }
+
+    /**
+     * The first lease granted message sent to node0 is answered with a rejecting response, all the following ones are
+     * accepted; the accepted lease must end up on node0.
+     */
+    @Test
+    public void testAssignmentChangeOnNegotiationAndReplicaRejectsLease() throws InterruptedException {
+        CompletableFuture<Void> lgmReceived = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (n.equals(NODE_0_NAME) && !lgmReceived.isDone()) {
+                lgmReceived.complete(null);
+
+                return createLeaseGrantedMessageResponse(false);
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_0_NAME))));
+
+        assertThat(lgmReceived, willCompleteSuccessfully());
+
+        waitForAcceptedLease();
+
+        assertLeaseCorrect(CLUSTER_NODE_0.id());
+    }
+
+    /**
+     * The response of node0 is held back while node0 leaves the logical topology (both nodes are in the assignments);
+     * the accepted lease must end up on node1.
+     */
+    @Test
+    public void testAssignmentChangeOnNegotiationNodeLeftTopology() throws InterruptedException {
+        CompletableFuture<Void> lgmReceived = new CompletableFuture<>();
+        CompletableFuture<Void> lgmProcessed = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (n.equals(NODE_0_NAME)) {
+                lgmReceived.complete(null);
+
+                lgmProcessed.join();
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        try {
+            metaStorageManager.put(
+                    stablePartAssignmentsKey(GROUP_ID),
+                    Assignments.toBytes(Set.of(forPeer(NODE_0_NAME), forPeer(NODE_1_NAME)))
+            );
+
+            assertThat(lgmReceived, willCompleteSuccessfully());
+
+            pdLogicalTopologyEventListener.onNodeLeft(CLUSTER_NODE_0, new LogicalTopologySnapshot(1L, Set.of(CLUSTER_NODE_1)));
+
+            waitForAcceptedLease();
+
+            assertLeaseCorrect(CLUSTER_NODE_1.id());
+        } finally {
+            // The handler blocks in join() on this future: release it even if an assertion above fails,
+            // otherwise the thread running the handler stays blocked.
+            lgmProcessed.complete(null);
+        }
+    }
+
+    /**
+     * The first invocation of the handler throws an exception, all the following ones return an accepting response;
+     * the accepted lease must end up on node0.
+     */
+    @Test
+    public void testNetworkExceptionOnNegotiation() throws InterruptedException {
+        CompletableFuture<Void> lgmReceived = new CompletableFuture<>();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (!lgmReceived.isDone()) {
+                lgmReceived.complete(null);
+
+                throw new RuntimeException("test");
+            }
+
+            return createLeaseGrantedMessageResponse(true);
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_0_NAME))));
+
+        assertThat(lgmReceived, willCompleteSuccessfully());
+
+        waitForAcceptedLease();
+
+        assertLeaseCorrect(CLUSTER_NODE_0.id());
+    }
+
+    /**
+     * Reads the value stored under {@code PLACEMENTDRIVER_LEASES_KEY} and returns the first lease of the decoded batch.
+     *
+     * @return First lease of the batch read from the meta storage.
+     */
+    private Lease getLeaseFromMs() {
+        CompletableFuture<Entry> f = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
+
+        assertThat(f, willSucceedFast());
+
+        Entry e = f.join();
+
+        LeaseBatch leases = LeaseBatch.fromBytes(ByteBuffer.wrap(e.value()).order(ByteOrder.LITTLE_ENDIAN));
+
+        return leases.leases().stream().findFirst().orElseThrow();
+    }
+
+    /** Waits (with the 10_000 timeout passed to {@code waitForCondition}) until the lease read from the meta storage is accepted. */
+    private void waitForAcceptedLease() throws InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            Lease lease = getLeaseFromMs();
+
+            return lease.isAccepted();
+        }, 10_000));
+    }
+
+    /**
+     * Asserts that the lease read from the meta storage is accepted and belongs to the given leaseholder.
+     *
+     * @param leaseholderId Expected leaseholder id.
+     */
+    private void assertLeaseCorrect(String leaseholderId) {
+        Lease lease = getLeaseFromMs();
+
+        assertTrue(lease.isAccepted());
+        assertEquals(leaseholderId, lease.getLeaseholderId());
+    }
+}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
index 8304f787ee..34bdb6cf42 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
@@ -62,7 +62,7 @@ public class LeaseTrackerTest extends BaseIgniteAbstractTest {
listenerRef.set(lsnr);
return null;
}
- ).when(msManager).registerPrefixWatch(any(), any());
+ ).when(msManager).registerExactWatch(any(), any());
Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes());
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index ad70129007..cb51d47cd0 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -182,7 +182,7 @@ public class Replica {
* @param msg Message to process.
* @return Future that contains a result.
*/
- public CompletableFuture<LeaseGrantedMessageResponse>
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
+ private CompletableFuture<LeaseGrantedMessageResponse>
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
LOG.info("Received LeaseGrantedMessage for replica belonging to
group=" + groupId() + ", force=" + msg.force());
return
placementDriver.previousPrimaryExpired(groupId()).thenCompose(unused ->
leaderFuture().thenCompose(leader -> {