This is an automated email from the ASF dual-hosted git repository.

NSAmelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 72c39d3f3f1 IGNITE-28758 Fixed an explicit cache lock race condition 
with timeout/cancel cleanup (#13227)
72c39d3f3f1 is described below

commit 72c39d3f3f106a348ae6038ccb9d462debc982a6
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Jun 11 16:22:12 2026 +0300

    IGNITE-28758 Fixed an explicit cache lock race condition with 
timeout/cancel cleanup (#13227)
---
 .../cache/distributed/dht/GridDhtLockFuture.java   | 41 ++++++++--
 .../dht/colocated/GridDhtColocatedLockFuture.java  | 32 +++++---
 .../distributed/CacheTryLockMultithreadedTest.java | 87 ++++++++++++++++++++--
 3 files changed, 140 insertions(+), 20 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 7cba44d7ef5..740b235639b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -553,6 +553,17 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
                 return;
             }
 
+            if (timedOut) {
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT lock fut, response for finished future 
[txId=" + nearLockVer +
+                        ", dhtTxId=" + lockVer +
+                        ", inTx=" + inTx() +
+                        ", node=" + nodeId + ']');
+                }
+
+                return;
+            }
+
             U.warn(msgLog, "DHT lock fut, failed to find mini future [txId=" + 
nearLockVer +
                 ", dhtTxId=" + lockVer +
                 ", inTx=" + inTx() +
@@ -661,7 +672,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
      * @param entry Entry whose lock ownership changed.
      */
     @Override public boolean onOwnerChanged(GridCacheEntryEx entry, 
GridCacheMvccCandidate owner) {
-        if (isDone() || (inTx() && (tx.remainingTime() == -1 || 
tx.isRollbackOnly())))
+        if (checkDone() || (inTx() && (tx.remainingTime() == -1 || 
tx.isRollbackOnly())))
             return false; // Check other futures.
 
         if (log.isDebugEnabled())
@@ -671,6 +682,9 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
             boolean isEmpty;
 
             synchronized (this) {
+                if (checkDone())
+                    return false;
+
                 if (!pendingLocks.remove(entry.key()))
                     return false;
 
@@ -743,6 +757,9 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
         if (log.isDebugEnabled())
             log.debug("Received onComplete(..) callback [success=" + success + 
", fut=" + this + ']');
 
+        if (isDone())
+            return false;
+
         if (!success && !stopping && unlock)
             undoLocks(true);
 
@@ -809,7 +826,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
      * @return {@code True} if future is done.
      */
     private boolean checkDone() {
-        if (isDone()) {
+        if (isDone() || timedOut) {
             if (log.isDebugEnabled())
                 log.debug("Mapping won't proceed because future is done: " + 
this);
 
@@ -824,7 +841,7 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
      */
     private void map(Iterable<GridDhtCacheEntry> entries) {
         synchronized (this) {
-            if (mapped)
+            if (mapped || checkDone())
                 return;
 
             mapped = true;
@@ -970,15 +987,27 @@ public final class GridDhtLockFuture extends 
GridCacheCompoundIdentityFuture<Boo
 
                                     GridCacheMvccCandidate added = 
e.candidate(lockVer);
 
-                                    assert added != null;
+                                    // Possible in case of lock cancellation.
+                                    if (added == null) {
+                                        onFailed();
+
+                                        return;
+                                    }
+
                                     assert added.dhtLocal();
 
                                     if (added.ownerVersion() != null)
                                         req.owned(e.key(), 
added.ownerVersion());
                                 }
                                 catch (GridCacheEntryRemovedException ex) {
-                                    assert false : "Entry cannot become 
obsolete when DHT local candidate is added " +
-                                        "[e=" + e + ", ex=" + ex + ']';
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Entry was removed while 
mapping DHT lock future (will fail) " +
+                                            "[e=" + e + ", ex=" + ex + ", 
fut=" + this + ']');
+                                    }
+
+                                    onFailed();
+
+                                    return;
                                 }
                             }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 6bf6c13aedc..76690dd06d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1283,16 +1283,18 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                     log.debug("Acquired lock for local DHT mapping [locId=" + 
cctx.nodeId() +
                         ", mappedKeys=" + keys + ", fut=" + this + ']');
 
-                if (inTx()) {
-                    for (KeyCacheObject key : keys)
-                        tx.entry(cctx.txKey(key)).markLocked();
-                }
-                else {
-                    for (KeyCacheObject key : keys)
-                        cctx.mvcc().markExplicitOwner(cctx.txKey(key), 
threadId);
-                }
-
                 try {
+                    if (timeoutObj == null)
+                        markLocalDhtLocksAcquired(keys);
+                    else {
+                        synchronized (timeoutObj) {
+                            if (isDone())
+                                return false;
+
+                            markLocalDhtLocksAcquired(keys);
+                        }
+                    }
+
                     // Proceed and add new future (if any) before completing 
embedded future.
                     if (mappings != null)
                         proceedMapping();
@@ -1308,6 +1310,18 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             fut));
     }
 
+    /** @param keys Locally locked keys. */
+    private void markLocalDhtLocksAcquired(Collection<KeyCacheObject> keys) {
+        if (inTx()) {
+            for (KeyCacheObject key : keys)
+                tx.entry(cctx.txKey(key)).markLocked();
+        }
+        else {
+            for (KeyCacheObject key : keys)
+                cctx.mvcc().markExplicitOwner(cctx.txKey(key), threadId);
+        }
+    }
+
     /**
      * Tries to map this future in assumption that local node is primary for 
all keys passed in.
      * If node is not primary for one of the keys, then mapping is reverted 
and full remote mapping is performed.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
index f8fb20c421c..f988c84dbb8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTryLockMultithreadedTest.java
@@ -17,27 +17,57 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  *
  */
+@RunWith(Parameterized.class)
 public class CacheTryLockMultithreadedTest extends GridCommonAbstractTest {
     /** */
-    private static final int SRVS = 2;
+    private static final int SRVS = 3;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode cacheMode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public int backups;
+
+    /** */
+    @Parameterized.Parameters(name = "cacheMode={0}, backups={1}")
+    public static List<Object[]> parameters() {
+        return F.asList(
+            new Object[] {REPLICATED, 0},
+            new Object[] {PARTITIONED, 0},
+            new Object[] {PARTITIONED, 1}
+        );
+    }
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -47,7 +77,8 @@ public class CacheTryLockMultithreadedTest extends 
GridCommonAbstractTest {
 
         ccfg.setAtomicityMode(TRANSACTIONAL);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setCacheMode(REPLICATED);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setBackups(backups);
 
         cfg.setCacheConfiguration(ccfg);
 
@@ -55,14 +86,19 @@ public class CacheTryLockMultithreadedTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
 
         startGridsMultiThreaded(SRVS);
 
         startClientGrid(SRVS);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -74,7 +110,7 @@ public class CacheTryLockMultithreadedTest extends 
GridCommonAbstractTest {
 
         final IgniteCache<Integer, Integer> cache = 
client.cache(DEFAULT_CACHE_NAME);
 
-        final long stopTime = System.currentTimeMillis() + 30_000;
+        final long stopTime = System.currentTimeMillis() + 15_000;
 
         GridTestUtils.runMultiThreaded(new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -93,4 +129,45 @@ public class CacheTryLockMultithreadedTest extends 
GridCommonAbstractTest {
             }
         }, 20, "lock-thread");
     }
+
+    /** */
+    @Test
+    public void testCancelRequestOnTimeout() throws Exception {
+        IgniteEx node = grid(1);
+
+        IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+        List<Integer> keys = primaryKeys(cache, 100);
+
+        for (Integer key : keys)
+            cache.put(key, key);
+
+        long stopTime = U.currentTimeMillis() + 10_000;
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() 
-> {
+            while (U.currentTimeMillis() < stopTime) {
+                Integer key = 
keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
+
+                Lock lock = cache.lock(key);
+
+                boolean locked = false;
+
+                try {
+                    locked = lock.tryLock(10, TimeUnit.MILLISECONDS);
+
+                    if (locked)
+                        doSleep(20);
+                }
+                catch (InterruptedException ignored) {
+                    // No-op.
+                }
+                finally {
+                    if (locked)
+                        lock.unlock();
+                }
+            }
+        }, 16, "lock-thread");
+
+        fut.get(getTestTimeout());
+    }
 }

Reply via email to