This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4ef09c38d9a IGNITE-28395 Fix Lease updater accumulates concurrent
in-flight invocations causing constant CAS failures (#7976)
4ef09c38d9a is described below
commit 4ef09c38d9a9afdb6d8806c6ab597221967ffeb0
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Apr 16 12:12:07 2026 +0300
IGNITE-28395 Fix Lease updater accumulates concurrent in-flight invocations
causing constant CAS failures (#7976)
---
.../ItLeaseUpdaterInflightTest.java | 128 +++++++++++++++++++++
.../internal/placementdriver/LeaseUpdater.java | 45 +++++++-
.../ReplicationConfigurationSchema.java | 5 +-
3 files changed, 173 insertions(+), 5 deletions(-)
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItLeaseUpdaterInflightTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItLeaseUpdaterInflightTest.java
new file mode 100644
index 00000000000..b7b1dd984ad
--- /dev/null
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItLeaseUpdaterInflightTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.sleep;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the lease updater's inflight futures.
+ *
+ * <p>The test runs on a single-node cluster with the lease expiration interval set to
+ * {@link #LEASE_EXPIRATION_INTERVAL_MILLIS_STR}, keeps the Meta Storage busy with puts to a watched key, and checks that
+ * the start time of the primary replica lease is the same before and after the load period.
+ */
+public class ItLeaseUpdaterInflightTest extends ClusterPerTestIntegrationTest {
+ // Names of the zone and the table created in setup().
+ private static final String TEST_ZONE = "TEST_ZONE";
+ private static final String TEST_TABLE = "TEST_TABLE";
+ // Lease expiration interval, kept as a string because it is concatenated into the cluster configuration;
+ // it is also parsed and multiplied by 5 to get the duration of the load period in test().
+ private static final String LEASE_EXPIRATION_INTERVAL_MILLIS_STR = "2000";
+
+ /** The cluster consists of a single node. */
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ /** Creates a zone with one partition and one replica, and a table in that zone. */
+ @BeforeEach
+ public void setup() {
+ sql("CREATE ZONE " + TEST_ZONE + " (partitions 1, replicas 1) storage
profiles ['" + CatalogService.DEFAULT_STORAGE_PROFILE + "']");
+ sql("CREATE TABLE " + TEST_TABLE + " (ID INT PRIMARY KEY, VAL
VARCHAR(20)) ZONE " + TEST_ZONE);
+ }
+
+ /** Sets {@code replication.leaseExpirationIntervalMillis} in the initial cluster configuration. */
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ super.customizeInitParameters(builder);
+
+ builder.clusterConfiguration("ignite {"
+ + " replication.leaseExpirationIntervalMillis: " +
LEASE_EXPIRATION_INTERVAL_MILLIS_STR
+ + "}");
+ }
+
+ /**
+ * Loads the Meta Storage with puts to a watched key for five lease expiration intervals and asserts that the primary
+ * replica lease of the only partition still has the start time it had before the load.
+ */
+ @Test
+ public void test() {
+ // Incremented by the loader before each put and decremented by the watch listener below.
+ AtomicInteger msInflightCount = new AtomicInteger();
+ // Set in the finally block to make the loader loop exit.
+ AtomicBoolean stopped = new AtomicBoolean();
+
+ IgniteImpl node = anyNode();
+
+ int zoneId = unwrapTableImpl(node.tables().table(TEST_TABLE)).zoneId();
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ // Lease that exists before the load starts; its start time is the reference value for the assertion.
+ ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, partId);
+
+ log.info("Test: zoneId={}, leaseStartTime={}", zoneId,
replicaMeta.getStartTime());
+
+ String testKey = "testKey";
+ // The listener sleeps on every event, so watch processing is deliberately slow
+ // (sleep(10) is presumably 10 milliseconds - TODO confirm against IgniteTestUtils#sleep).
+ node.metaStorageManager().registerPrefixWatch(new ByteArray(testKey),
event -> {
+ sleep(10);
+
+ msInflightCount.decrementAndGet();
+
+ return nullCompletedFuture();
+ });
+
+ // Loader: keeps issuing puts to the watched key until stopped, pausing while more than 300 are unprocessed.
+ // NOTE(review): the loop spins without yielding while the cap is reached, and both the future returned by
+ // runAsync and the result of put() are ignored, so a failure in the loader would go unnoticed - confirm
+ // this is acceptable for the test.
+ runAsync(() -> {
+ while (!stopped.get()) {
+ if (msInflightCount.get() > 300) {
+ continue;
+ }
+
+ msInflightCount.incrementAndGet();
+ node.metaStorageManager().put(new ByteArray(testKey),
"testValue".getBytes(StandardCharsets.UTF_8));
+ }
+ });
+
+ try {
+ // Let the load run for five lease expiration intervals.
+ sleep(Long.parseLong(LEASE_EXPIRATION_INTERVAL_MILLIS_STR) * 5);
+
+ ReplicaMeta newReplicaMeta = waitAndGetPrimaryReplica(node,
partId);
+ log.info("Test: newLease={}", newReplicaMeta);
+
+ // Same start time before and after the load; presumably this means the original lease was prolonged
+ // rather than replaced by a new one - verify against the lease semantics.
+ assertEquals(replicaMeta.getStartTime().longValue(),
newReplicaMeta.getStartTime().longValue());
+ } finally {
+ stopped.set(true);
+ }
+ }
+
+ /**
+ * Awaits the primary replica of the given replication group for the node's current clock time, with a 10 second
+ * timeout, asserts that the future completes successfully, and returns its result.
+ *
+ * @param node Node whose placement driver and clock are used.
+ * @param replicationGrpId Replication group to get the primary replica for.
+ * @return Primary replica meta.
+ */
+ private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node,
ZonePartitionId replicationGrpId) {
+ CompletableFuture<ReplicaMeta> primaryReplicaFut =
node.placementDriver().awaitPrimaryReplica(
+ replicationGrpId,
+ node.clock().now(),
+ 10,
+ SECONDS
+ );
+
+ assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+ return primaryReplicaFut.join();
+ }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 6816eed83f5..60a2a06fcb7 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.placementdriver;
+import static java.util.Collections.emptyMap;
import static java.util.Objects.hash;
import static java.util.Objects.requireNonNullElse;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
@@ -28,9 +30,11 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static
org.apache.ignite.internal.placementdriver.leases.Lease.emptyLease;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.CollectionUtils.union;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -135,6 +139,14 @@ public class LeaseUpdater {
private final Executor throttledLogExecutor;
+ private CompletableFuture<?> leaseUpdateFuture = nullCompletedFuture();
+
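+ /**
+ * Leases cache used for updating leases via {@link MetaStorageManager#invoke}. It is refreshed right before each lease
+ * update, because the leases in {@link LeaseTracker} may be slightly stale, while the invoke only succeeds if the value
+ * stored under the leases key equals the bytes of the leases the update was computed from.
+ */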
+ private volatile Leases leases = new Leases(emptyMap(), BYTE_EMPTY_ARRAY);
+
/**
* Constructor.
*
@@ -388,6 +400,8 @@ public class LeaseUpdater {
try {
if (active()) {
+ waitForInflight();
+
updateLeaseBatchInternal();
}
} catch (Throwable e) {
@@ -408,6 +422,28 @@ public class LeaseUpdater {
}
}
+ /**
+ * Waits for the previous lease update invocation to complete and then refreshes the {@link #leases} cache.
+ *
+ * <p>The wait is bounded by half of the configured lease expiration interval. If the wait fails or times out, the
+ * problem is only logged and the method carries on with the refresh. If the wait is interrupted, the interrupt flag
+ * of the current thread is restored as well, so that the interruption stays visible to the caller.
+ *
+ * <p>The cache is rebuilt from the value stored locally under {@code PLACEMENTDRIVER_LEASES_KEY}; if there is no such
+ * value, the leases are taken from the lease tracker instead.
+ */
+ private void waitForInflight() {
+ try {
+ long timeoutMillis = replicationConfiguration.leaseExpirationIntervalMillis().value() / 2;
+
+ leaseUpdateFuture.get(timeoutMillis, MILLISECONDS);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // Catching InterruptedException clears the interrupt flag; set it again instead of losing the interruption.
+ Thread.currentThread().interrupt();
+ }
+
+ LOG.info("Could not wait for the previous lease update to complete, proceeding with the next update attempt.", e);
+ }
+
+ var entry = msManager.getLocally(PLACEMENTDRIVER_LEASES_KEY);
+
+ // Read the value once, so that the parsed batch and the bytes kept for the invoke condition come from the same array.
+ var leasesBytes = entry == null ? null : entry.value();
+
+ if (leasesBytes == null) {
+ leases = leaseTracker.leasesLatest();
+
+ return;
+ }
+
+ LeaseBatch leaseBatch = LeaseBatch.fromBytes(leasesBytes);
+ Map<ReplicationGroupId, Lease> newLeasesMap = newHashMap(leaseBatch.leases().size());
+
+ for (Lease lease : leaseBatch.leases()) {
+ newLeasesMap.put(lease.replicationGroupId(), lease);
+ }
+
+ leases = new Leases(newLeasesMap, leasesBytes);
+ }
+
/** Updates leases in Meta storage. This method is supposed to be used
in the busy lock. */
private void updateLeaseBatchInternal() {
HybridTimestamp currentTime = clockService.current();
@@ -418,7 +454,7 @@ public class LeaseUpdater {
HybridTimestamp newExpirationTimestamp = new
HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);
- Leases leasesCurrent = leaseTracker.leasesLatest();
+ Leases leasesCurrent = leases;
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new
HashMap<>();
Map<ReplicationGroupId, Lease> renewedLeases = new
HashMap<>(leasesCurrent.leaseByGroupId().size());
@@ -505,11 +541,12 @@ public class LeaseUpdater {
// so we must start a negotiation round from the
beginning; the same we do for the groups that don't have
// leaseholders at all.
if (isLeaseOutdated) {
- LOG.info("Lease is expired, creating a new one
[groupId={}, lease={}, candidate={}]", grpId, lease, candidate);
-
// New lease is granted.
Lease newLease = writeNewLease(grpId, candidate,
renewedLeases);
+ LOG.info("Lease is expired, creating a new one
[groupId={}, oldLease={}, newLease={}, candidate={}]",
+ grpId, lease, newLease, candidate);
+
boolean force = !lease.isProlongable() &&
lease.proposedCandidate() != null;
toBeNegotiated.put(grpId, new LeaseAgreement(newLease,
force));
@@ -553,7 +590,7 @@ public class LeaseUpdater {
byte[] renewedValue = new
LeaseBatch(renewedLeases.values()).bytes();
- msManager.invoke(
+ leaseUpdateFuture = msManager.invoke(
or(notExists(key),
value(key).eq(leasesCurrent.leasesBytes())),
put(key, renewedValue),
noop()
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
index a53ac60c03e..e29c0d8d64f 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java
@@ -31,6 +31,9 @@ public class ReplicationConfigurationSchema {
/** Default value for {@link #idleSafeTimePropagationDurationMillis}. */
public static final long DEFAULT_IDLE_SAFE_TIME_PROP_DURATION =
TimeUnit.SECONDS.toMillis(1);
+ /** Default value for {@link #leaseExpirationIntervalMillis}. */
+ public static final long DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS = 5000;
+
/** Default value for {@link #batchSizeBytes}. */
public static final int DEFAULT_BATCH_SIZE_BYTES = 8192;
@@ -57,7 +60,7 @@ public class ReplicationConfigurationSchema {
@Value(hasDefault = true)
@Range(min = 2000, max = 120000)
@PublicName(legacyNames = "leaseExpirationInterval")
- public long leaseExpirationIntervalMillis = 5_000;
+ public long leaseExpirationIntervalMillis =
DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS;
@Value(hasDefault = true)
@Range(max = 10_000)