This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ef09c38d9a IGNITE-28395 Fix Lease updater accumulates concurrent in-flight invocations causing constant CAS failures (#7976)
4ef09c38d9a is described below

commit 4ef09c38d9a9afdb6d8806c6ab597221967ffeb0
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Apr 16 12:12:07 2026 +0300

    IGNITE-28395 Fix Lease updater accumulates concurrent in-flight invocations causing constant CAS failures (#7976)
---
 .../ItLeaseUpdaterInflightTest.java                | 128 +++++++++++++++++++++
 .../internal/placementdriver/LeaseUpdater.java     |  45 +++++++-
 .../ReplicationConfigurationSchema.java            |   5 +-
 3 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItLeaseUpdaterInflightTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItLeaseUpdaterInflightTest.java
new file mode 100644
index 00000000000..b7b1dd984ad
--- /dev/null
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItLeaseUpdaterInflightTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.sleep;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the lease updater's inflight futures.
+ */
+public class ItLeaseUpdaterInflightTest extends ClusterPerTestIntegrationTest {
+    private static final String TEST_ZONE = "TEST_ZONE";
+    private static final String TEST_TABLE = "TEST_TABLE";
+    private static final String LEASE_EXPIRATION_INTERVAL_MILLIS_STR = "2000";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeEach
+    public void setup() {
+        sql("CREATE ZONE " + TEST_ZONE + " (partitions 1, replicas 1) storage 
profiles ['" + CatalogService.DEFAULT_STORAGE_PROFILE + "']");
+        sql("CREATE TABLE " + TEST_TABLE + " (ID INT PRIMARY KEY, VAL 
VARCHAR(20)) ZONE " + TEST_ZONE);
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        super.customizeInitParameters(builder);
+
+        builder.clusterConfiguration("ignite {"
+                + "  replication.leaseExpirationIntervalMillis: " + 
LEASE_EXPIRATION_INTERVAL_MILLIS_STR
+                + "}");
+    }
+
+    @Test
+    public void test() {
+        AtomicInteger msInflightCount = new AtomicInteger();
+        AtomicBoolean stopped = new AtomicBoolean();
+
+        IgniteImpl node = anyNode();
+
+        int zoneId = unwrapTableImpl(node.tables().table(TEST_TABLE)).zoneId();
+        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+        ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, partId);
+
+        log.info("Test: zoneId={}, leaseStartTime={}", zoneId, 
replicaMeta.getStartTime());
+
+        String testKey = "testKey";
+        node.metaStorageManager().registerPrefixWatch(new ByteArray(testKey), 
event -> {
+            sleep(10);
+
+            msInflightCount.decrementAndGet();
+
+            return nullCompletedFuture();
+        });
+
+        runAsync(() -> {
+            while (!stopped.get()) {
+                if (msInflightCount.get() > 300) {
+                    continue;
+                }
+
+                msInflightCount.incrementAndGet();
+                node.metaStorageManager().put(new ByteArray(testKey), 
"testValue".getBytes(StandardCharsets.UTF_8));
+            }
+        });
+
+        try {
+            sleep(Long.parseLong(LEASE_EXPIRATION_INTERVAL_MILLIS_STR) * 5);
+
+            ReplicaMeta newReplicaMeta = waitAndGetPrimaryReplica(node, 
partId);
+            log.info("Test: newLease={}", newReplicaMeta);
+
+            assertEquals(replicaMeta.getStartTime().longValue(), 
newReplicaMeta.getStartTime().longValue());
+        } finally {
+            stopped.set(true);
+        }
+    }
+
+    private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, 
ZonePartitionId replicationGrpId) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node.placementDriver().awaitPrimaryReplica(
+                replicationGrpId,
+                node.clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        return primaryReplicaFut.join();
+    }
+}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 6816eed83f5..60a2a06fcb7 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.placementdriver;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Objects.hash;
 import static java.util.Objects.requireNonNullElse;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
@@ -28,9 +30,11 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
 import static 
org.apache.ignite.internal.placementdriver.leases.Lease.emptyLease;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.CollectionUtils.union;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -135,6 +139,14 @@ public class LeaseUpdater {
 
     private final Executor throttledLogExecutor;
 
+    private CompletableFuture<?> leaseUpdateFuture = nullCompletedFuture();
+
+    /**
+     * Leases cache for updating leases via {@link MetaStorageManager#invoke}. 
It is renewed right before the lease update, because leases
+     * in {@link LeaseTracker} may be stale a bit, which is critical for 
invoke.
+     */
+    private volatile Leases leases = new Leases(emptyMap(), BYTE_EMPTY_ARRAY);
+
     /**
      * Constructor.
      *
@@ -388,6 +400,8 @@ public class LeaseUpdater {
 
                 try {
                     if (active()) {
+                        waitForInflight();
+
                         updateLeaseBatchInternal();
                     }
                 } catch (Throwable e) {
@@ -408,6 +422,28 @@ public class LeaseUpdater {
             }
         }
 
+        private void waitForInflight() {
+            try {
+                
leaseUpdateFuture.get(replicationConfiguration.leaseExpirationIntervalMillis().value()
 / 2, MILLISECONDS);
+            } catch (Exception e) {
+                LOG.info("Could not wait for the previous lease update to 
complete, proceeding with the next update attempt.", e);
+            }
+
+            var entry = msManager.getLocally(PLACEMENTDRIVER_LEASES_KEY);
+
+            if (entry != null && entry.value() != null) {
+                LeaseBatch leaseBatch = LeaseBatch.fromBytes(entry.value());
+                Map<ReplicationGroupId, Lease> newLeasesMap = 
newHashMap(leaseBatch.leases().size());
+                for (Lease lease : leaseBatch.leases()) {
+                    newLeasesMap.put(lease.replicationGroupId(), lease);
+                }
+
+                leases = new Leases(newLeasesMap, entry.value());
+            } else {
+                leases = leaseTracker.leasesLatest();
+            }
+        }
+
         /** Updates leases in Meta storage. This method is supposed to be used 
in the busy lock. */
         private void updateLeaseBatchInternal() {
             HybridTimestamp currentTime = clockService.current();
@@ -418,7 +454,7 @@ public class LeaseUpdater {
 
             HybridTimestamp newExpirationTimestamp = new 
HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);
 
-            Leases leasesCurrent = leaseTracker.leasesLatest();
+            Leases leasesCurrent = leases;
             Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new 
HashMap<>();
             Map<ReplicationGroupId, Lease> renewedLeases = new 
HashMap<>(leasesCurrent.leaseByGroupId().size());
 
@@ -505,11 +541,12 @@ public class LeaseUpdater {
                     // so we must start a negotiation round from the 
beginning; the same we do for the groups that don't have
                     // leaseholders at all.
                     if (isLeaseOutdated) {
-                        LOG.info("Lease is expired, creating a new one 
[groupId={}, lease={}, candidate={}]", grpId, lease, candidate);
-
                         // New lease is granted.
                         Lease newLease = writeNewLease(grpId, candidate, 
renewedLeases);
 
+                        LOG.info("Lease is expired, creating a new one 
[groupId={}, oldLease={}, newLease={}, candidate={}]",
+                                grpId, lease, newLease, candidate);
+
                         boolean force = !lease.isProlongable() && 
lease.proposedCandidate() != null;
 
                         toBeNegotiated.put(grpId, new LeaseAgreement(newLease, 
force));
@@ -553,7 +590,7 @@ public class LeaseUpdater {
 
             byte[] renewedValue = new 
LeaseBatch(renewedLeases.values()).bytes();
 
-            msManager.invoke(
+            leaseUpdateFuture = msManager.invoke(
                     or(notExists(key), 
value(key).eq(leasesCurrent.leasesBytes())),
                     put(key, renewedValue),
                     noop()
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
index a53ac60c03e..e29c0d8d64f 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
@@ -31,6 +31,9 @@ public class ReplicationConfigurationSchema {
     /** Default value for {@link #idleSafeTimePropagationDurationMillis}. */
     public static final long DEFAULT_IDLE_SAFE_TIME_PROP_DURATION = 
TimeUnit.SECONDS.toMillis(1);
 
+    /** Default value for {@link #leaseExpirationIntervalMillis}. */
+    public static final long DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS = 5000;
+
     /** Default value for {@link #batchSizeBytes}. */
     public static final int DEFAULT_BATCH_SIZE_BYTES = 8192;
 
@@ -57,7 +60,7 @@ public class ReplicationConfigurationSchema {
     @Value(hasDefault = true)
     @Range(min = 2000, max = 120000)
     @PublicName(legacyNames = "leaseExpirationInterval")
-    public long leaseExpirationIntervalMillis = 5_000;
+    public long leaseExpirationIntervalMillis = 
DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS;
 
     @Value(hasDefault = true)
     @Range(max = 10_000)

Reply via email to