This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3cb13426cc1 IGNITE-28404 Fix the stopping of lease prolongation during
lease negotiation (#7931)
3cb13426cc1 is described below
commit 3cb13426cc139508fe24c19b4ec00f2e9c4ae884
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Apr 17 15:56:38 2026 +0300
IGNITE-28404 Fix the stopping of lease prolongation during lease
negotiation (#7931)
---
.../internal/placementdriver/LeaseUpdater.java | 19 ++++-
.../placementdriver/leases/LeaseTracker.java | 2 +-
.../metrics/PlacementDriverMetricSource.java | 2 +-
.../internal/placementdriver/LeaseUpdaterTest.java | 6 +-
.../internal/replicator/ReplicaStateManager.java | 4 ++
modules/rest/build.gradle | 1 +
...terRecoveryControllerRestartPartitionsTest.java | 83 +++++++++++++++++++++-
7 files changed, 108 insertions(+), 9 deletions(-)
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 60a2a06fcb7..3b5ecc4fb93 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -261,7 +261,7 @@ public class LeaseUpdater {
leaseNegotiator.cancelAgreement(grpId);
- Leases leasesCurrent = leaseTracker.leasesLatest();
+ Leases leasesCurrent = leaseTracker.latestLeases();
Collection<Lease> currentLeases =
leasesCurrent.leaseByGroupId().values();
@@ -440,7 +440,7 @@ public class LeaseUpdater {
leases = new Leases(newLeasesMap, entry.value());
} else {
- leases = leaseTracker.leasesLatest();
+ leases = leaseTracker.latestLeases();
}
}
@@ -815,6 +815,21 @@ public class LeaseUpdater {
clusterService.messagingService().respond(sender,
response, correlationId);
}
});
+ } else {
+ // Return non-null value to prevent retries from
non-leaseholder.
+ long time = clockService.currentLong();
+
+ LOG.info("Stop lease prolongation message was received
from non-leaseholder "
+ + "[groupId={}, sender={}, leaseholder={},
time={}]", grpId, sender, lease.getLeaseholder(), time);
+
+ if (correlationId != null) {
+ StopLeaseProlongationMessageResponse response =
PLACEMENT_DRIVER_MESSAGES_FACTORY
+ .stopLeaseProlongationMessageResponse()
+ .deniedLeaseExpirationTimeLong(time)
+ .build();
+
+ clusterService.messagingService().respond(sender,
response, correlationId);
+ }
}
} else {
LOG.warn("Unknown message type [msg={}]",
msg.getClass().getSimpleName());
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index e112fa6b896..f2ade824a34 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -181,7 +181,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
}
/** Returns collection of latest leases, ordered by replication group.
Shows all latest leases including expired ones. */
- public Leases leasesLatest() {
+ public Leases latestLeases() {
return leases;
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
index bfa15036aa8..a63328c16db 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java
@@ -97,7 +97,7 @@ public class PlacementDriverMetricSource extends
AbstractMetricSource<Holder> {
int count = 0;
HybridTimestamp now = clockService.current();
- for (Lease lease :
leaseTracker.leasesLatest().leaseByGroupId().values()) {
+ for (Lease lease :
leaseTracker.latestLeases().leaseByGroupId().values()) {
// Expired leases can be ignored.
if (lease != null && accepted == lease.isAccepted() &&
clockService.before(lease.getExpirationTime(), now)) {
count++;
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 58181434cbe..d18babbd863 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -162,7 +162,7 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
when(clusterService.messagingService()).thenReturn(messagingService);
- lenient().when(leaseTracker.leasesLatest()).thenReturn(leases);
+ lenient().when(leaseTracker.latestLeases()).thenReturn(leases);
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i ->
Lease.emptyLease(i.getArgument(0)));
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new
Revisions(1, -1)));
@@ -410,7 +410,7 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
Leases currentLeases = new Leases(staleLeasesByGroup,
BYTE_EMPTY_ARRAY);
- lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+ lenient().when(leaseTracker.latestLeases()).thenReturn(currentLeases);
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
->
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0),
Lease.emptyLease(invocation.getArgument(0))));
@@ -501,7 +501,7 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY);
- lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+ lenient().when(leaseTracker.latestLeases()).thenReturn(currentLeases);
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
->
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0),
Lease.emptyLease(invocation.getArgument(0))));
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index b5163899dbf..3c8a8cac4a3 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -283,6 +283,8 @@ class ReplicaStateManager {
// If is primary, turning off the primary first.
context.replicaState = ReplicaState.RESTART_PLANNED;
+ LOG.info("Stopping lease prolongation due to partition
restart [groupId={}].", groupId);
+
return replicaManager.stopLeaseProlongation(groupId, null)
.thenCompose(unused ->
planDeferredReplicaStop(groupId, context, stopOperation));
} else {
@@ -319,6 +321,8 @@ class ReplicaStateManager {
// There is some probability that the replica would be reserved after
the previous lease is expired and before this method
// is called, so reservation state needs to be checked again.
if (context.reservedForPrimary) {
+ LOG.info("Stopping lease prolongation due to replica stop
[groupId={}].", groupId);
+
return replicaManager.stopLeaseProlongation(groupId, null)
.thenCompose(unused -> planDeferredReplicaStop(groupId,
context, stopOperation));
}
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index 37b751097ea..978fd32cef0 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -97,6 +97,7 @@ dependencies {
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-transactions')
integrationTestImplementation project(':ignite-eventlog')
+ integrationTestImplementation project(':ignite-placement-driver-api')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation
testFixtures(project(':ignite-cluster-management'))
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
index f1cbf89bd8f..4db33e159db 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.rest.recovery;
import static io.micronaut.http.HttpStatus.BAD_REQUEST;
import static io.micronaut.http.HttpStatus.OK;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
import static
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus;
import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@@ -36,20 +38,33 @@ import jakarta.inject.Inject;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterConfiguration;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/** Test for disaster recovery restart partitions command. */
@MicronautTest
public class ItDisasterRecoveryControllerRestartPartitionsTest extends
ClusterPerClassIntegrationTest {
+ private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
private static final String NODE_URL = "http://localhost:" +
ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
private static final String FIRST_ZONE = "first_ZONE";
@@ -65,10 +80,17 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
@BeforeAll
public void setUp() {
sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']",
FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
- sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val
INT) ZONE \"%s\"", TABLE_NAME,
+ sql(String.format("CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE
\"%s\"", TABLE_NAME,
FIRST_ZONE));
}
+ @BeforeEach
+ public void beforeEach() {
+ for (IgniteImpl node : runningNodesList()) {
+ node.stopDroppingMessages();
+ }
+ }
+
@Test
public void testRestartPartitionZoneNotFound() {
String unknownZone = "unknown_zone";
@@ -135,7 +157,6 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26377")
public void testRestartSpecifiedPartitions() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, Set.of(0, 1));
@@ -151,6 +172,64 @@ public class
ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
assertThat(client.toBlocking().exchange(post), hasStatus(OK));
}
+ @Test
+ public void testRestartPartitionDuringLeaseNegotiation() {
+ IgniteImpl node = anyNode();
+
+ int zoneId = Wrappers.unwrap(node.tables().table(TABLE_NAME),
TableImpl.class).zoneId();
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ CompletableFuture<ReplicaMeta> primaryReplicaFut =
anyNode().placementDriver().awaitPrimaryReplica(
+ partId,
+ node.clock().now(),
+ 10,
+ SECONDS
+ );
+
+ assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+ log.info("Test: primary replica [groupId={}, leaseholder={}]", partId,
primaryReplicaFut.join().getLeaseholder());
+
+ CompletableFuture<?> newNegotiationFuture = new CompletableFuture<>();
+
+ for (IgniteImpl n : runningNodesList()) {
+ n.dropMessages((recp, msg) -> {
+ if (msg instanceof LeaseGrantedMessage) {
+ LeaseGrantedMessage lgm = (LeaseGrantedMessage) msg;
+ if (lgm.groupId().equals(partId)) {
+ log.info("Test: new negotiation begins [groupId={},
leaseholder={}]", lgm.groupId(), recp);
+ newNegotiationFuture.complete(null);
+ }
+ }
+
+ if (msg instanceof LeaseGrantedMessageResponse) {
+ log.info("Test: lease negotiation tries to finish
[accepted={}]", ((LeaseGrantedMessageResponse) msg).accepted());
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ StopLeaseProlongationMessage stopMsg =
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
+ .groupId(partId)
+ .build();
+
+ for (IgniteImpl n : runningNodesList()) {
+ for (IgniteImpl recp : runningNodesList()) {
+
n.clusterService().messagingService().invoke(recp.clusterService().topologyService().localMember(),
stopMsg, 3000);
+ }
+ }
+
+ assertThat(newNegotiationFuture, willCompleteSuccessfully());
+
+ log.info("Test: partition restart");
+
+ MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, Set.of(0));
+
+ assertThat(client.toBlocking().exchange(post), hasStatus(OK));
+ }
+
private static Set<String> nodeNames(int count) {
return CLUSTER.runningNodes()
.map(Ignite::name)