This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2fa6e079d7f IGNITE-23246 Fixed leaking LeaseAgreements (#5779)
2fa6e079d7f is described below
commit 2fa6e079d7f27dbbaf8ad7c5566126ce02bf8be2
Author: Denis Chudov <[email protected]>
AuthorDate: Fri May 9 00:09:17 2025 +0300
IGNITE-23246 Fixed leaking LeaseAgreements (#5779)
---
.../internal/placementdriver/LeaseUpdater.java | 5 ++
.../placementdriver/PlacementDriverManager.java | 8 ---
.../negotiation/LeaseNegotiator.java | 2 +
.../placementdriver/LeaseNegotiationTest.java | 60 +++++++++++++++++++++-
4 files changed, 65 insertions(+), 10 deletions(-)
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index dd14fb37a3e..e511799e7e4 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -199,6 +199,8 @@ public class LeaseUpdater {
return;
}
+ LOG.info("Placement driver active actor is starting.");
+
leaseNegotiator = new LeaseNegotiator(clusterService);
updaterThread = new IgniteThread(nodeName, "lease-updater",
updater);
@@ -222,6 +224,8 @@ public class LeaseUpdater {
return;
}
+ LOG.info("Placement driver active actor is stopping.");
+
leaseNegotiator = null;
updaterThread.interrupt();
@@ -574,6 +578,7 @@ public class LeaseUpdater {
if (clockService.before(lease.getExpirationTime(), currentTime)
&&
!groupsAmongCurrentStableAndPendingAssignments.contains(groupId)) {
iter.remove();
+ leaseNegotiator.cancelAgreement(groupId);
} else if (prolongableLeaseGroupIds.contains(groupId)) {
entry.setValue(prolongLease(lease,
newExpirationTimestamp));
}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 12556b1de53..7218e66869d 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -36,8 +36,6 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -64,8 +62,6 @@ import org.jetbrains.annotations.TestOnly;
* The another role of the manager is providing a node, which is leaseholder
at the moment, for a particular replication group.
*/
public class PlacementDriverManager implements IgniteComponent {
- private static final IgniteLogger LOG =
Loggers.forClass(PlacementDriverManager.class);
-
private static final String PLACEMENTDRIVER_LEASES_KEY_STRING =
"placementdriver.leases";
public static final ByteArray PLACEMENTDRIVER_LEASES_KEY =
ByteArray.fromString(PLACEMENTDRIVER_LEASES_KEY_STRING);
@@ -256,15 +252,11 @@ public class PlacementDriverManager implements IgniteComponent {
/** Takes over active actor of placement driver group. */
private void takeOverActiveActorBusy() {
- LOG.info("Placement driver active actor is starting.");
-
leaseUpdater.activate();
}
/** Steps down as active actor. */
private void stepDownActiveActorBusy() {
- LOG.info("Placement driver active actor is stopping.");
-
leaseUpdater.deactivate();
}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index 6577c1b439f..c11d3e57510 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -90,6 +90,8 @@ public class LeaseNegotiator {
LOG.warn("Lease was not negotiated due to
exception [lease={}]", throwable, lease);
}
+ leaseToNegotiate.remove(agreement.groupId(),
agreement);
+
agreement.cancel();
}
});
diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
index 79934746ae3..3380011178a 100644
--- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
+++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -37,12 +37,15 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -121,7 +125,7 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
private final long assignmentsTimestamp = new HybridTimestamp(0,
1).longValue();
- @InjectConfiguration
+ @InjectConfiguration("mock.leaseAgreementAcceptanceTimeLimitMillis = 2000")
private ReplicationConfiguration replicationConfiguration;
private PartitionGroupId replicationGroupId(int objectId, int partId) {
@@ -166,12 +170,14 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
pdMessagingService = mock(MessagingService.class);
when(pdMessagingService.invoke(anyString(), any(),
anyLong())).thenAnswer(inv -> {
String nodeId = inv.getArgument(0);
+ long timeout = inv.getArgument(2);
LeaseGrantedMessage leaseGrantedMessage = inv.getArgument(1);
if (leaseGrantedMessageHandler != null) {
return CompletableFuture.supplyAsync(() -> null)
- .thenCompose(unused ->
leaseGrantedMessageHandler.apply(nodeId, leaseGrantedMessage));
+ .thenCompose(unused ->
leaseGrantedMessageHandler.apply(nodeId, leaseGrantedMessage))
+ .orTimeout(timeout, TimeUnit.MILLISECONDS);
} else {
return
completedFuture(createLeaseGrantedMessageResponse(true));
}
@@ -414,6 +420,50 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
assertTrue(waitForCondition(() -> getAllLeasesFromMs().isEmpty(),
20_000));
}
+ @Test
+ public void testLeaseAgreementCleanup() throws Exception {
+ CompletableFuture<?> timedOutGroupLgmReceived = new
CompletableFuture<>();
+ CompletableFuture<?> removedGroupLgmReceived = new
CompletableFuture<>();
+
+ PartitionGroupId timedOutGroup = replicationGroupId(1, 1);
+ PartitionGroupId removedGroup = replicationGroupId(1, 2);
+ byte[] assignmentBytes =
Assignments.toBytes(Set.of(forPeer(NODE_0_NAME)), assignmentsTimestamp);
+
+ leaseGrantedMessageHandler = (n, lgm) -> {
+ if (lgm.groupId().equals(groupId)) {
+ return
completedFuture(createLeaseGrantedMessageResponse(true));
+ } else if (lgm.groupId().equals(timedOutGroup)) {
+ timedOutGroupLgmReceived.complete(null);
+
+ // Return a future that will never be completed, to trigger
the agreement timeout.
+ return new CompletableFuture<>();
+ } else {
+ removedGroupLgmReceived.complete(null);
+ return new CompletableFuture<>();
+ }
+ };
+
+ metaStorageManager.put(stableAssignmentsKey(groupId), assignmentBytes);
+ metaStorageManager.put(stableAssignmentsKey(timedOutGroup),
assignmentBytes);
+ metaStorageManager.put(stableAssignmentsKey(removedGroup),
assignmentBytes);
+
+ // Wait for accepted lease for groupId.
+ assertTrue(waitForCondition(
+ () -> getAllLeasesFromMs().stream().anyMatch(l ->
l.replicationGroupId().equals(groupId) && l.isAccepted()),
+ 5000
+ ));
+
+ assertThat(timedOutGroupLgmReceived, willSucceedFast());
+ assertThat(removedGroupLgmReceived, willSucceedFast());
+
+ metaStorageManager.remove(stableAssignmentsKey(removedGroup));
+
+ LeaseNegotiator leaseNegotiator = getFieldValue(leaseUpdater,
"leaseNegotiator");
+ Map agreementsMap = getFieldValue(leaseNegotiator, "leaseToNegotiate");
+
+ assertTrue(waitForCondition(() -> agreementsMap.isEmpty(), 10_000));
+ }
+
@Test
public void testLeasesCleanupOfOneGroupFromMultiple() throws
InterruptedException {
leaseGrantedMessageHandler = (n, lgm) ->
completedFuture(createLeaseGrantedMessageResponse(true));
@@ -472,4 +522,10 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
assertTrue(lease.isAccepted());
assertEquals(leaseholderId, lease.getLeaseholderId());
}
+
+ private <T> T getFieldValue(Object o, String fieldName) throws
NoSuchFieldException, IllegalAccessException {
+ Field f = o.getClass().getDeclaredField(fieldName);
+ f.setAccessible(true);
+ return (T) f.get(o);
+ }
}