This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e0070243fb IGNITE-23779 Decrease contention on attachment lock (#6472)
9e0070243fb is described below

commit 9e0070243fb20ede0353343fa7974528c4ab5265
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Aug 25 17:53:28 2025 +0400

    IGNITE-23779 Decrease contention on attachment lock (#6472)
    
    A striped lock over VersatileReadWriteLock is introduced so that contention 
is decreased
---
 .../util/StripedVersatileReadWriteLock.java        | 229 +++++++++++++++++++++
 .../internal/util/VersatileReadWriteLock.java      |   4 +-
 ...java => StripedVersatileReadWriteLockTest.java} | 141 +++++--------
 .../util/VersatileReadWriteLockBenchmark.java      |  85 ++++++++
 .../internal/util/VersatileReadWriteLockTest.java  |  48 +++--
 .../internal/restart/IgniteAttachmentLock.java     |   6 +-
 .../internal/restart/RestartProofKeyValueView.java |   9 +-
 7 files changed, 414 insertions(+), 108 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java
new file mode 100644
index 00000000000..1488de8c69c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * A versatile read-write lock that can be used both in synchronous and 
asynchronous contexts.
+ * Its blocking methods use the spinwait strategy. When they do so, they are 
not interruptible (that is, they do not break their loop on
+ * interruption signal).
+ *
+ * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire 
the same lock a few times without releasing it).
+ *
+ * <p>Write lock acquire requests are prioritized over read lock acquire 
requests. That is, if both read and write lock
+ * acquire requests are received when the write lock is held by someone else, 
then, on its release, the write lock attempt will be served
+ * first.
+ *
+ * <p>Lock owners are not tracked.
+ *
+ * <p>Asynchronous locking methods may complete the futures either in the 
calling thread (if they were able to immediately acquire
+ * the requested lock) or in the supplied pool (if they had to wait for a 
release to happen before being able to satisfy the request).
+ *
+ * <p>Asynchronous locking methods never use spin loops. They do use CAS 
loops, but these are mostly very short.
+ *
+ * <p>Synchronous read lock acquisitions and releases MUST be made in the same 
thread. There is no such requirement for write lock and
+ * for asynchronous locking.
+ *
+ * <p>The lock is striped, which reduces contention (as there are as 
many contention points as there are stripes).
+ */
+public class StripedVersatileReadWriteLock {
+    /** Default concurrency. */
+    private static final int DEFAULT_CONCURRENCY = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
+
+    private final VersatileReadWriteLock[] locks;
+
+    /** Index generator. */
+    private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger();
+
+    /** Index. */
+    private static final ThreadLocal<Integer> INDEX = 
ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet);
+
+    /**
+     * Constructor.
+     */
+    public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) {
+        this(asyncContinuationExecutor, DEFAULT_CONCURRENCY);
+    }
+
+    private StripedVersatileReadWriteLock(Executor asyncContinuationExecutor, 
int concurrency) {
+        assert concurrency > 0 : "Concurrency must be positive, but was: " + 
concurrency;
+
+        locks = new VersatileReadWriteLock[concurrency];
+        for (int i = 0; i < concurrency; i++) {
+            locks[i] = new VersatileReadWriteLock(asyncContinuationExecutor);
+        }
+    }
+
+    /**
+     * Gets current stripe index.
+     *
+     * @return Index of current thread stripe, in {@code [0, locks.length)}.
+     */
+    private int currentIndex() {
+        int index = INDEX.get();
+        // floorMod, not %: INDEX_GENERATOR may wrap to negative values.
+        return Math.floorMod(index, locks.length);
+    }
+
+    private VersatileReadWriteLock currentLock() {
+        return locks[currentIndex()];
+    }
+
+    /**
+     * Acquires the read lock. If the write lock is already held, this blocks 
until the write lock is released (and until all
+     * concurrent write locks are acquired and released, as this class 
prioritizes write lock attempts over read lock attempts).
+     *
+     * <p>Only the calling thread's stripe is locked, so (in contrast with 
async locking) this method MUST be invoked in the same thread
+     * in which the corresponding {@link #readUnlock()} will be invoked.
+     */
+    public void readLock() {
+        currentLock().readLock();
+    }
+
+    /**
+     * Tries to acquire the read lock. No spinwait is used if the lock cannot 
be acquired immediately.
+     *
+     * <p>Only the calling thread's stripe is locked, so (in contrast with 
async locking) this method MUST be invoked in the same thread
+     * in which the corresponding {@link #readUnlock()} will be invoked.
+     *
+     * @return {@code true} if acquired, {@code false} if write lock is 
already held by someone else (or someone is waiting to acquire
+     *     the write lock).
+     */
+    public boolean tryReadLock() {
+        return currentLock().tryReadLock();
+    }
+
+    /**
+     * Releases the read lock.
+     *
+     * <p>The stripe to unlock is chosen by the calling thread, so (in contrast 
with async locking) this method MUST be invoked in the
+     * same thread in which the corresponding {@link #readLock()} or {@link #tryReadLock()} was invoked.
+     *
+     * @throws IllegalMonitorStateException thrown if the read lock is not 
acquired by anyone.
+     */
+    public void readUnlock() {
+        currentLock().readUnlock();
+    }
+
+    private void readUnlock(int idx) {
+        locks[idx].readUnlock();
+    }
+
+    /**
+     * Acquires the write lock, waiting if needed. The thread will block until 
all other read and write locks are released.
+     */
+    public void writeLock() {
+        // All stripes are locked in array order to avoid deadlocks.
+        for (VersatileReadWriteLock lock : locks) {
+            lock.writeLock();
+        }
+    }
+
+    private void writeUnlock0(int fromIndex) {
+        for (int i = fromIndex; i >= 0; i--) {
+            locks[i].writeUnlock();
+        }
+    }
+
+    /**
+     * Attempts to take the write lock without blocking. Stripes are tried in 
array order; on the first one that cannot be locked, the
+     * stripes locked so far are released and {@code false} is returned.
+     *
+     * @return {@code true} if the write lock has been acquired, {@code false} 
otherwise
+     */
+    public boolean tryWriteLock() {
+        int acquired = 0;
+
+        try {
+            // Stops at the first stripe that refuses the write lock.
+            while (acquired < locks.length && locks[acquired].tryWriteLock()) {
+                acquired++;
+            }
+        } finally {
+            // Partial success (or an exception): roll back what was taken.
+            if (acquired > 0 && acquired != locks.length) {
+                writeUnlock0(acquired - 1);
+            }
+        }
+
+        return acquired == locks.length;
+    }
+
+    /**
+     * Releases the write lock.
+     *
+     * @throws IllegalMonitorStateException thrown if the write lock is not 
acquired.
+     */
+    public void writeUnlock() {
+        writeUnlock0(locks.length - 1);
+    }
+
+    /**
+     * Executes the provided asynchronous action under protection of a read 
lock: that is, it first obtains a read lock
+     * asynchronously, then executes the action, and then releases the lock on 
the same stripe (regardless of the releasing thread).
+     *
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public <T> CompletableFuture<T> inReadLockAsync(Supplier<? extends 
CompletableFuture<T>> action) {
+        int index = currentIndex();
+
+        return locks[index].readLockAsync()
+                .thenCompose(unused -> action.get())
+                .whenComplete((res, ex) -> readUnlock(index));
+    }
+
+    /**
+     * Executes the provided asynchronous action under protection of a write 
lock: that is, it first obtains the write lock
+     * asynchronously, then executes the action, and then releases the lock.
+     *
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public <T> CompletableFuture<T> inWriteLockAsync(Supplier<? extends 
CompletableFuture<T>> action) {
+        return writeLockAsync()
+                .thenCompose(unused -> action.get())
+                .whenComplete((res, ex) -> writeUnlock());
+    }
+
+    private CompletableFuture<Void> writeLockAsync() {
+        CompletableFuture<Void> future = nullCompletedFuture();
+
+        // Locking in array order avoids deadlocks, hence the chain.
+        for (VersatileReadWriteLock lock : locks) {
+            future = future.thenCompose(unused -> lock.writeLockAsync());
+        }
+
+        return future;
+    }
+
+    int readLocksHeld() {
+        return 
Arrays.stream(locks).mapToInt(VersatileReadWriteLock::readLocksHeld).sum();
+    }
+
+    boolean isWriteLocked() {
+        return 
Arrays.stream(locks).anyMatch(VersatileReadWriteLock::isWriteLocked);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
index 43d7ab4292d..c1b328db07a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
@@ -438,7 +438,7 @@ public class VersatileReadWriteLock {
                 .whenComplete((res, ex) -> readUnlock());
     }
 
-    private CompletableFuture<Void> readLockAsync() {
+    CompletableFuture<Void> readLockAsync() {
         if (tryReadLock()) {
             return nullCompletedFuture();
         }
@@ -473,7 +473,7 @@ public class VersatileReadWriteLock {
                 .whenComplete((res, ex) -> writeUnlock());
     }
 
-    private CompletableFuture<Void> writeLockAsync() {
+    CompletableFuture<Void> writeLockAsync() {
         if (tryWriteLock()) {
             return nullCompletedFuture();
         }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java
similarity index 83%
copy from 
modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
copy to 
modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java
index 6a626f4bf5f..b5343b8444f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java
@@ -42,39 +42,43 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junitpioneer.jupiter.cartesian.CartesianTest;
 import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
 
 /**
- * Tests for {@link VersatileReadWriteLock}.
+ * Tests for {@link StripedVersatileReadWriteLock}.
  */
 @Timeout(20)
-class VersatileReadWriteLockTest {
-    private static final IgniteLogger LOG = 
Loggers.forClass(VersatileReadWriteLockTest.class);
-
+@ExtendWith(ExecutorServiceExtension.class)
+class StripedVersatileReadWriteLockTest {
     private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace";
 
-    private final ExecutorService asyncContinuationExecutor = 
Executors.newCachedThreadPool(
-            
IgniteThreadFactory.createWithFixedPrefix(ASYNC_CONTINUATION_THREAD_PREFIX, 
false, LOG)
-    );
+    @InjectExecutorService(threadPrefix = ASYNC_CONTINUATION_THREAD_PREFIX)
+    private ExecutorService asyncContinuationExecutor;
 
     /** The lock under test. */
-    private final VersatileReadWriteLock lock = new 
VersatileReadWriteLock(asyncContinuationExecutor);
+    private StripedVersatileReadWriteLock lock;
 
     /** Executor service used to run tasks in threads different from the main 
test thread. */
-    private final ExecutorService executor = Executors.newCachedThreadPool();
+    @InjectExecutorService
+    private ExecutorService executor;
+
+    @BeforeEach
+    void createLock() {
+        lock = new StripedVersatileReadWriteLock(asyncContinuationExecutor);
+    }
 
     /**
      * Cleans up after a test.
@@ -83,9 +87,6 @@ class VersatileReadWriteLockTest {
     void cleanup() {
         releaseReadLocks();
         releaseWriteLocks();
-
-        IgniteUtils.shutdownAndAwaitTermination(executor, 3, SECONDS);
-        IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 3, 
SECONDS);
     }
 
     private void releaseReadLocks() {
@@ -123,10 +124,9 @@ class VersatileReadWriteLockTest {
     @ParameterizedTest
     @EnumSource(BlockingWriteLockAcquisition.class)
     void 
readLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
 acquisition) {
-        assertThatActionBlocksForever(() -> {
-            lock.readLock();
-            acquisition.acquire(lock);
-        });
+        lock.readLock();
+
+        assertThatActionBlocksForever(() -> acquisition.acquire(lock));
 
         lock.readUnlock();
     }
@@ -141,27 +141,6 @@ class VersatileReadWriteLockTest {
         assertThat(future, willTimeoutIn(100, MILLISECONDS));
     }
 
-    @Test
-    void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeout() throws 
Exception {
-        lock.readLock();
-
-        Boolean acquired = callWithTimeout(() -> lock.tryWriteLock(1, 
MILLISECONDS));
-        assertThat(acquired, is(false));
-
-        lock.readUnlock();
-    }
-
-    @Test
-    void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeoutBySameThread() 
throws Exception {
-        Boolean acquired = callWithTimeout(() -> {
-            lock.readLock();
-            return lock.tryWriteLock(1, MILLISECONDS);
-        });
-        assertThat(acquired, is(false));
-
-        lock.readUnlock();
-    }
-
     @Test
     void readLockAllowsReadLockToBeAcquired() {
         lock.readLock();
@@ -307,10 +286,9 @@ class VersatileReadWriteLockTest {
     @ParameterizedTest
     @EnumSource(BlockingWriteLockAcquisition.class)
     void 
readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
 acquisition) {
-        assertThatActionBlocksForever(() -> {
-            lock.tryReadLock();
-            acquisition.acquire(lock);
-        });
+        lock.tryReadLock();
+
+        assertThatActionBlocksForever(() -> acquisition.acquire(lock));
 
         lock.readUnlock();
     }
@@ -375,7 +353,16 @@ class VersatileReadWriteLockTest {
     }
 
     @Test
-    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfException() {
+    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInSyncPart() {
+        assertThat(lock.inReadLockAsync(() -> {
+            throw new RuntimeException("Oops");
+        }), willThrow(Exception.class));
+
+        assertThatNoReadLockIsHeld();
+    }
+
+    @Test
+    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInAsyncPart() 
{
         assertThat(lock.inReadLockAsync(() -> failedFuture(new 
Exception("Oops"))), willThrow(Exception.class));
 
         assertThatNoReadLockIsHeld();
@@ -406,33 +393,6 @@ class VersatileReadWriteLockTest {
         assertThatNoReadLockIsHeld();
     }
 
-    @Test
-    void inReadLockAsyncRespectsPendingWriteLocks() throws Exception {
-        lock.readLock();
-
-        CompletableFuture<?> writeLockFuture = runAsync(lock::writeLock, 
executor);
-
-        waitTillWriteLockAcquireAttemptIsInitiated();
-
-        CompletableFuture<Void> readLockAsyncFuture = 
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
-        assertFalse(readLockAsyncFuture.isDone());
-
-        // Letting the write lock to be acquired.
-        lock.readUnlock();
-
-        assertThat(writeLockFuture, willCompleteSuccessfully());
-
-        assertFalse(waitForCondition(readLockAsyncFuture::isDone, 100));
-
-        lock.writeUnlock();
-    }
-
-    private void waitTillWriteLockAcquireAttemptIsInitiated() throws 
InterruptedException {
-        boolean sawAnAttempt = waitForCondition(
-                () -> lock.pendingWriteLocksCount() > 0, SECONDS.toMillis(10));
-        assertTrue(sawAnAttempt, "Did not see any attempt to acquire write 
lock");
-    }
-
     @Test
     void inReadLockAsyncTakesReadLockInExecutorAfterWriteLockGetsReleased() {
         lock.writeLock();
@@ -477,7 +437,7 @@ class VersatileReadWriteLockTest {
     private void assertThatWriteLockGetsUnlocked() throws InterruptedException 
{
         assertTrue(
                 waitForCondition(() -> !lock.isWriteLocked(), 
SECONDS.toMillis(10)),
-                () -> "Still write locked"
+                "Still write locked"
         );
     }
 
@@ -494,7 +454,16 @@ class VersatileReadWriteLockTest {
     }
 
     @Test
-    void inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfException() {
+    void 
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInSyncPart() {
+        assertThat(lock.inWriteLockAsync(() -> {
+            throw new RuntimeException("Oops");
+        }), willThrow(Exception.class));
+
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @Test
+    void 
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInAsyncPart() {
         assertThat(lock.inWriteLockAsync(() -> failedFuture(new 
Exception("Oops"))), willThrow(Exception.class));
 
         assertThatNoWriteLockIsHeld();
@@ -564,46 +533,40 @@ class VersatileReadWriteLockTest {
     private enum BlockingWriteLockAcquisition {
         WRITE_LOCK {
             @Override
-            void acquire(VersatileReadWriteLock lock) {
+            void acquire(StripedVersatileReadWriteLock lock) {
                 lock.writeLock();
             }
-        },
-        WRITE_LOCK_BUSY {
-            @Override
-            void acquire(VersatileReadWriteLock lock) {
-                lock.writeLockBusy();
-            }
         };
 
-        abstract void acquire(VersatileReadWriteLock lock);
+        abstract void acquire(StripedVersatileReadWriteLock lock);
     }
 
     private enum WriteLockImpeder {
         READ_LOCK {
             @Override
-            void impede(VersatileReadWriteLock lock) {
+            void impede(StripedVersatileReadWriteLock lock) {
                 lock.readLock();
             }
 
             @Override
-            void stopImpeding(VersatileReadWriteLock lock) {
+            void stopImpeding(StripedVersatileReadWriteLock lock) {
                 lock.readUnlock();
             }
         },
         WRITE_LOCK {
             @Override
-            void impede(VersatileReadWriteLock lock) {
+            void impede(StripedVersatileReadWriteLock lock) {
                 lock.writeLock();
             }
 
             @Override
-            void stopImpeding(VersatileReadWriteLock lock) {
+            void stopImpeding(StripedVersatileReadWriteLock lock) {
                 lock.writeUnlock();
             }
         };
 
-        abstract void impede(VersatileReadWriteLock lock);
+        abstract void impede(StripedVersatileReadWriteLock lock);
 
-        abstract void stopImpeding(VersatileReadWriteLock lock);
+        abstract void stopImpeding(StripedVersatileReadWriteLock lock);
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockBenchmark.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockBenchmark.java
new file mode 100644
index 00000000000..e623f7fed98
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockBenchmark.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark to compare performance of {@link VersatileReadWriteLock} with its 
derivatives.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(8)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+public class VersatileReadWriteLockBenchmark {
+    private final VersatileReadWriteLock simpleLock = new 
VersatileReadWriteLock(ForkJoinPool.commonPool());
+    private final StripedVersatileReadWriteLock stripedLock = new 
StripedVersatileReadWriteLock(ForkJoinPool.commonPool());
+
+    @Benchmark
+    public void simpleSync() {
+        simpleLock.readLock();
+        simpleLock.readUnlock();
+    }
+
+    @Benchmark
+    public void stripedSync() {
+        stripedLock.readLock();
+        stripedLock.readUnlock();
+    }
+
+    @Benchmark
+    public Object simpleAsync() throws Exception {
+        return 
simpleLock.inReadLockAsync(CompletableFutures::nullCompletedFuture).get();
+    }
+
+    @Benchmark
+    public Object stripedAsync() throws Exception {
+        return 
stripedLock.inReadLockAsync(CompletableFutures::nullCompletedFuture).get();
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + 
VersatileReadWriteLockBenchmark.class.getSimpleName() + ".*")
+                // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
+                // .addProfiler(JavaFlightRecorderProfiler.class, 
"configName=profile.jfc")
+                .build();
+
+        new Runner(opt).run();
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
index 6a626f4bf5f..dad46111562 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
@@ -42,16 +42,16 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junitpioneer.jupiter.cartesian.CartesianTest;
@@ -61,20 +61,24 @@ import 
org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
  * Tests for {@link VersatileReadWriteLock}.
  */
 @Timeout(20)
+@ExtendWith(ExecutorServiceExtension.class)
 class VersatileReadWriteLockTest {
-    private static final IgniteLogger LOG = 
Loggers.forClass(VersatileReadWriteLockTest.class);
-
     private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace";
 
-    private final ExecutorService asyncContinuationExecutor = 
Executors.newCachedThreadPool(
-            
IgniteThreadFactory.createWithFixedPrefix(ASYNC_CONTINUATION_THREAD_PREFIX, 
false, LOG)
-    );
+    @InjectExecutorService(threadPrefix = ASYNC_CONTINUATION_THREAD_PREFIX)
+    private ExecutorService asyncContinuationExecutor;
 
     /** The lock under test. */
-    private final VersatileReadWriteLock lock = new 
VersatileReadWriteLock(asyncContinuationExecutor);
+    private VersatileReadWriteLock lock;
 
     /** Executor service used to run tasks in threads different from the main 
test thread. */
-    private final ExecutorService executor = Executors.newCachedThreadPool();
+    @InjectExecutorService
+    private ExecutorService executor;
+
+    @BeforeEach
+    void createLock() {
+        lock = new VersatileReadWriteLock(asyncContinuationExecutor);
+    }
 
     /**
      * Cleans up after a test.
@@ -375,7 +379,16 @@ class VersatileReadWriteLockTest {
     }
 
     @Test
-    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfException() {
+    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInSyncPart() {
+        assertThat(lock.inReadLockAsync(() -> {
+            throw new RuntimeException("Oops");
+        }), willThrow(Exception.class));
+
+        assertThatNoReadLockIsHeld();
+    }
+
+    @Test
+    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfExceptionInAsyncPart() 
{
         assertThat(lock.inReadLockAsync(() -> failedFuture(new 
Exception("Oops"))), willThrow(Exception.class));
 
         assertThatNoReadLockIsHeld();
@@ -494,7 +507,16 @@ class VersatileReadWriteLockTest {
     }
 
     @Test
-    void inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfException() {
+    void 
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInSyncPart() {
+        assertThat(lock.inWriteLockAsync(() -> {
+            throw new RuntimeException("Oops");
+        }), willThrow(Exception.class));
+
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @Test
+    void 
inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfExceptionInAsyncPart() {
         assertThat(lock.inWriteLockAsync(() -> failedFuture(new 
Exception("Oops"))), willThrow(Exception.class));
 
         assertThatNoWriteLockIsHeld();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
index 0c2e724bf6b..da5ad02de6e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/IgniteAttachmentLock.java
@@ -23,7 +23,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.util.VersatileReadWriteLock;
+import org.apache.ignite.internal.util.StripedVersatileReadWriteLock;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.lang.IgniteException;
 
@@ -35,14 +35,14 @@ public class IgniteAttachmentLock {
     /** This must always be accessed under {@link #lock}. */
     private final Supplier<Ignite> igniteRef;
 
-    private final VersatileReadWriteLock lock;
+    private final StripedVersatileReadWriteLock lock;
 
     /**
      * Constructor.
      */
     public IgniteAttachmentLock(Supplier<Ignite> igniteRef, Executor 
asyncContinuationExecutor) {
         this.igniteRef = igniteRef;
-        lock = new VersatileReadWriteLock(asyncContinuationExecutor);
+        lock = new StripedVersatileReadWriteLock(asyncContinuationExecutor);
     }
 
     /**
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
index 9231930b5df..ac61c014ccf 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.function.Function;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.internal.wrapper.Wrappers;
 import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.lang.NullableValue;
@@ -44,7 +46,7 @@ import org.jetbrains.annotations.Nullable;
  * <p>API operations on this are linearized with respect to node restarts. 
Normally (except for situations when timeouts trigger), user
  * operations will not interact with detached objects.
  */
-class RestartProofKeyValueView<K, V> extends 
RestartProofApiObject<KeyValueView<K, V>> implements KeyValueView<K, V> {
+class RestartProofKeyValueView<K, V> extends 
RestartProofApiObject<KeyValueView<K, V>> implements KeyValueView<K, V>, 
Wrapper {
     RestartProofKeyValueView(
             IgniteAttachmentLock attachmentLock,
             Ignite initialIgnite,
@@ -304,4 +306,9 @@ class RestartProofKeyValueView<K, V> extends 
RestartProofApiObject<KeyValueView<
     ) {
         return attachedAsync(view -> view.queryAsync(tx, criteria, indexName, 
opts));
     }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return attached(view -> Wrappers.unwrap(view, classToUnwrap));
+    }
 }

Reply via email to