This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9e0070243fb IGNITE-23779 Decrease contention on attachment lock (#6472)
9e0070243fb is described below
commit 9e0070243fb20ede0353343fa7974528c4ab5265
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Aug 25 17:53:28 2025 +0400
IGNITE-23779 Decrease contention on attachment lock (#6472)
A striped lock over VersatileReadWriteLock is introduced so that contention
is decreased
---
.../util/StripedVersatileReadWriteLock.java | 229 +++++++++++++++++++++
.../internal/util/VersatileReadWriteLock.java | 4 +-
...java => StripedVersatileReadWriteLockTest.java} | 141 +++++--------
.../util/VersatileReadWriteLockBenchmark.java | 85 ++++++++
.../internal/util/VersatileReadWriteLockTest.java | 48 +++--
.../internal/restart/IgniteAttachmentLock.java | 6 +-
.../internal/restart/RestartProofKeyValueView.java | 9 +-
7 files changed, 414 insertions(+), 108 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java
new file mode 100644
index 00000000000..1488de8c69c
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * A versatile read-write lock that can be used both in synchronous and
asynchronous contexts.
+ * Its blocking methods use the spinwait strategy. When they do so, they are
not interruptible (that is, they do not break their loop on
+ * interruption signal).
+ *
+ * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire
the same lock a few times without releasing it).
+ *
+ * <p>Write lock acquire requests are prioritized over read lock acquire
requests. That is, if both read and write lock
+ * acquire requests are received when the write lock is held by someone else,
then, on its release, the write lock attempt will be served
+ * first.
+ *
+ * <p>Lock owners are not tracked.
+ *
+ * <p>Asynchronous locking methods may complete the futures either in the
calling thread (if they were able to immediately acquire
+ * the requested lock) or in the supplied pool (if they had to wait for a
release to happen before being able to satisfy the request).
+ *
+ * <p>Asynchronous locking methods never use spin loops. They do use CAS
loops, but these are mostly very short.
+ *
+ * <p>Synchronous read lock acquisitions and releases MUST be made in the same
thread. There is no such requirement for write lock and
+ * for asynchronous locking.
+ *
+ * <p>The lock is striped. It allows to reduce contention (as there are as
many contention points as there are stripes).
+ */
+public class StripedVersatileReadWriteLock {
+ /** Default concurrency. */
+ private static final int DEFAULT_CONCURRENCY = Math.max(1,
Runtime.getRuntime().availableProcessors() / 2);
+
+ private final VersatileReadWriteLock[] locks;
+
+ /** Index generator. */
+ private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger();
+
+ /** Index. */
+ private static final ThreadLocal<Integer> INDEX =
ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet);
+
+ /**
+ * Constructor.
+ */
+ public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) {
+ this(asyncContinuationExecutor, DEFAULT_CONCURRENCY);
+ }
+
+ private StripedVersatileReadWriteLock(Executor asyncContinuationExecutor,
int concurrency) {
+ assert concurrency > 0 : "Concurrency must be positive, but was: " +
concurrency;
+
+ locks = new VersatileReadWriteLock[concurrency];
+ for (int i = 0; i < concurrency; i++) {
+ locks[i] = new VersatileReadWriteLock(asyncContinuationExecutor);
+ }
+ }
+
+ /**
+ * Gets current stripe index.
+ *
+ * @return Index of current thread stripe.
+ */
+ private int currentIndex() {
+ int index = INDEX.get();
+
+ return index % locks.length;
+ }
+
+ private VersatileReadWriteLock currentLock() {
+ return locks[currentIndex()];
+ }
+
+ /**
+ * Acquires the read lock. If the write lock is already held, this blocks
until the write lock is released (and until all
+ * concurrent write locks are acquired and released, as this class
prioritizes write lock attempts over read lock attempts).
+ *
+ * <p>In contrast with async locking, invocations of this method MUST be
invoked in the same thread in which the corresponding
+ * {@link #readUnlock()} will be invoked.
+ */
+ public void readLock() {
+ currentLock().readLock();
+ }
+
+ /**
+ * Tries to acquire the read lock. No spinwait is used if the lock cannot
be acquired immediately.
+ *
+ * <p>In contrast with async locking, invocations of this method MUST be
invoked in the same thread in which the corresponding
+ * {@link #readUnlock()} will be invoked.
+ *
+ * @return {@code true} if acquired, {@code false} if write lock is
already held by someone else (or someone is waiting to acquire
+ * the write lock).
+ */
+ public boolean tryReadLock() {
+ return currentLock().tryReadLock();
+ }
+
+ /**
+ * Releases the read lock.
+ *
+ * <p>In contrast with async locking, invocations of this method MUST be
invoked in the same thread in which the corresponding
+ * {@link #readLock()} is invoked.
+ *
+ * @throws IllegalMonitorStateException thrown if the read lock is not
acquired by anyone.
+ */
+ public void readUnlock() {
+ currentLock().readUnlock();
+ }
+
+ private void readUnlock(int idx) {
+ locks[idx].readUnlock();
+ }
+
+ /**
+ * Acquires the write lock waiting, if needed. The thread will block until
all other read and write locks are released.
+ */
+ public void writeLock() {
+ // Locks must be acquired in order to avoid deadlocks.
+ for (VersatileReadWriteLock lock : locks) {
+ lock.writeLock();
+ }
+ }
+
+ private void writeUnlock0(int fromIndex) {
+ for (int i = fromIndex; i >= 0; i--) {
+ locks[i].writeUnlock();
+ }
+ }
+
+ /**
+ * Tries to acquire the write lock. Never blocks: if any lock has already
been acquired by someone else, returns {@code false}
+ * immediately.
+ *
+ * @return {@code true} if the write lock has been acquired, {@code false}
otherwise
+ */
+ public boolean tryWriteLock() {
+ int i = 0;
+
+ try {
+ for (; i < locks.length; i++) {
+ if (!locks[i].tryWriteLock()) {
+ break;
+ }
+ }
+ } finally {
+ if (0 < i && i < locks.length) {
+ writeUnlock0(i - 1);
+ }
+ }
+
+ return i == locks.length;
+ }
+
+ /**
+ * Releases the write lock.
+ *
+ * @throws IllegalMonitorStateException thrown if the write lock is not
acquired.
+ */
+ public void writeUnlock() {
+ writeUnlock0(locks.length - 1);
+ }
+
+ /**
+ * Executes the provided asynchronous action under protection of a read
lock: that is, it first obtains a read lock
+ * asynchronously, then executes the action, and then releases the lock.
+ *
+ * @param action Action to execute.
+ * @return Action result.
+ */
+ public <T> CompletableFuture<T> inReadLockAsync(Supplier<? extends
CompletableFuture<T>> action) {
+ int index = currentIndex();
+
+ return locks[index].readLockAsync()
+ .thenCompose(unused -> action.get())
+ .whenComplete((res, ex) -> readUnlock(index));
+ }
+
+ /**
+ * Executes the provided asynchronous action under protection of a write
lock: that is, it first obtains the write lock
+ * asynchronously, then executes the action, and then releases the lock.
+ *
+ * @param action Action to execute.
+ * @return Action result.
+ */
+ public <T> CompletableFuture<T> inWriteLockAsync(Supplier<? extends
CompletableFuture<T>> action) {
+ return writeLockAsync()
+ .thenCompose(unused -> action.get())
+ .whenComplete((res, ex) -> writeUnlock());
+ }
+
+ private CompletableFuture<Void> writeLockAsync() {
+ CompletableFuture<Void> future = nullCompletedFuture();
+
+ // Locks must be acquired in order to avoid deadlocks, hence the chain.
+ for (VersatileReadWriteLock lock : locks) {
+ future = future.thenCompose(unused -> lock.writeLockAsync());
+ }
+
+ return future;
+ }
+
+ int readLocksHeld() {
+ return
Arrays.stream(locks).mapToInt(VersatileReadWriteLock::readLocksHeld).sum();
+ }
+
+ boolean isWriteLocked() {
+ return
Arrays.stream(locks).anyMatch(VersatileReadWriteLock::isWriteLocked);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
index 43d7ab4292d..c1b328db07a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
@@ -438,7 +438,7 @@ public class VersatileReadWriteLock {
.whenComplete((res, ex) -> readUnlock());
}
- private CompletableFuture<Void> readLockAsync() {
+ CompletableFuture<Void> readLockAsync() {
if (tryReadLock()) {
return nullCompletedFuture();
}
@@ -473,7 +473,7 @@ public class VersatileReadWriteLock {
.whenComplete((res, ex) -> writeUnlock());
}
- private CompletableFuture<Void> writeLockAsync() {
+ CompletableFuture<Void> writeLockAsync() {
if (tryWriteLock()) {
return nullCompletedFuture();
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java
similarity index 83%
copy from
modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
copy to
modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java
index 6a626f4bf5f..b5343b8444f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java
@@ -42,39 +42,43 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
/**
- * Tests for {@link VersatileReadWriteLock}.
+ * Tests for {@link StripedVersatileReadWriteLock}.
*/
@Timeout(20)
-class VersatileReadWriteLockTest {
- private static final IgniteLogger LOG =
Loggers.forClass(VersatileReadWriteLockTest.class);
-
+@ExtendWith(ExecutorServiceExtension.class)
+class StripedVersatileReadWriteLockTest {
private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace";
- private final ExecutorService asyncContinuationExecutor =
Executors.newCachedThreadPool(
-
IgniteThreadFactory.createWithFixedPrefix(ASYNC_CONTINUATION_THREAD_PREFIX,
false, LOG)
- );
+ @InjectExecutorService(threadPrefix = ASYNC_CONTINUATION_THREAD_PREFIX)
+ private ExecutorService asyncContinuationExecutor;
/** The lock under test. */
- private final VersatileReadWriteLock lock = new
VersatileReadWriteLock(asyncContinuationExecutor);
+ private StripedVersatileReadWriteLock lock;
/** Executor service used to run tasks in threads different from the main
test thread. */
- private final ExecutorService executor = Executors.newCachedThreadPool();
+ @InjectExecutorService
+ private ExecutorService executor;
+
+ @BeforeEach
+ void createLock() {
+ lock = new StripedVersatileReadWriteLock(asyncContinuationExecutor);
+ }
/**
* Cleans up after a test.
@@ -83,9 +87,6 @@ class VersatileReadWriteLockTest {
void cleanup() {
releaseReadLocks();
releaseWriteLocks();
-
- IgniteUtils.shutdownAndAwaitTermination(executor, 3, SECONDS);
- IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 3,
SECONDS);
}
private void releaseReadLocks() {
@@ -123,10 +124,9 @@ class VersatileReadWriteLockTest {
@ParameterizedTest
@EnumSource(BlockingWriteLockAcquisition.class)
void
readLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
acquisition) {
- assertThatActionBlocksForever(() -> {
- lock.readLock();
- acquisition.acquire(lock);
- });
+ lock.readLock();
+
+ assertThatActionBlocksForever(() -> acquisition.acquire(lock));
lock.readUnlock();
}
@@ -141,27 +141,6 @@ class VersatileReadWriteLockTest {
assertThat(future, willTimeoutIn(100, MILLISECONDS));
}
- @Test
- void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeout() throws
Exception {
- lock.readLock();
-
- Boolean acquired = callWithTimeout(() -> lock.tryWriteLock(1,
MILLISECONDS));
- assertThat(acquired, is(false));
-
- lock.readUnlock();
- }
-
- @Test
- void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeoutBySameThread()
throws Exception {
- Boolean acquired = callWithTimeout(() -> {
- lock.readLock();
- return lock.tryWriteLock(1, MILLISECONDS);
- });
- assertThat(acquired, is(false));
-
- lock.readUnlock();
- }
-
@Test
void readLockAllowsReadLockToBeAcquired() {
lock.readLock();
@@ -307,10 +286,9 @@ class VersatileReadWriteLockTest {
@ParameterizedTest
@EnumSource(BlockingWriteLockAcquisition.class)
void
readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
acquisition) {
- assertThatActionBlocksForever(() -> {
- lock.tryReadLock();
- acquisition.acquire(lock);
- });
+ lock.tryReadLock();
+
+ assertThatActionBlocksForever(() -> acquisition.acquire(lock));
lock.readUnlock();
}
@@ -375,7 +353,16 @@ class VersatileReadWriteLockTest {
}
@Test
- void inReadLockAsyncReleasesReadLockInTheEndInCaseOfException() {
+ void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInSyncPart() {
+ assertThat(lock.inReadLockAsync(() -> {
+ throw new RuntimeException("Oops");
+ }), willThrow(Exception.class));
+
+ assertThatNoReadLockIsHeld();
+ }
+
+ @Test
+ void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInAsyncPart()
{
assertThat(lock.inReadLockAsync(() -> failedFuture(new
Exception("Oops"))), willThrow(Exception.class));
assertThatNoReadLockIsHeld();
@@ -406,33 +393,6 @@ class VersatileReadWriteLockTest {
assertThatNoReadLockIsHeld();
}
- @Test
- void inReadLockAsyncRespectsPendingWriteLocks() throws Exception {
- lock.readLock();
-
- CompletableFuture<?> writeLockFuture = runAsync(lock::writeLock,
executor);
-
- waitTillWriteLockAcquireAttemptIsInitiated();
-
- CompletableFuture<Void> readLockAsyncFuture =
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
- assertFalse(readLockAsyncFuture.isDone());
-
- // Letting the write lock to be acquired.
- lock.readUnlock();
-
- assertThat(writeLockFuture, willCompleteSuccessfully());
-
- assertFalse(waitForCondition(readLockAsyncFuture::isDone, 100));
-
- lock.writeUnlock();
- }
-
- private void waitTillWriteLockAcquireAttemptIsInitiated() throws
InterruptedException {
- boolean sawAnAttempt = waitForCondition(
- () -> lock.pendingWriteLocksCount() > 0, SECONDS.toMillis(10));
- assertTrue(sawAnAttempt, "Did not see any attempt to acquire write
lock");
- }
-
@Test
void inReadLockAsyncTakesReadLockInExecutorAfterWriteLockGetsReleased() {
lock.writeLock();
@@ -477,7 +437,7 @@ class VersatileReadWriteLockTest {
private void assertThatWriteLockGetsUnlocked() throws InterruptedException
{
assertTrue(
waitForCondition(() -> !lock.isWriteLocked(),
SECONDS.toMillis(10)),
- () -> "Still write locked"
+ "Still write locked"
);
}
@@ -494,7 +454,16 @@ class VersatileReadWriteLockTest {
}
@Test
- void inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfException() {
+ void
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInSyncPart() {
+ assertThat(lock.inWriteLockAsync(() -> {
+ throw new RuntimeException("Oops");
+ }), willThrow(Exception.class));
+
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @Test
+ void
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInAsyncPart() {
assertThat(lock.inWriteLockAsync(() -> failedFuture(new
Exception("Oops"))), willThrow(Exception.class));
assertThatNoWriteLockIsHeld();
@@ -564,46 +533,40 @@ class VersatileReadWriteLockTest {
private enum BlockingWriteLockAcquisition {
WRITE_LOCK {
@Override
- void acquire(VersatileReadWriteLock lock) {
+ void acquire(StripedVersatileReadWriteLock lock) {
lock.writeLock();
}
- },
- WRITE_LOCK_BUSY {
- @Override
- void acquire(VersatileReadWriteLock lock) {
- lock.writeLockBusy();
- }
};
- abstract void acquire(VersatileReadWriteLock lock);
+ abstract void acquire(StripedVersatileReadWriteLock lock);
}
private enum WriteLockImpeder {
READ_LOCK {
@Override
- void impede(VersatileReadWriteLock lock) {
+ void impede(StripedVersatileReadWriteLock lock) {
lock.readLock();
}
@Override
- void stopImpeding(VersatileReadWriteLock lock) {
+ void stopImpeding(StripedVersatileReadWriteLock lock) {
lock.readUnlock();
}
},
WRITE_LOCK {
@Override
- void impede(VersatileReadWriteLock lock) {
+ void impede(StripedVersatileReadWriteLock lock) {
lock.writeLock();
}
@Override
- void stopImpeding(VersatileReadWriteLock lock) {
+ void stopImpeding(StripedVersatileReadWriteLock lock) {
lock.writeUnlock();
}
};
- abstract void impede(VersatileReadWriteLock lock);
+ abstract void impede(StripedVersatileReadWriteLock lock);
- abstract void stopImpeding(VersatileReadWriteLock lock);
+ abstract void stopImpeding(StripedVersatileReadWriteLock lock);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockBenchmark.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockBenchmark.java
new file mode 100644
index 00000000000..e623f7fed98
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockBenchmark.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark to compare performance of {@link VersatileReadWriteLock} with its
derivatives.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(8)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+public class VersatileReadWriteLockBenchmark {
+ private final VersatileReadWriteLock simpleLock = new
VersatileReadWriteLock(ForkJoinPool.commonPool());
+ private final StripedVersatileReadWriteLock stripedLock = new
StripedVersatileReadWriteLock(ForkJoinPool.commonPool());
+
+ @Benchmark
+ public void simpleSync() {
+ simpleLock.readLock();
+ simpleLock.readUnlock();
+ }
+
+ @Benchmark
+ public void stripedSync() {
+ stripedLock.readLock();
+ stripedLock.readUnlock();
+ }
+
+ @Benchmark
+ public Object simpleAsync() throws Exception {
+ return
simpleLock.inReadLockAsync(CompletableFutures::nullCompletedFuture).get();
+ }
+
+ @Benchmark
+ public Object stripedAsync() throws Exception {
+ return
stripedLock.inReadLockAsync(CompletableFutures::nullCompletedFuture).get();
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" +
VersatileReadWriteLockBenchmark.class.getSimpleName() + ".*")
+ // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
+ // .addProfiler(JavaFlightRecorderProfiler.class,
"configName=profile.jfc")
+ .build();
+
+ new Runner(opt).run();
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
index 6a626f4bf5f..dad46111562 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
@@ -42,16 +42,16 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
@@ -61,20 +61,24 @@ import
org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
* Tests for {@link VersatileReadWriteLock}.
*/
@Timeout(20)
+@ExtendWith(ExecutorServiceExtension.class)
class VersatileReadWriteLockTest {
- private static final IgniteLogger LOG =
Loggers.forClass(VersatileReadWriteLockTest.class);
-
private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace";
- private final ExecutorService asyncContinuationExecutor =
Executors.newCachedThreadPool(
-
IgniteThreadFactory.createWithFixedPrefix(ASYNC_CONTINUATION_THREAD_PREFIX,
false, LOG)
- );
+ @InjectExecutorService(threadPrefix = ASYNC_CONTINUATION_THREAD_PREFIX)
+ private ExecutorService asyncContinuationExecutor;
/** The lock under test. */
- private final VersatileReadWriteLock lock = new
VersatileReadWriteLock(asyncContinuationExecutor);
+ private VersatileReadWriteLock lock;
/** Executor service used to run tasks in threads different from the main
test thread. */
- private final ExecutorService executor = Executors.newCachedThreadPool();
+ @InjectExecutorService
+ private ExecutorService executor;
+
+ @BeforeEach
+ void createLock() {
+ lock = new VersatileReadWriteLock(asyncContinuationExecutor);
+ }
/**
* Cleans up after a test.
@@ -375,7 +379,16 @@ class VersatileReadWriteLockTest {
}
@Test
- void inReadLockAsyncReleasesReadLockInTheEndInCaseOfException() {
+ void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInSyncPart() {
+ assertThat(lock.inReadLockAsync(() -> {
+ throw new RuntimeException("Oops");
+ }), willThrow(Exception.class));
+
+ assertThatNoReadLockIsHeld();
+ }
+
+ @Test
+ void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInAsyncPart()
{
assertThat(lock.inReadLockAsync(() -> failedFuture(new
Exception("Oops"))), willThrow(Exception.class));
assertThatNoReadLockIsHeld();
@@ -494,7 +507,16 @@ class VersatileReadWriteLockTest {
}
@Test
- void inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfException() {
+ void
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInSyncPart() {
+ assertThat(lock.inWriteLockAsync(() -> {
+ throw new RuntimeException("Oops");
+ }), willThrow(Exception.class));
+
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @Test
+ void
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInAsyncPart() {
assertThat(lock.inWriteLockAsync(() -> failedFuture(new
Exception("Oops"))), willThrow(Exception.class));
assertThatNoWriteLockIsHeld();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
index 0c2e724bf6b..da5ad02de6e 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
@@ -23,7 +23,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.util.VersatileReadWriteLock;
+import org.apache.ignite.internal.util.StripedVersatileReadWriteLock;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.IgniteException;
@@ -35,14 +35,14 @@ public class IgniteAttachmentLock {
/** This must always be accessed under {@link #lock}. */
private final Supplier<Ignite> igniteRef;
- private final VersatileReadWriteLock lock;
+ private final StripedVersatileReadWriteLock lock;
/**
* Constructor.
*/
public IgniteAttachmentLock(Supplier<Ignite> igniteRef, Executor
asyncContinuationExecutor) {
this.igniteRef = igniteRef;
- lock = new VersatileReadWriteLock(asyncContinuationExecutor);
+ lock = new StripedVersatileReadWriteLock(asyncContinuationExecutor);
}
/**
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
index 9231930b5df..ac61c014ccf 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.function.Function;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.lang.Cursor;
import org.apache.ignite.lang.NullableValue;
@@ -44,7 +46,7 @@ import org.jetbrains.annotations.Nullable;
* <p>API operations on this are linearized with respect to node restarts.
Normally (except for situations when timeouts trigger), user
* operations will not interact with detached objects.
*/
-class RestartProofKeyValueView<K, V> extends
RestartProofApiObject<KeyValueView<K, V>> implements KeyValueView<K, V> {
+class RestartProofKeyValueView<K, V> extends
RestartProofApiObject<KeyValueView<K, V>> implements KeyValueView<K, V>,
Wrapper {
RestartProofKeyValueView(
IgniteAttachmentLock attachmentLock,
Ignite initialIgnite,
@@ -304,4 +306,9 @@ class RestartProofKeyValueView<K, V> extends
RestartProofApiObject<KeyValueView<
) {
return attachedAsync(view -> view.queryAsync(tx, criteria, indexName,
opts));
}
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return attached(view -> Wrappers.unwrap(view, classToUnwrap));
+ }
}