This is an automated email from the ASF dual-hosted git repository.
NSAmelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 72c39d3f3f1 IGNITE-28758 Fixed an explicit cache lock race condition
with timeout/cancel cleanup (#13227)
72c39d3f3f1 is described below
commit 72c39d3f3f106a348ae6038ccb9d462debc982a6
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Jun 11 16:22:12 2026 +0300
IGNITE-28758 Fixed an explicit cache lock race condition with
timeout/cancel cleanup (#13227)
---
.../cache/distributed/dht/GridDhtLockFuture.java | 41 ++++++++--
.../dht/colocated/GridDhtColocatedLockFuture.java | 32 +++++---
.../distributed/CacheTryLockMultithreadedTest.java | 87 ++++++++++++++++++++--
3 files changed, 140 insertions(+), 20 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 7cba44d7ef5..740b235639b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -553,6 +553,17 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
return;
}
+ if (timedOut) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT lock fut, response for finished future
[txId=" + nearLockVer +
+ ", dhtTxId=" + lockVer +
+ ", inTx=" + inTx() +
+ ", node=" + nodeId + ']');
+ }
+
+ return;
+ }
+
U.warn(msgLog, "DHT lock fut, failed to find mini future [txId=" +
nearLockVer +
", dhtTxId=" + lockVer +
", inTx=" + inTx() +
@@ -661,7 +672,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
* @param entry Entry whose lock ownership changed.
*/
@Override public boolean onOwnerChanged(GridCacheEntryEx entry,
GridCacheMvccCandidate owner) {
- if (isDone() || (inTx() && (tx.remainingTime() == -1 ||
tx.isRollbackOnly())))
+ if (checkDone() || (inTx() && (tx.remainingTime() == -1 ||
tx.isRollbackOnly())))
return false; // Check other futures.
if (log.isDebugEnabled())
@@ -671,6 +682,9 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
boolean isEmpty;
synchronized (this) {
+ if (checkDone())
+ return false;
+
if (!pendingLocks.remove(entry.key()))
return false;
@@ -743,6 +757,9 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
if (log.isDebugEnabled())
log.debug("Received onComplete(..) callback [success=" + success +
", fut=" + this + ']');
+ if (isDone())
+ return false;
+
if (!success && !stopping && unlock)
undoLocks(true);
@@ -809,7 +826,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
* @return {@code True} if future is done.
*/
private boolean checkDone() {
- if (isDone()) {
+ if (isDone() || timedOut) {
if (log.isDebugEnabled())
log.debug("Mapping won't proceed because future is done: " +
this);
@@ -824,7 +841,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
*/
private void map(Iterable<GridDhtCacheEntry> entries) {
synchronized (this) {
- if (mapped)
+ if (mapped || checkDone())
return;
mapped = true;
@@ -970,15 +987,27 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
GridCacheMvccCandidate added =
e.candidate(lockVer);
- assert added != null;
+ // Possible in case of lock cancellation.
+ if (added == null) {
+ onFailed();
+
+ return;
+ }
+
assert added.dhtLocal();
if (added.ownerVersion() != null)
req.owned(e.key(),
added.ownerVersion());
}
catch (GridCacheEntryRemovedException ex) {
- assert false : "Entry cannot become
obsolete when DHT local candidate is added " +
- "[e=" + e + ", ex=" + ex + ']';
+ if (log.isDebugEnabled()) {
+ log.debug("Entry was removed while
mapping DHT lock future (will fail) " +
+ "[e=" + e + ", ex=" + ex + ",
fut=" + this + ']');
+ }
+
+ onFailed();
+
+ return;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 6bf6c13aedc..76690dd06d8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1283,16 +1283,18 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
log.debug("Acquired lock for local DHT mapping [locId=" +
cctx.nodeId() +
", mappedKeys=" + keys + ", fut=" + this + ']');
- if (inTx()) {
- for (KeyCacheObject key : keys)
- tx.entry(cctx.txKey(key)).markLocked();
- }
- else {
- for (KeyCacheObject key : keys)
- cctx.mvcc().markExplicitOwner(cctx.txKey(key),
threadId);
- }
-
try {
+ if (timeoutObj == null)
+ markLocalDhtLocksAcquired(keys);
+ else {
+ synchronized (timeoutObj) {
+ if (isDone())
+ return false;
+
+ markLocalDhtLocksAcquired(keys);
+ }
+ }
+
// Proceed and add new future (if any) before completing
embedded future.
if (mappings != null)
proceedMapping();
@@ -1308,6 +1310,18 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
fut));
}
+ /** @param keys Locally locked keys. */
+ private void markLocalDhtLocksAcquired(Collection<KeyCacheObject> keys) {
+ if (inTx()) {
+ for (KeyCacheObject key : keys)
+ tx.entry(cctx.txKey(key)).markLocked();
+ }
+ else {
+ for (KeyCacheObject key : keys)
+ cctx.mvcc().markExplicitOwner(cctx.txKey(key), threadId);
+ }
+ }
+
/**
* Tries to map this future in assumption that local node is primary for
all keys passed in.
* If node is not primary for one of the keys, then mapping is reverted
and full remote mapping is performed.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
index f8fb20c421c..f988c84dbb8 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
@@ -17,27 +17,57 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
+@RunWith(Parameterized.class)
public class CacheTryLockMultithreadedTest extends GridCommonAbstractTest {
/** */
- private static final int SRVS = 2;
+ private static final int SRVS = 3;
+
+ /** */
+ @Parameterized.Parameter
+ public CacheMode cacheMode;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public int backups;
+
+ /** */
+ @Parameterized.Parameters(name = "cacheMode={0}, backups={1}")
+ public static List<Object[]> parameters() {
+ return F.asList(
+ new Object[] {REPLICATED, 0},
+ new Object[] {PARTITIONED, 0},
+ new Object[] {PARTITIONED, 1}
+ );
+ }
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
@@ -47,7 +77,8 @@ public class CacheTryLockMultithreadedTest extends
GridCommonAbstractTest {
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setCacheMode(REPLICATED);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setBackups(backups);
cfg.setCacheConfiguration(ccfg);
@@ -55,14 +86,19 @@ public class CacheTryLockMultithreadedTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
startGridsMultiThreaded(SRVS);
startClientGrid(SRVS);
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
/**
* @throws Exception If failed.
*/
@@ -74,7 +110,7 @@ public class CacheTryLockMultithreadedTest extends
GridCommonAbstractTest {
final IgniteCache<Integer, Integer> cache =
client.cache(DEFAULT_CACHE_NAME);
- final long stopTime = System.currentTimeMillis() + 30_000;
+ final long stopTime = System.currentTimeMillis() + 15_000;
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -93,4 +129,45 @@ public class CacheTryLockMultithreadedTest extends
GridCommonAbstractTest {
}
}, 20, "lock-thread");
}
+
+ /** */
+ @Test
+ public void testCancelRequestOnTimeout() throws Exception {
+ IgniteEx node = grid(1);
+
+ IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+ List<Integer> keys = primaryKeys(cache, 100);
+
+ for (Integer key : keys)
+ cache.put(key, key);
+
+ long stopTime = U.currentTimeMillis() + 10_000;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(()
-> {
+ while (U.currentTimeMillis() < stopTime) {
+ Integer key =
keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
+
+ Lock lock = cache.lock(key);
+
+ boolean locked = false;
+
+ try {
+ locked = lock.tryLock(10, TimeUnit.MILLISECONDS);
+
+ if (locked)
+ doSleep(20);
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+ finally {
+ if (locked)
+ lock.unlock();
+ }
+ }
+ }, 16, "lock-thread");
+
+ fut.get(getTestTimeout());
+ }
}