This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c74a930f587 IGNITE-27717 Add timeout to page locking (#7514)
c74a930f587 is described below

commit c74a930f587e0d121e4360c1c7776fbafc536ce6
Author: Ivan Bessonov <[email protected]>
AuthorDate: Wed Feb 4 07:52:16 2026 +0300

    IGNITE-27717 Add timeout to page locking (#7514)
---
 check-rules/spotbugs-excludes.xml                  |  12 +++
 .../ignite/internal/util/OffheapReadWriteLock.java | 116 +++++++++++++++++----
 .../ignite/internal/pagememory/tree/BplusTree.java |  50 ++++++---
 .../pagememory/PersistentPageMemoryDataRegion.java |  11 +-
 4 files changed, 151 insertions(+), 38 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml 
b/check-rules/spotbugs-excludes.xml
index 031ada26f94..5e5fc30c7f9 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -266,6 +266,18 @@
     <Bug pattern="SING_SINGLETON_HAS_NONPRIVATE_CONSTRUCTOR"/>
     <Class 
name="org.apache.ignite.internal.partition.replicator.schema.TableDefinitionDiff"/>
   </Match>
+  <Match>
+    <!-- Timeout is handled by other means. -->
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+    <Class name="org.apache.ignite.internal.util.OffheapReadWriteLock"/>
+    <Method name="awaitCondition"/>
+  </Match>
+  <Match>
+    <!-- Loop is outside of this method. -->
+    <Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
+    <Class name="org.apache.ignite.internal.util.OffheapReadWriteLock"/>
+    <Method name="awaitCondition"/>
+  </Match>
   <!-- end of false-positive exclusions -->
 
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
index f2cb3b50876..b4f12764c97 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -17,9 +17,15 @@
 
 package org.apache.ignite.internal.util;
 
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+import static org.apache.ignite.internal.util.StringUtils.hexInt;
+import static org.apache.ignite.internal.util.StringUtils.hexLong;
+
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -70,16 +76,37 @@ public class OffheapReadWriteLock {
     /** Mask to extract stripe index from the hash. */
     private final int monitorsMask;
 
+    /** Lock timeout in nanoseconds. {@code 0L} if unlimited. */
+    private final long timeoutNanos;
+
+    /** An exception that's thrown by {@code *Lock} methods if the lock has 
not been acquired within a configured timeout. */
+    public static class LockTimeoutException extends RuntimeException {
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Constructor.
+         *
+         * @param message Exception message.
+         */
+        LockTimeoutException(String message) {
+            super(message);
+        }
+    }
+
     /**
      * Constructor.
      *
      * @param concLvl Concurrency level, must be a power of two.
+     * @param timeout Lock acquisition timeout. {@code 0} means an unlimited 
timeout.
+     * @param timeUnit Timeout time unit.
      */
-    public OffheapReadWriteLock(int concLvl) {
-        if ((concLvl & concLvl - 1) != 0) {
+    public OffheapReadWriteLock(int concLvl, long timeout, TimeUnit timeUnit) {
+        if (!isPow2(concLvl)) {
             throw new IllegalArgumentException("Concurrency level must be a 
power of 2: " + concLvl);
         }
 
+        timeoutNanos = timeUnit.toNanos(timeout);
+
         monitorsMask = concLvl - 1;
 
         locks = new ReentrantLock[concLvl];
@@ -95,6 +122,15 @@ public class OffheapReadWriteLock {
         }
     }
 
+    /**
+     * Constructor.
+     *
+     * @param concLvl Concurrency level, must be a power of two.
+     */
+    public OffheapReadWriteLock(int concLvl) {
+        this(concLvl, 0, TimeUnit.NANOSECONDS);
+    }
+
     /**
      * Initializes the lock.
      *
@@ -112,6 +148,7 @@ public class OffheapReadWriteLock {
      * Acquires a read lock.
      *
      * @param lock Lock address.
+     * @throws LockTimeoutException If lock acquisition timed out.
      */
     public boolean readLock(long lock, int tag) {
         long state = GridUnsafe.getLongVolatile(null, lock);
@@ -166,7 +203,7 @@ public class OffheapReadWriteLock {
 
             if (lockCount(state) <= 0) {
                 throw new IllegalMonitorStateException("Attempted to release a 
read lock while not holding it "
-                        + "[lock=" + StringUtils.hexLong(lock) + ", state=" + 
StringUtils.hexLong(state) + ']');
+                        + "[lock=" + hexLong(lock) + ", state=" + 
hexLong(state) + ']');
             }
 
             long updated = updateState(state, -1, 0, 0);
@@ -212,6 +249,7 @@ public class OffheapReadWriteLock {
      * Acquires a write lock.
      *
      * @param lock Lock address.
+     * @throws LockTimeoutException If lock acquisition timed out.
      */
     public boolean writeLock(long lock, int tag) {
         assert tag != 0;
@@ -285,7 +323,7 @@ public class OffheapReadWriteLock {
 
             if (lockCount(state) != -1) {
                 throw new IllegalMonitorStateException("Attempted to release 
write lock while not holding it "
-                        + "[lock=" + StringUtils.hexLong(lock) + ", state=" + 
StringUtils.hexLong(state) + ']');
+                        + "[lock=" + hexLong(lock) + ", state=" + 
hexLong(state) + ']');
             }
 
             updated = releaseWithTag(state, tag);
@@ -349,6 +387,7 @@ public class OffheapReadWriteLock {
      * @return {@code null} if tag validation failed, {@code true} if 
successfully traded the read lock to
      *      the write lock without leaving a gap. Returns {@code false} 
otherwise, in this case the resource
      *      state must be re-validated.
+     * @throws LockTimeoutException If lock acquisition timed out.
      */
     public @Nullable Boolean upgradeToWriteLock(long lock, int tag) {
         for (int i = 0; i < SPIN_CNT; i++) {
@@ -418,6 +457,7 @@ public class OffheapReadWriteLock {
         assert lockObj.isHeldByCurrentThread();
 
         boolean interrupted = false;
+        long startTimeNanos = System.nanoTime();
 
         try {
             while (true) {
@@ -426,13 +466,7 @@ public class OffheapReadWriteLock {
 
                     if (!checkTag(state, tag)) {
                         // We cannot lock with this tag, release waiter.
-                        long updated = updateState(state, 0, -1, 0);
-
-                        if (GridUnsafe.compareAndSwapLong(null, lock, state, 
updated)) {
-                            int writeWaitCnt = writersWaitCount(updated);
-
-                            signalNextWaiter(writeWaitCnt, lockIdx);
-
+                        if (tryReleaseWaiter(lock, lockIdx, state, true)) {
                             return false;
                         }
                     } else if (canReadLock(state)) {
@@ -442,7 +476,7 @@ public class OffheapReadWriteLock {
                             return true;
                         }
                     } else {
-                        waitCond.await();
+                        awaitCondition(lock, lockIdx, tag, startTimeNanos, 
waitCond, true);
                     }
                 } catch (InterruptedException ignore) {
                     interrupted = true;
@@ -470,6 +504,7 @@ public class OffheapReadWriteLock {
         assert lockObj.isHeldByCurrentThread();
 
         boolean interrupted = false;
+        long startTimeNanos = System.nanoTime();
 
         try {
             while (true) {
@@ -478,13 +513,7 @@ public class OffheapReadWriteLock {
 
                     if (!checkTag(state, tag)) {
                         // We cannot lock with this tag, release waiter.
-                        long updated = updateState(state, 0, 0, -1);
-
-                        if (GridUnsafe.compareAndSwapLong(null, lock, state, 
updated)) {
-                            int writeWaitCnt = writersWaitCount(updated);
-
-                            signalNextWaiter(writeWaitCnt, lockIdx);
-
+                        if (tryReleaseWaiter(lock, lockIdx, state, false)) {
                             return false;
                         }
                     } else if (canWriteLock(state)) {
@@ -494,7 +523,7 @@ public class OffheapReadWriteLock {
                             return true;
                         }
                     } else {
-                        waitCond.await();
+                        awaitCondition(lock, lockIdx, tag, startTimeNanos, 
waitCond, false);
                     }
                 } catch (InterruptedException ignore) {
                     interrupted = true;
@@ -507,6 +536,51 @@ public class OffheapReadWriteLock {
         }
     }
 
+    private boolean tryReleaseWaiter(long lock, int lockIdx, long state, 
boolean readLock) {
+        long updated = readLock ? updateState(state, 0, -1, 0) : 
updateState(state, 0, 0, -1);
+
+        if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+            int writeWaitCnt = writersWaitCount(updated);
+
+            signalNextWaiter(writeWaitCnt, lockIdx);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    @SuppressWarnings("AwaitNotInLoop")
+    private void awaitCondition(
+            long lock, int lockIdx, int tag, long startTimeNanos, Condition 
waitCond, boolean readLock
+    ) throws InterruptedException {
+        if (timeoutNanos == 0) {
+            waitCond.await();
+        } else {
+            long passedNanos = System.nanoTime() - startTimeNanos;
+
+            if (passedNanos >= timeoutNanos) {
+                //noinspection InfiniteLoopStatement
+                while (true) {
+                    long state = GridUnsafe.getLongVolatile(null, lock);
+
+                    if (tryReleaseWaiter(lock, lockIdx, state, readLock)) {
+                        throw new LockTimeoutException(S.toString("Timeout 
waiting for lock acquisition",
+                                "lock", hexLong(lock), false,
+                                "state", hexLong(state), false,
+                                "tag", hexInt(tag), false,
+                                "idx", lockIdx, false,
+                                "cond", waitCond.toString(), false,
+                                "timeout", 
TimeUnit.NANOSECONDS.toMillis(timeoutNanos) + "ms", false
+                        ));
+                    }
+                }
+            }
+
+            waitCond.awaitNanos(timeoutNanos - passedNanos);
+        }
+    }
+
     /**
      * Returns index of lock object corresponding to the stripe of this lock 
address.
      *
@@ -514,7 +588,7 @@ public class OffheapReadWriteLock {
      * @return Lock monitor object that corresponds to the stripe for this 
lock address.
      */
     private int lockIndex(long lock) {
-        return IgniteUtils.safeAbs(IgniteUtils.hash(lock)) & monitorsMask;
+        return IgniteUtils.hash(lock) & monitorsMask;
     }
 
     /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index 4c3ea33ca5f..dbb5665ced5 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1281,7 +1281,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (CorruptedDataStructureException e) {
             throw e;
         } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalCheckedException("Runtime failure on 
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+            throw new IgniteInternalCheckedException("Runtime failure on 
bounds [lower=" + lower + ", upper=" + upper + "]", e);
         } catch (RuntimeException | AssertionError e) {
             long[] pageIds = pages(
                     lower == null || cursor.getCursor == null,
@@ -1289,7 +1289,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             );
 
             throw corruptedTreeException(
-                    "Runtime failure on bounds: [lower=" + lower + ", upper=" 
+ upper + "]",
+                    "Runtime failure on bounds [lower=" + lower + ", upper=" + 
upper + "]",
                     e,
                     grpId,
                     pageIds
@@ -1317,10 +1317,10 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (CorruptedDataStructureException e) {
             throw e;
         } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalCheckedException("Runtime failure on 
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+            throw new IgniteInternalCheckedException("Runtime failure on 
bounds [lower=" + lower + ", upper=" + upper + "]", e);
         } catch (RuntimeException | AssertionError e) {
             throw corruptedTreeException(
-                    "Runtime failure on bounds: [lower=" + lower + ", upper=" 
+ upper + "]",
+                    "Runtime failure on bounds [lower=" + lower + ", upper=" + 
upper + "]",
                     e,
                     grpId,
                     pages(cursor.getCursor != null, () -> new 
long[]{cursor.getCursor.pageId})
@@ -1344,11 +1344,11 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         try {
             new TreeVisitor(lower, upper, c).visit();
         } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalCheckedException("Runtime failure on 
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+            throw new IgniteInternalCheckedException("Runtime failure on 
bounds [lower=" + lower + ", upper=" + upper + "]", e);
         } catch (RuntimeException e) {
-            throw new IgniteInternalException("Runtime failure on bounds: 
[lower=" + lower + ", upper=" + upper + "]", e);
+            throw new IgniteInternalException("Runtime failure on bounds 
[lower=" + lower + ", upper=" + upper + "]", e);
         } catch (AssertionError e) {
-            throw new AssertionError("Assertion error on bounds: [lower=" + 
lower + ", upper=" + upper + "]", e);
+            throw new AssertionError("Assertion error on bounds [lower=" + 
lower + ", upper=" + upper + "]", e);
         } finally {
             checkDestroyed();
         }
@@ -1556,7 +1556,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (IgniteInternalCheckedException e) {
             throw new IgniteInternalCheckedException("Runtime failure on 
lookup row: " + row, e);
         } catch (RuntimeException | AssertionError e) {
-            throw corruptedTreeException("Runtime failure on lookup row: " + 
row, e, grpId, g.pageId);
+            throw corruptedTreeException("Runtime failure on lookup [row=" + 
row + "]", e, grpId, g.pageId);
         } finally {
             checkDestroyed();
         }
@@ -1589,7 +1589,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (IgniteInternalCheckedException e) {
             throw new IgniteInternalCheckedException("Runtime failure on 
lookup next row: " + lowerBound, e);
         } catch (RuntimeException | AssertionError e) {
-            throw corruptedTreeException("Runtime failure on lookup next row: 
" + lowerBound, e, grpId, g.pageId);
+            throw corruptedTreeException("Runtime failure on lookup next row 
[lower=" + lowerBound + "]", e, grpId, g.pageId);
         } finally {
             checkDestroyed();
         }
@@ -2103,9 +2103,9 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (CorruptedDataStructureException e) {
             throw e;
         } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalCheckedException("Runtime failure on 
search row: " + row, e);
+            throw new IgniteInternalCheckedException("Runtime failure on 
invoke [row=" + row + ", op=" + x.op + "]", e);
         } catch (RuntimeException | AssertionError e) {
-            throw corruptedTreeException("Runtime failure on search row: " + 
row, e, grpId, x.pageId);
+            throw corruptedTreeException("Runtime failure on invoke [row=" + 
row + ", op=" + x.op + "]", e, grpId, x.pageId);
         } finally {
             x.releaseAll();
             checkDestroyed();
@@ -2253,9 +2253,9 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (CorruptedDataStructureException e) {
             throw e;
         } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalCheckedException("Runtime failure on 
search row: " + row, e);
+            throw new IgniteInternalCheckedException("Runtime failure on 
remove [row=" + row + ", op=" + r + "]", e);
         } catch (RuntimeException | AssertionError e) {
-            throw corruptedTreeException("Runtime failure on search row: " + 
row, e, grpId, r.pageId);
+            throw corruptedTreeException("Runtime failure on remove [row=" + 
row + ", op=" + r + "]", e, grpId, r.pageId);
         } finally {
             r.releaseAll();
             checkDestroyed();
@@ -2607,9 +2607,9 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         } catch (CorruptedDataStructureException e) {
             throw e;
         } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalCheckedException("Runtime failure on row: 
" + row, e);
+            throw new IgniteInternalCheckedException("Runtime failure on put 
[row=" + row + ", op=" + p + "]", e);
         } catch (RuntimeException | AssertionError e) {
-            throw corruptedTreeException("Runtime failure on row: " + row, e, 
grpId, p.pageId);
+            throw corruptedTreeException("Runtime failure on put [row=" + row 
+ ", op=" + p + "]", e, grpId, p.pageId);
         } finally {
             checkDestroyed();
         }
@@ -4141,6 +4141,11 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
                 super.checkLockRetry();
             }
         }
+
+        @Override
+        public String toString() {
+            return "Put [super=" + super.toString() + ", needOld=" + needOld + 
"]";
+        }
     }
 
     /**
@@ -4662,6 +4667,16 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
 
             return sb.toString();
         }
+
+        @Override
+        public String toString() {
+            try {
+                return "Update [tail=" + printTail(false) + ']';
+            } catch (IgniteInternalCheckedException ignore) {
+                // Should be impossible if "keys == false" in "printTail".
+                return null;
+            }
+        }
     }
 
     /**
@@ -5511,6 +5526,11 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
 
             return res;
         }
+
+        @Override
+        public String toString() {
+            return "Remove [super=" + super.toString() + ']';
+        }
     }
 
     /**
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index 1534a6a38d2..ecbe3d0922f 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage.pagememory;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.internal.storage.configurations.StorageProfileConfigurationSchema.UNSPECIFIED_SIZE;
 import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
 import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.THROTTLING_LOG_THRESHOLD_SYSTEM_PROPERTY;
@@ -25,6 +26,7 @@ import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemory
 import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.THROTTLING_TYPE_SYSTEM_PROPERTY;
 import static org.apache.ignite.internal.util.Constants.GiB;
 import static org.apache.ignite.internal.util.Constants.MiB;
+import static 
org.apache.ignite.internal.util.OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -170,6 +172,11 @@ public class PersistentPageMemoryDataRegion implements 
DataRegion<PersistentPage
 
         this.regionSize = sizeBytes;
 
+        long checkpointReadLockTimeout = 
checkpointManager.checkpointTimeoutLock().checkpointReadLockTimeout();
+        OffheapReadWriteLock offheapReadWriteLock = checkpointReadLockTimeout 
== 0L
+                ? new OffheapReadWriteLock(DEFAULT_CONCURRENCY_LEVEL)
+                : new OffheapReadWriteLock(DEFAULT_CONCURRENCY_LEVEL, 
checkpointReadLockTimeout, MILLISECONDS);
+
         PersistentPageMemory pageMemory = new PersistentPageMemory(
                 regionConfiguration(dataRegionConfigView, sizeBytes, pageSize),
                 metricSource,
@@ -179,7 +186,7 @@ public class PersistentPageMemoryDataRegion implements 
DataRegion<PersistentPage
                 filePageStoreManager,
                 this::flushDirtyPageOnReplacement,
                 checkpointManager.checkpointTimeoutLock(),
-                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+                offheapReadWriteLock,
                 checkpointManager.partitionDestructionLockManager()
         );
 
@@ -254,7 +261,7 @@ public class PersistentPageMemoryDataRegion implements 
DataRegion<PersistentPage
     }
 
     private long getLoggingThreshold() {
-        return TimeUnit.MILLISECONDS.toNanos(getSystemConfig(
+        return MILLISECONDS.toNanos(getSystemConfig(
                 THROTTLING_LOG_THRESHOLD_SYSTEM_PROPERTY,
                 
TimeUnit.NANOSECONDS.toMillis(PagesWriteThrottlePolicy.DEFAULT_LOGGING_THRESHOLD),
                 value -> {

Reply via email to