This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cb13426cc1 IGNITE-28404 Fix the stopping of lease prolongation during 
lease negotiation (#7931)
3cb13426cc1 is described below

commit 3cb13426cc139508fe24c19b4ec00f2e9c4ae884
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Apr 17 15:56:38 2026 +0300

    IGNITE-28404 Fix the stopping of lease prolongation during lease 
negotiation (#7931)
---
 .../internal/placementdriver/LeaseUpdater.java     | 19 ++++-
 .../placementdriver/leases/LeaseTracker.java       |  2 +-
 .../metrics/PlacementDriverMetricSource.java       |  2 +-
 .../internal/placementdriver/LeaseUpdaterTest.java |  6 +-
 .../internal/replicator/ReplicaStateManager.java   |  4 ++
 modules/rest/build.gradle                          |  1 +
 ...terRecoveryControllerRestartPartitionsTest.java | 83 +++++++++++++++++++++-
 7 files changed, 108 insertions(+), 9 deletions(-)

diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 60a2a06fcb7..3b5ecc4fb93 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -261,7 +261,7 @@ public class LeaseUpdater {
 
         leaseNegotiator.cancelAgreement(grpId);
 
-        Leases leasesCurrent = leaseTracker.leasesLatest();
+        Leases leasesCurrent = leaseTracker.latestLeases();
 
         Collection<Lease> currentLeases = 
leasesCurrent.leaseByGroupId().values();
 
@@ -440,7 +440,7 @@ public class LeaseUpdater {
 
                 leases = new Leases(newLeasesMap, entry.value());
             } else {
-                leases = leaseTracker.leasesLatest();
+                leases = leaseTracker.latestLeases();
             }
         }
 
@@ -815,6 +815,21 @@ public class LeaseUpdater {
                             clusterService.messagingService().respond(sender, 
response, correlationId);
                         }
                     });
+                } else {
+                    // Return non null value to prevent retries from 
non-leaseholder.
+                    long time = clockService.currentLong();
+
+                    LOG.info("Stop lease prolongation message was received 
from non-leaseholder "
+                                    + "[groupId={}, sender={}, leaseholder={}, 
time={}]", grpId, sender, lease.getLeaseholder(), time);
+
+                    if (correlationId != null) {
+                        StopLeaseProlongationMessageResponse response = 
PLACEMENT_DRIVER_MESSAGES_FACTORY
+                                .stopLeaseProlongationMessageResponse()
+                                .deniedLeaseExpirationTimeLong(time)
+                                .build();
+
+                        clusterService.messagingService().respond(sender, 
response, correlationId);
+                    }
                 }
             } else {
                 LOG.warn("Unknown message type [msg={}]", 
msg.getClass().getSimpleName());
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index e112fa6b896..f2ade824a34 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -181,7 +181,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
     }
 
     /** Returns collection of latest leases, ordered by replication group. 
Shows all latest leases including expired ones. */
-    public Leases leasesLatest() {
+    public Leases latestLeases() {
         return leases;
     }
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
index bfa15036aa8..a63328c16db 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
@@ -97,7 +97,7 @@ public class PlacementDriverMetricSource extends 
AbstractMetricSource<Holder> {
             int count = 0;
             HybridTimestamp now = clockService.current();
 
-            for (Lease lease : 
leaseTracker.leasesLatest().leaseByGroupId().values()) {
+            for (Lease lease : 
leaseTracker.latestLeases().leaseByGroupId().values()) {
                 // Expired leases can be ignored.
                 if (lease != null && accepted == lease.isAccepted() && 
clockService.before(lease.getExpirationTime(), now)) {
                     count++;
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 58181434cbe..d18babbd863 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -162,7 +162,7 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
 
         when(clusterService.messagingService()).thenReturn(messagingService);
 
-        lenient().when(leaseTracker.leasesLatest()).thenReturn(leases);
+        lenient().when(leaseTracker.latestLeases()).thenReturn(leases);
         
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i -> 
Lease.emptyLease(i.getArgument(0)));
 
         
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new
 Revisions(1, -1)));
@@ -410,7 +410,7 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
 
         Leases currentLeases = new Leases(staleLeasesByGroup, 
BYTE_EMPTY_ARRAY);
 
-        lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+        lenient().when(leaseTracker.latestLeases()).thenReturn(currentLeases);
         
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
 ->
                 
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), 
Lease.emptyLease(invocation.getArgument(0))));
 
@@ -501,7 +501,7 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
 
         Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY);
 
-        lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+        lenient().when(leaseTracker.latestLeases()).thenReturn(currentLeases);
         
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
 ->
                 
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), 
Lease.emptyLease(invocation.getArgument(0))));
 
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index b5163899dbf..3c8a8cac4a3 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -283,6 +283,8 @@ class ReplicaStateManager {
                     // If is primary, turning off the primary first.
                     context.replicaState = ReplicaState.RESTART_PLANNED;
 
+                    LOG.info("Stopping lease prolongation due to partition 
restart [groupId={}].", groupId);
+
                     return replicaManager.stopLeaseProlongation(groupId, null)
                             .thenCompose(unused -> 
planDeferredReplicaStop(groupId, context, stopOperation));
                 } else {
@@ -319,6 +321,8 @@ class ReplicaStateManager {
         // These is some probability that the replica would be reserved after 
the previous lease is expired and before this method
         // is called, so reservation state needs to be checked again.
         if (context.reservedForPrimary) {
+            LOG.info("Stopping lease prolongation due to replica stop 
[groupId={}].", groupId);
+
             return replicaManager.stopLeaseProlongation(groupId, null)
                     .thenCompose(unused -> planDeferredReplicaStop(groupId, 
context, stopOperation));
         }
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index 37b751097ea..978fd32cef0 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -97,6 +97,7 @@ dependencies {
     integrationTestImplementation project(':ignite-table')
     integrationTestImplementation project(':ignite-transactions')
     integrationTestImplementation project(':ignite-eventlog')
+    integrationTestImplementation project(':ignite-placement-driver-api')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
     integrationTestImplementation 
testFixtures(project(':ignite-cluster-management'))
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
index f1cbf89bd8f..4db33e159db 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.rest.recovery;
 
 import static io.micronaut.http.HttpStatus.BAD_REQUEST;
 import static io.micronaut.http.HttpStatus.OK;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static 
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
 import static 
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus;
 import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 
@@ -36,20 +38,33 @@ import jakarta.inject.Inject;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterConfiguration;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import 
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import 
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /** Test for disaster recovery restart partitions command. */
 @MicronautTest
 public class ItDisasterRecoveryControllerRestartPartitionsTest extends 
ClusterPerClassIntegrationTest {
+    private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
     private static final String NODE_URL = "http://localhost:"; + 
ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
 
     private static final String FIRST_ZONE = "first_ZONE";
@@ -65,10 +80,17 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     @BeforeAll
     public void setUp() {
         sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']", 
FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
-        sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val 
INT) ZONE \"%s\"", TABLE_NAME,
+        sql(String.format("CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE 
\"%s\"", TABLE_NAME,
                 FIRST_ZONE));
     }
 
+    @BeforeEach
+    public void beforeEach() {
+        for (IgniteImpl node : runningNodesList()) {
+            node.stopDroppingMessages();
+        }
+    }
+
     @Test
     public void testRestartPartitionZoneNotFound() {
         String unknownZone = "unknown_zone";
@@ -135,7 +157,6 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26377";)
     public void testRestartSpecifiedPartitions() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, Set.of(0, 1));
 
@@ -151,6 +172,64 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
         assertThat(client.toBlocking().exchange(post), hasStatus(OK));
     }
 
+    @Test
+    public void testRestartPartitionDuringLeaseNegotiation() {
+        IgniteImpl node = anyNode();
+
+        int zoneId = Wrappers.unwrap(node.tables().table(TABLE_NAME), 
TableImpl.class).zoneId();
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
anyNode().placementDriver().awaitPrimaryReplica(
+                partId,
+                node.clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        log.info("Test: primary replica [groupId={}, leaseholder={}]", partId, 
primaryReplicaFut.join().getLeaseholder());
+
+        CompletableFuture<?> newNegotiationFuture = new CompletableFuture<>();
+
+        for (IgniteImpl n : runningNodesList()) {
+            n.dropMessages((recp, msg) -> {
+                if (msg instanceof LeaseGrantedMessage) {
+                    LeaseGrantedMessage lgm = (LeaseGrantedMessage) msg;
+                    if (lgm.groupId().equals(partId)) {
+                        log.info("Test: new negotiation begins [groupId={}, 
leaseholder={}]", lgm.groupId(), recp);
+                        newNegotiationFuture.complete(null);
+                    }
+                }
+
+                if (msg instanceof LeaseGrantedMessageResponse) {
+                    log.info("Test: lease negotiation tries to finish 
[accepted={}]", ((LeaseGrantedMessageResponse) msg).accepted());
+                    return true;
+                }
+
+                return false;
+            });
+        }
+
+        StopLeaseProlongationMessage stopMsg = 
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
+                .groupId(partId)
+                .build();
+
+        for (IgniteImpl n : runningNodesList()) {
+            for (IgniteImpl recp : runningNodesList()) {
+                
n.clusterService().messagingService().invoke(recp.clusterService().topologyService().localMember(),
 stopMsg, 3000);
+            }
+        }
+
+        assertThat(newNegotiationFuture, willCompleteSuccessfully());
+
+        log.info("Test: partition restart");
+
+        MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, Set.of(0));
+
+        assertThat(client.toBlocking().exchange(post), hasStatus(OK));
+    }
+
     private static Set<String> nodeNames(int count) {
         return CLUSTER.runningNodes()
                 .map(Ignite::name)

Reply via email to