This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c74a930f587 IGNITE-27717 Add timeout to page locking (#7514)
c74a930f587 is described below
commit c74a930f587e0d121e4360c1c7776fbafc536ce6
Author: Ivan Bessonov <[email protected]>
AuthorDate: Wed Feb 4 07:52:16 2026 +0300
IGNITE-27717 Add timeout to page locking (#7514)
---
check-rules/spotbugs-excludes.xml | 12 +++
.../ignite/internal/util/OffheapReadWriteLock.java | 116 +++++++++++++++++----
.../ignite/internal/pagememory/tree/BplusTree.java | 50 ++++++---
.../pagememory/PersistentPageMemoryDataRegion.java | 11 +-
4 files changed, 151 insertions(+), 38 deletions(-)
diff --git a/check-rules/spotbugs-excludes.xml
b/check-rules/spotbugs-excludes.xml
index 031ada26f94..5e5fc30c7f9 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -266,6 +266,18 @@
<Bug pattern="SING_SINGLETON_HAS_NONPRIVATE_CONSTRUCTOR"/>
<Class
name="org.apache.ignite.internal.partition.replicator.schema.TableDefinitionDiff"/>
</Match>
+ <Match>
+ <!-- Timeout is handled by other means. -->
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+ <Class name="org.apache.ignite.internal.util.OffheapReadWriteLock"/>
+ <Method name="awaitCondition"/>
+ </Match>
+ <Match>
+ <!-- Loop is outside of this method. -->
+ <Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
+ <Class name="org.apache.ignite.internal.util.OffheapReadWriteLock"/>
+ <Method name="awaitCondition"/>
+ </Match>
<!-- end of false-positive exclusions -->
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
index f2cb3b50876..b4f12764c97 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -17,9 +17,15 @@
package org.apache.ignite.internal.util;
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+import static org.apache.ignite.internal.util.StringUtils.hexInt;
+import static org.apache.ignite.internal.util.StringUtils.hexLong;
+
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -70,16 +76,37 @@ public class OffheapReadWriteLock {
/** Mask to extract stripe index from the hash. */
private final int monitorsMask;
+ /** Lock timeout in nanoseconds. {@code 0L} if unlimited. */
+ private final long timeoutNanos;
+
+ /**
+ * Unchecked exception thrown by the {@code *Lock} methods when the lock has not been acquired within the configured
+ * timeout. It can only be raised when the lock was created with a non-zero timeout, because a zero timeout means
+ * waiting without a limit.
+ */
+ public static class LockTimeoutException extends RuntimeException {
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor. Not public: instances are only created inside this package.
+ *
+ * @param message Exception message.
+ */
+ LockTimeoutException(String message) {
+ super(message);
+ }
+ }
+
/**
* Constructor.
*
* @param concLvl Concurrency level, must be a power of two.
+ * @param timeout Lock acquisition timeout. {@code 0} means an unlimited
timeout.
+ * @param timeUnit Timeout time unit.
*/
- public OffheapReadWriteLock(int concLvl) {
- if ((concLvl & concLvl - 1) != 0) {
+ public OffheapReadWriteLock(int concLvl, long timeout, TimeUnit timeUnit) {
+ if (!isPow2(concLvl)) {
throw new IllegalArgumentException("Concurrency level must be a
power of 2: " + concLvl);
}
+ timeoutNanos = timeUnit.toNanos(timeout);
+
monitorsMask = concLvl - 1;
locks = new ReentrantLock[concLvl];
@@ -95,6 +122,15 @@ public class OffheapReadWriteLock {
}
}
+ /**
+ * Creates a lock with an unlimited lock acquisition timeout. Delegates to the three-argument constructor
+ * with a timeout of {@code 0}, which that constructor documents as "unlimited".
+ *
+ * @param concLvl Concurrency level, must be a power of two.
+ */
+ public OffheapReadWriteLock(int concLvl) {
+ this(concLvl, 0, TimeUnit.NANOSECONDS);
+ }
+
/**
* Initializes the lock.
*
@@ -112,6 +148,7 @@ public class OffheapReadWriteLock {
* Acquires a read lock.
*
* @param lock Lock address.
+ * @throws LockTimeoutException If lock acquisition timed out.
*/
public boolean readLock(long lock, int tag) {
long state = GridUnsafe.getLongVolatile(null, lock);
@@ -166,7 +203,7 @@ public class OffheapReadWriteLock {
if (lockCount(state) <= 0) {
throw new IllegalMonitorStateException("Attempted to release a
read lock while not holding it "
- + "[lock=" + StringUtils.hexLong(lock) + ", state=" +
StringUtils.hexLong(state) + ']');
+ + "[lock=" + hexLong(lock) + ", state=" +
hexLong(state) + ']');
}
long updated = updateState(state, -1, 0, 0);
@@ -212,6 +249,7 @@ public class OffheapReadWriteLock {
* Acquires a write lock.
*
* @param lock Lock address.
+ * @throws LockTimeoutException If lock acquisition timed out.
*/
public boolean writeLock(long lock, int tag) {
assert tag != 0;
@@ -285,7 +323,7 @@ public class OffheapReadWriteLock {
if (lockCount(state) != -1) {
throw new IllegalMonitorStateException("Attempted to release
write lock while not holding it "
- + "[lock=" + StringUtils.hexLong(lock) + ", state=" +
StringUtils.hexLong(state) + ']');
+ + "[lock=" + hexLong(lock) + ", state=" +
hexLong(state) + ']');
}
updated = releaseWithTag(state, tag);
@@ -349,6 +387,7 @@ public class OffheapReadWriteLock {
* @return {@code null} if tag validation failed, {@code true} if
successfully traded the read lock to
* the write lock without leaving a gap. Returns {@code false}
otherwise, in this case the resource
* state must be re-validated.
+ * @throws LockTimeoutException If lock acquisition timed out.
*/
public @Nullable Boolean upgradeToWriteLock(long lock, int tag) {
for (int i = 0; i < SPIN_CNT; i++) {
@@ -418,6 +457,7 @@ public class OffheapReadWriteLock {
assert lockObj.isHeldByCurrentThread();
boolean interrupted = false;
+ long startTimeNanos = System.nanoTime();
try {
while (true) {
@@ -426,13 +466,7 @@ public class OffheapReadWriteLock {
if (!checkTag(state, tag)) {
// We cannot lock with this tag, release waiter.
- long updated = updateState(state, 0, -1, 0);
-
- if (GridUnsafe.compareAndSwapLong(null, lock, state,
updated)) {
- int writeWaitCnt = writersWaitCount(updated);
-
- signalNextWaiter(writeWaitCnt, lockIdx);
-
+ if (tryReleaseWaiter(lock, lockIdx, state, true)) {
return false;
}
} else if (canReadLock(state)) {
@@ -442,7 +476,7 @@ public class OffheapReadWriteLock {
return true;
}
} else {
- waitCond.await();
+ awaitCondition(lock, lockIdx, tag, startTimeNanos,
waitCond, true);
}
} catch (InterruptedException ignore) {
interrupted = true;
@@ -470,6 +504,7 @@ public class OffheapReadWriteLock {
assert lockObj.isHeldByCurrentThread();
boolean interrupted = false;
+ long startTimeNanos = System.nanoTime();
try {
while (true) {
@@ -478,13 +513,7 @@ public class OffheapReadWriteLock {
if (!checkTag(state, tag)) {
// We cannot lock with this tag, release waiter.
- long updated = updateState(state, 0, 0, -1);
-
- if (GridUnsafe.compareAndSwapLong(null, lock, state,
updated)) {
- int writeWaitCnt = writersWaitCount(updated);
-
- signalNextWaiter(writeWaitCnt, lockIdx);
-
+ if (tryReleaseWaiter(lock, lockIdx, state, false)) {
return false;
}
} else if (canWriteLock(state)) {
@@ -494,7 +523,7 @@ public class OffheapReadWriteLock {
return true;
}
} else {
- waitCond.await();
+ awaitCondition(lock, lockIdx, tag, startTimeNanos,
waitCond, false);
}
} catch (InterruptedException ignore) {
interrupted = true;
@@ -507,6 +536,51 @@ public class OffheapReadWriteLock {
}
}
+ /**
+ * Makes a single compare-and-swap attempt to remove a waiter from the lock state and, on success, signals the next
+ * waiter of the stripe.
+ *
+ * <p>The read flavor applies {@code updateState(state, 0, -1, 0)}, the write flavor applies
+ * {@code updateState(state, 0, 0, -1)}; presumably these decrement the waiting readers and waiting writers counters
+ * respectively (verify against {@code updateState}).
+ *
+ * @param lock Lock address.
+ * @param lockIdx Index of the lock stripe, passed to {@code signalNextWaiter}.
+ * @param state Lock state that the caller has just read; the CAS only succeeds if it is still current.
+ * @param readLock {@code true} to release a read waiter, {@code false} to release a write waiter.
+ * @return {@code true} if the state was updated, {@code false} if the state changed concurrently and the caller
+ * must re-read it and retry.
+ */
+ private boolean tryReleaseWaiter(long lock, int lockIdx, long state, boolean readLock) {
+ long newState;
+
+ if (readLock) {
+ newState = updateState(state, 0, -1, 0);
+ } else {
+ newState = updateState(state, 0, 0, -1);
+ }
+
+ if (!GridUnsafe.compareAndSwapLong(null, lock, state, newState)) {
+ return false;
+ }
+
+ signalNextWaiter(writersWaitCount(newState), lockIdx);
+
+ return true;
+ }
+
+ /**
+ * Performs one wait on {@code waitCond}, honoring the configured lock acquisition timeout.
+ *
+ * <p>With an unlimited timeout ({@code timeoutNanos == 0}) this is a plain {@link Condition#await()}. Otherwise the
+ * time elapsed since {@code startTimeNanos} is compared with the timeout: if it has already expired, the waiter is
+ * released from the lock state and a {@link LockTimeoutException} is thrown; if not, the thread waits for at most
+ * the remaining time.
+ *
+ * <p>The method neither loops around the wait nor inspects the result of {@link Condition#awaitNanos(long)}. Callers
+ * invoke it from their own retry loop, which re-reads the lock state after every wake-up; when that loop calls this
+ * method again after the deadline, the expiration is detected by the elapsed-time check.
+ *
+ * @param lock Lock address.
+ * @param lockIdx Index of the lock stripe; used to release the waiter and reported in the timeout message.
+ * @param tag Lock tag; only reported in the timeout message.
+ * @param startTimeNanos {@link System#nanoTime()} value from which the timeout is counted.
+ * @param waitCond Condition to wait on.
+ * @param readLock {@code true} if a read waiter must be released on timeout, {@code false} for a write waiter.
+ * @throws InterruptedException If the thread is interrupted while waiting.
+ * @throws LockTimeoutException If the timeout has expired.
+ */
+ @SuppressWarnings("AwaitNotInLoop")
+ private void awaitCondition(
+ long lock, int lockIdx, int tag, long startTimeNanos, Condition
waitCond, boolean readLock
+ ) throws InterruptedException {
+ if (timeoutNanos == 0) {
+ waitCond.await();
+ } else {
+ long passedNanos = System.nanoTime() - startTimeNanos;
+
+ if (passedNanos >= timeoutNanos) {
+ // The waiter must be released before giving up. A failed CAS means the state changed concurrently,
+ // so the state is re-read and the attempt repeated; the only way out of this loop is the exception.
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (tryReleaseWaiter(lock, lockIdx, state, readLock)) {
+ throw new LockTimeoutException(S.toString("Timeout
waiting for lock acquisition",
+ "lock", hexLong(lock), false,
+ "state", hexLong(state), false,
+ "tag", hexInt(tag), false,
+ "idx", lockIdx, false,
+ "cond", waitCond.toString(), false,
+ "timeout",
TimeUnit.NANOSECONDS.toMillis(timeoutNanos) + "ms", false
+ ));
+ }
+ }
+ }
+
+ // Reached only when "passedNanos < timeoutNanos", so the remaining time is strictly positive.
+ waitCond.awaitNanos(timeoutNanos - passedNanos);
+ }
+ }
+
/**
* Returns index of lock object corresponding to the stripe of this lock
address.
*
@@ -514,7 +588,7 @@ public class OffheapReadWriteLock {
* @return Lock monitor object that corresponds to the stripe for this
lock address.
*/
private int lockIndex(long lock) {
- return IgniteUtils.safeAbs(IgniteUtils.hash(lock)) & monitorsMask;
+ return IgniteUtils.hash(lock) & monitorsMask;
}
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index 4c3ea33ca5f..dbb5665ced5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1281,7 +1281,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (CorruptedDataStructureException e) {
throw e;
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalCheckedException("Runtime failure on
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ throw new IgniteInternalCheckedException("Runtime failure on
bounds [lower=" + lower + ", upper=" + upper + "]", e);
} catch (RuntimeException | AssertionError e) {
long[] pageIds = pages(
lower == null || cursor.getCursor == null,
@@ -1289,7 +1289,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
);
throw corruptedTreeException(
- "Runtime failure on bounds: [lower=" + lower + ", upper="
+ upper + "]",
+ "Runtime failure on bounds [lower=" + lower + ", upper=" +
upper + "]",
e,
grpId,
pageIds
@@ -1317,10 +1317,10 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (CorruptedDataStructureException e) {
throw e;
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalCheckedException("Runtime failure on
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ throw new IgniteInternalCheckedException("Runtime failure on
bounds [lower=" + lower + ", upper=" + upper + "]", e);
} catch (RuntimeException | AssertionError e) {
throw corruptedTreeException(
- "Runtime failure on bounds: [lower=" + lower + ", upper="
+ upper + "]",
+ "Runtime failure on bounds [lower=" + lower + ", upper=" +
upper + "]",
e,
grpId,
pages(cursor.getCursor != null, () -> new
long[]{cursor.getCursor.pageId})
@@ -1344,11 +1344,11 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
try {
new TreeVisitor(lower, upper, c).visit();
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalCheckedException("Runtime failure on
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ throw new IgniteInternalCheckedException("Runtime failure on
bounds [lower=" + lower + ", upper=" + upper + "]", e);
} catch (RuntimeException e) {
- throw new IgniteInternalException("Runtime failure on bounds:
[lower=" + lower + ", upper=" + upper + "]", e);
+ throw new IgniteInternalException("Runtime failure on bounds
[lower=" + lower + ", upper=" + upper + "]", e);
} catch (AssertionError e) {
- throw new AssertionError("Assertion error on bounds: [lower=" +
lower + ", upper=" + upper + "]", e);
+ throw new AssertionError("Assertion error on bounds [lower=" +
lower + ", upper=" + upper + "]", e);
} finally {
checkDestroyed();
}
@@ -1556,7 +1556,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (IgniteInternalCheckedException e) {
throw new IgniteInternalCheckedException("Runtime failure on
lookup row: " + row, e);
} catch (RuntimeException | AssertionError e) {
- throw corruptedTreeException("Runtime failure on lookup row: " +
row, e, grpId, g.pageId);
+ throw corruptedTreeException("Runtime failure on lookup [row=" +
row + "]", e, grpId, g.pageId);
} finally {
checkDestroyed();
}
@@ -1589,7 +1589,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (IgniteInternalCheckedException e) {
throw new IgniteInternalCheckedException("Runtime failure on
lookup next row: " + lowerBound, e);
} catch (RuntimeException | AssertionError e) {
- throw corruptedTreeException("Runtime failure on lookup next row:
" + lowerBound, e, grpId, g.pageId);
+ throw corruptedTreeException("Runtime failure on lookup next row
[lower=" + lowerBound + "]", e, grpId, g.pageId);
} finally {
checkDestroyed();
}
@@ -2103,9 +2103,9 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (CorruptedDataStructureException e) {
throw e;
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalCheckedException("Runtime failure on
search row: " + row, e);
+ throw new IgniteInternalCheckedException("Runtime failure on
invoke [row=" + row + ", op=" + x.op + "]", e);
} catch (RuntimeException | AssertionError e) {
- throw corruptedTreeException("Runtime failure on search row: " +
row, e, grpId, x.pageId);
+ throw corruptedTreeException("Runtime failure on invoke [row=" +
row + ", op=" + x.op + "]", e, grpId, x.pageId);
} finally {
x.releaseAll();
checkDestroyed();
@@ -2253,9 +2253,9 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (CorruptedDataStructureException e) {
throw e;
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalCheckedException("Runtime failure on
search row: " + row, e);
+ throw new IgniteInternalCheckedException("Runtime failure on
remove [row=" + row + ", op=" + r + "]", e);
} catch (RuntimeException | AssertionError e) {
- throw corruptedTreeException("Runtime failure on search row: " +
row, e, grpId, r.pageId);
+ throw corruptedTreeException("Runtime failure on remove [row=" +
row + ", op=" + r + "]", e, grpId, r.pageId);
} finally {
r.releaseAll();
checkDestroyed();
@@ -2607,9 +2607,9 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
} catch (CorruptedDataStructureException e) {
throw e;
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalCheckedException("Runtime failure on row:
" + row, e);
+ throw new IgniteInternalCheckedException("Runtime failure on put
[row=" + row + ", op=" + p + "]", e);
} catch (RuntimeException | AssertionError e) {
- throw corruptedTreeException("Runtime failure on row: " + row, e,
grpId, p.pageId);
+ throw corruptedTreeException("Runtime failure on put [row=" + row
+ ", op=" + p + "]", e, grpId, p.pageId);
} finally {
checkDestroyed();
}
@@ -4141,6 +4141,11 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
super.checkLockRetry();
}
}
+
+ /** Returns a diagnostic description made of the parent's description and the value of {@code needOld}. */
+ @Override
+ public String toString() {
+ String parent = super.toString();
+
+ return "Put [super=" + parent + ", needOld=" + needOld + "]";
+ }
}
/**
@@ -4662,6 +4667,16 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return sb.toString();
}
+
+ /**
+ * Returns a diagnostic description containing the tail printed without keys.
+ *
+ * @return Description of this operation, never {@code null}.
+ */
+ @Override
+ public String toString() {
+ try {
+ return "Update [tail=" + printTail(false) + ']';
+ } catch (IgniteInternalCheckedException e) {
+ // Should be impossible if "keys == false" in "printTail". Even so, "toString" must not return null
+ // (SpotBugs NP_TOSTRING_COULD_RETURN_NULL), so report the failure inside the description instead.
+ return "Update [tail=<unavailable: " + e + ">]";
+ }
+ }
}
/**
@@ -5511,6 +5526,11 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return res;
}
+
+ /** Returns a diagnostic description that wraps the parent's description. */
+ @Override
+ public String toString() {
+ String parent = super.toString();
+
+ return "Remove [super=" + parent + ']';
+ }
}
/**
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index 1534a6a38d2..ecbe3d0922f 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage.pagememory;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.storage.configurations.StorageProfileConfigurationSchema.UNSPECIFIED_SIZE;
import static
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
import static
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.THROTTLING_LOG_THRESHOLD_SYSTEM_PROPERTY;
@@ -25,6 +26,7 @@ import static
org.apache.ignite.internal.storage.pagememory.PersistentPageMemory
import static
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.THROTTLING_TYPE_SYSTEM_PROPERTY;
import static org.apache.ignite.internal.util.Constants.GiB;
import static org.apache.ignite.internal.util.Constants.MiB;
+import static
org.apache.ignite.internal.util.OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -170,6 +172,11 @@ public class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPage
this.regionSize = sizeBytes;
+ long checkpointReadLockTimeout =
checkpointManager.checkpointTimeoutLock().checkpointReadLockTimeout();
+ OffheapReadWriteLock offheapReadWriteLock = checkpointReadLockTimeout
== 0L
+ ? new OffheapReadWriteLock(DEFAULT_CONCURRENCY_LEVEL)
+ : new OffheapReadWriteLock(DEFAULT_CONCURRENCY_LEVEL,
checkpointReadLockTimeout, MILLISECONDS);
+
PersistentPageMemory pageMemory = new PersistentPageMemory(
regionConfiguration(dataRegionConfigView, sizeBytes, pageSize),
metricSource,
@@ -179,7 +186,7 @@ public class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPage
filePageStoreManager,
this::flushDirtyPageOnReplacement,
checkpointManager.checkpointTimeoutLock(),
- new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+ offheapReadWriteLock,
checkpointManager.partitionDestructionLockManager()
);
@@ -254,7 +261,7 @@ public class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPage
}
private long getLoggingThreshold() {
- return TimeUnit.MILLISECONDS.toNanos(getSystemConfig(
+ return MILLISECONDS.toNanos(getSystemConfig(
THROTTLING_LOG_THRESHOLD_SYSTEM_PROPERTY,
TimeUnit.NANOSECONDS.toMillis(PagesWriteThrottlePolicy.DEFAULT_LOGGING_THRESHOLD),
value -> {