sashapolo commented on code in PR #6472: URL: https://github.com/apache/ignite-3/pull/6472#discussion_r2297933894
########## modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * A versatile read-write lock that can be used both in synchronous and asynchronous contexts. + * Its blocking methods use the spinwait strategy. When they do so, they are not interruptible (that is, they do not break their loop on + * interruption signal). + * + * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire the same lock a few times without releasing it). + * + * <p>Write lock acquire requests are prioritized over read lock acquire requests. That is, if both read and write lock + * acquire requests are received when the write lock is held by someone else, then, on its release, the write lock attempt will be served + * first. + * + * <p>Lock owners are not tracked. 
+ * + * <p>Asynchronous locking methods may complete the futures either in the calling thread (if they were able to immediately acquire + * the requested lock) or in the supplied pool (if they had to wait for a release to happen before being able to satisfy the request). + * + * <p>Asynchronous locking methods never use spin loops. They do use CAS loops, but these are mostly very short. + * + * <p>Synchronous read lock acquisitions and releases MUST be made in the same thread. There is no such requirement for write lock and + * for asynchronous locking. + * + * <p>The lock is striped. It allows to reduce contention (as there are as many contention points as there are stripes). + */ +public class StripedVersatileReadWriteLock { + /** Default concurrency. */ + private static final int DEFAULT_CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + + private final VersatileReadWriteLock[] locks; + + /** Index generator. */ + private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger(); + + /** Index. */ + private static final ThreadLocal<Integer> INDEX = ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet); + + /** + * Constructor. + */ + public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) { + this(asyncContinuationExecutor, DEFAULT_CONCURRENCY); + } + + /** + * Constructor. + */ + private StripedVersatileReadWriteLock(Executor asyncContinuationExecutor, int concurrency) { + assert concurrency > 0 : "Concurrency must be positive, but was: " + concurrency; + + locks = new VersatileReadWriteLock[concurrency]; + for (int i = 0; i < concurrency; i++) { + locks[i] = new VersatileReadWriteLock(asyncContinuationExecutor); + } + } + + /** + * Gets current stripe index. + * + * @return Index of current thread stripe. 
+ */ + private int currentIndex() { + int index = INDEX.get(); + + return index % locks.length; + } + + private VersatileReadWriteLock currentLock() { + return locks[currentIndex()]; + } + + /** + * Acquires the read lock. If the write lock is already held, this blocks until the write lock is released (and until all + * concurrent write locks are acquired and released, as this class prioritizes write lock attempts over read lock attempts). + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + */ + public void readLock() { + currentLock().readLock(); + } + + /** + * Tries to acquire the read lock. No spinwait is used if the lock cannot be acquired immediately. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + * + * @return {@code true} if acquired, {@code false} if write lock is already held by someone else (or someone is waiting to acquire + * the write lock). + */ + public boolean tryReadLock() { + return currentLock().tryReadLock(); + } + + /** + * Releases the read lock. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readLock()} is invoked. + * + * @throws IllegalMonitorStateException thrown if the read lock is not acquired by anyone. + */ + public void readUnlock() { + currentLock().readUnlock(); + } + + private void readUnlock(int idx) { + locks[idx].readUnlock(); + } + + /** + * Acquires the write lock waiting, if needed. The thread will block until all other read and write locks are released. + */ + public void writeLock() { + int i = 0; + + // Locks must be acquired in order to avoid deadlocks. 
+ for (; i < locks.length; i++) { + locks[i].writeLock(); + } + } + + private void writeUnlock0(int fromIndex) { + for (int i = fromIndex; i >= 0; i--) { + locks[i].writeUnlock(); + } + } + + /** + * Tries to acquire the write lock. Never blocks: if any lock has already been acquired by someone else, returns {@code false} + * immediately. + * + * @return {@code true} if the write lock has been acquired, {@code false} otherwise + */ + public boolean tryWriteLock() { + int i = 0; + + try { + for (; i < locks.length; i++) { + if (!locks[i].tryWriteLock()) { + break; + } + } + } finally { + if (0 < i && i < locks.length) { + writeUnlock0(i - 1); + } + } + + return i == locks.length; Review Comment: and `return true` here? ########## modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * A versatile read-write lock that can be used both in synchronous and asynchronous contexts. + * Its blocking methods use the spinwait strategy. When they do so, they are not interruptible (that is, they do not break their loop on + * interruption signal). + * + * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire the same lock a few times without releasing it). + * + * <p>Write lock acquire requests are prioritized over read lock acquire requests. That is, if both read and write lock + * acquire requests are received when the write lock is held by someone else, then, on its release, the write lock attempt will be served + * first. + * + * <p>Lock owners are not tracked. + * + * <p>Asynchronous locking methods may complete the futures either in the calling thread (if they were able to immediately acquire + * the requested lock) or in the supplied pool (if they had to wait for a release to happen before being able to satisfy the request). + * + * <p>Asynchronous locking methods never use spin loops. They do use CAS loops, but these are mostly very short. + * + * <p>Synchronous read lock acquisitions and releases MUST be made in the same thread. There is no such requirement for write lock and + * for asynchronous locking. + * + * <p>The lock is striped. It allows to reduce contention (as there are as many contention points as there are stripes). + */ +public class StripedVersatileReadWriteLock { + /** Default concurrency. 
*/ + private static final int DEFAULT_CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + + private final VersatileReadWriteLock[] locks; + + /** Index generator. */ + private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger(); + + /** Index. */ + private static final ThreadLocal<Integer> INDEX = ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet); + + /** + * Constructor. + */ + public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) { + this(asyncContinuationExecutor, DEFAULT_CONCURRENCY); + } + + /** + * Constructor. + */ + private StripedVersatileReadWriteLock(Executor asyncContinuationExecutor, int concurrency) { + assert concurrency > 0 : "Concurrency must be positive, but was: " + concurrency; + + locks = new VersatileReadWriteLock[concurrency]; + for (int i = 0; i < concurrency; i++) { + locks[i] = new VersatileReadWriteLock(asyncContinuationExecutor); + } + } + + /** + * Gets current stripe index. + * + * @return Index of current thread stripe. + */ + private int currentIndex() { + int index = INDEX.get(); + + return index % locks.length; + } + + private VersatileReadWriteLock currentLock() { + return locks[currentIndex()]; + } + + /** + * Acquires the read lock. If the write lock is already held, this blocks until the write lock is released (and until all + * concurrent write locks are acquired and released, as this class prioritizes write lock attempts over read lock attempts). + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + */ + public void readLock() { + currentLock().readLock(); + } + + /** + * Tries to acquire the read lock. No spinwait is used if the lock cannot be acquired immediately. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. 
+ * + * @return {@code true} if acquired, {@code false} if write lock is already held by someone else (or someone is waiting to acquire + * the write lock). + */ + public boolean tryReadLock() { + return currentLock().tryReadLock(); + } + + /** + * Releases the read lock. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readLock()} is invoked. + * + * @throws IllegalMonitorStateException thrown if the read lock is not acquired by anyone. + */ + public void readUnlock() { + currentLock().readUnlock(); + } + + private void readUnlock(int idx) { + locks[idx].readUnlock(); + } + + /** + * Acquires the write lock waiting, if needed. The thread will block until all other read and write locks are released. + */ + public void writeLock() { + int i = 0; + + // Locks must be acquired in order to avoid deadlocks. + for (; i < locks.length; i++) { + locks[i].writeLock(); + } + } + + private void writeUnlock0(int fromIndex) { + for (int i = fromIndex; i >= 0; i--) { + locks[i].writeUnlock(); + } + } + + /** + * Tries to acquire the write lock. Never blocks: if any lock has already been acquired by someone else, returns {@code false} + * immediately. + * + * @return {@code true} if the write lock has been acquired, {@code false} otherwise + */ + public boolean tryWriteLock() { + int i = 0; + + try { + for (; i < locks.length; i++) { + if (!locks[i].tryWriteLock()) { + break; Review Comment: can we just write `return false` here? ########## modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java: ########## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static java.lang.Thread.currentThread; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.anyOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; 
+import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.lang.RunnableX; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.thread.IgniteThreadFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum; + +/** + * Tests for {@link StripedVersatileReadWriteLock}. + */ +@Timeout(20) +class StripedVersatileReadWriteLockTest { + private static final IgniteLogger LOG = Loggers.forClass(StripedVersatileReadWriteLockTest.class); + + private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace"; + + private final ExecutorService asyncContinuationExecutor = Executors.newCachedThreadPool( + IgniteThreadFactory.createWithFixedPrefix(ASYNC_CONTINUATION_THREAD_PREFIX, false, LOG) + ); + + /** The lock under test. */ + private final StripedVersatileReadWriteLock lock = new StripedVersatileReadWriteLock(asyncContinuationExecutor); + + /** Executor service used to run tasks in threads different from the main test thread. */ + private final ExecutorService executor = Executors.newCachedThreadPool(); + + /** + * Cleans up after a test. 
+ */ + @AfterEach + void cleanup() { + releaseReadLocks(); + releaseWriteLocks(); + + IgniteUtils.shutdownAndAwaitTermination(executor, 3, SECONDS); + IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 3, SECONDS); + } + + private void releaseReadLocks() { + while (true) { + try { + lock.readUnlock(); + } catch (IllegalMonitorStateException e) { + // Released our read lock completely. Review Comment: I'm confused, why do we need to release the locks in a cycle? ########## modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * A versatile read-write lock that can be used both in synchronous and asynchronous contexts. + * Its blocking methods use the spinwait strategy. 
When they do so, they are not interruptible (that is, they do not break their loop on + * interruption signal). + * + * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire the same lock a few times without releasing it). + * + * <p>Write lock acquire requests are prioritized over read lock acquire requests. That is, if both read and write lock + * acquire requests are received when the write lock is held by someone else, then, on its release, the write lock attempt will be served + * first. + * + * <p>Lock owners are not tracked. + * + * <p>Asynchronous locking methods may complete the futures either in the calling thread (if they were able to immediately acquire + * the requested lock) or in the supplied pool (if they had to wait for a release to happen before being able to satisfy the request). + * + * <p>Asynchronous locking methods never use spin loops. They do use CAS loops, but these are mostly very short. + * + * <p>Synchronous read lock acquisitions and releases MUST be made in the same thread. There is no such requirement for write lock and + * for asynchronous locking. + * + * <p>The lock is striped. It allows to reduce contention (as there are as many contention points as there are stripes). + */ +public class StripedVersatileReadWriteLock { + /** Default concurrency. */ + private static final int DEFAULT_CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + + private final VersatileReadWriteLock[] locks; + + /** Index generator. */ + private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger(); + + /** Index. */ + private static final ThreadLocal<Integer> INDEX = ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet); + + /** + * Constructor. + */ + public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) { + this(asyncContinuationExecutor, DEFAULT_CONCURRENCY); + } + + /** + * Constructor. 
+ */ + private StripedVersatileReadWriteLock(Executor asyncContinuationExecutor, int concurrency) { + assert concurrency > 0 : "Concurrency must be positive, but was: " + concurrency; + + locks = new VersatileReadWriteLock[concurrency]; + for (int i = 0; i < concurrency; i++) { + locks[i] = new VersatileReadWriteLock(asyncContinuationExecutor); + } + } + + /** + * Gets current stripe index. + * + * @return Index of current thread stripe. + */ + private int currentIndex() { + int index = INDEX.get(); + + return index % locks.length; + } + + private VersatileReadWriteLock currentLock() { + return locks[currentIndex()]; + } + + /** + * Acquires the read lock. If the write lock is already held, this blocks until the write lock is released (and until all + * concurrent write locks are acquired and released, as this class prioritizes write lock attempts over read lock attempts). + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + */ + public void readLock() { + currentLock().readLock(); + } + + /** + * Tries to acquire the read lock. No spinwait is used if the lock cannot be acquired immediately. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + * + * @return {@code true} if acquired, {@code false} if write lock is already held by someone else (or someone is waiting to acquire + * the write lock). + */ + public boolean tryReadLock() { + return currentLock().tryReadLock(); + } + + /** + * Releases the read lock. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readLock()} is invoked. + * + * @throws IllegalMonitorStateException thrown if the read lock is not acquired by anyone. 
+ */ + public void readUnlock() { + currentLock().readUnlock(); + } + + private void readUnlock(int idx) { + locks[idx].readUnlock(); + } + + /** + * Acquires the write lock waiting, if needed. The thread will block until all other read and write locks are released. + */ + public void writeLock() { + int i = 0; + + // Locks must be acquired in order to avoid deadlocks. + for (; i < locks.length; i++) { + locks[i].writeLock(); + } + } + + private void writeUnlock0(int fromIndex) { + for (int i = fromIndex; i >= 0; i--) { + locks[i].writeUnlock(); + } + } + + /** + * Tries to acquire the write lock. Never blocks: if any lock has already been acquired by someone else, returns {@code false} + * immediately. + * + * @return {@code true} if the write lock has been acquired, {@code false} otherwise + */ + public boolean tryWriteLock() { + int i = 0; + + try { + for (; i < locks.length; i++) { + if (!locks[i].tryWriteLock()) { + break; + } + } + } finally { + if (0 < i && i < locks.length) { + writeUnlock0(i - 1); + } + } + + return i == locks.length; + } + + /** + * Releases the write lock. + * + * @throws IllegalMonitorStateException thrown if the write lock is not acquired. + */ + public void writeUnlock() { + writeUnlock0(locks.length - 1); + } + + /** + * Executes the provided asynchronous action under protection of a read lock: that is, it first obtains a read lock + * asynchronously, then executes the action, and then releases the lock. + * + * @param action Action to execute. + * @return Action result. + */ + public <T> CompletableFuture<T> inReadLockAsync(Supplier<? extends CompletableFuture<T>> action) { + return readLockAsync() + .thenCompose(index -> { + return nullCompletedFuture() Review Comment: I'm not a fan of this approach, maybe we can compute the `index` at the beginning of this method and pass it both to `readLockAsync` and `readUnlock`? 
This will make this method look similar to `inWriteLockAsync` ########## modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * A versatile read-write lock that can be used both in synchronous and asynchronous contexts. + * Its blocking methods use the spinwait strategy. When they do so, they are not interruptible (that is, they do not break their loop on + * interruption signal). + * + * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire the same lock a few times without releasing it). + * + * <p>Write lock acquire requests are prioritized over read lock acquire requests. 
That is, if both read and write lock + * acquire requests are received when the write lock is held by someone else, then, on its release, the write lock attempt will be served + * first. + * + * <p>Lock owners are not tracked. + * + * <p>Asynchronous locking methods may complete the futures either in the calling thread (if they were able to immediately acquire + * the requested lock) or in the supplied pool (if they had to wait for a release to happen before being able to satisfy the request). + * + * <p>Asynchronous locking methods never use spin loops. They do use CAS loops, but these are mostly very short. + * + * <p>Synchronous read lock acquisitions and releases MUST be made in the same thread. There is no such requirement for write lock and + * for asynchronous locking. + * + * <p>The lock is striped. It allows to reduce contention (as there are as many contention points as there are stripes). + */ +public class StripedVersatileReadWriteLock { + /** Default concurrency. */ + private static final int DEFAULT_CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + + private final VersatileReadWriteLock[] locks; + + /** Index generator. */ + private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger(); + + /** Index. */ + private static final ThreadLocal<Integer> INDEX = ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet); + + /** + * Constructor. + */ + public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) { + this(asyncContinuationExecutor, DEFAULT_CONCURRENCY); + } + + /** + * Constructor. + */ + private StripedVersatileReadWriteLock(Executor asyncContinuationExecutor, int concurrency) { + assert concurrency > 0 : "Concurrency must be positive, but was: " + concurrency; + + locks = new VersatileReadWriteLock[concurrency]; + for (int i = 0; i < concurrency; i++) { + locks[i] = new VersatileReadWriteLock(asyncContinuationExecutor); + } + } + + /** + * Gets current stripe index. 
+ * + * @return Index of current thread stripe. + */ + private int currentIndex() { + int index = INDEX.get(); + + return index % locks.length; + } + + private VersatileReadWriteLock currentLock() { + return locks[currentIndex()]; + } + + /** + * Acquires the read lock. If the write lock is already held, this blocks until the write lock is released (and until all + * concurrent write locks are acquired and released, as this class prioritizes write lock attempts over read lock attempts). + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + */ + public void readLock() { + currentLock().readLock(); + } + + /** + * Tries to acquire the read lock. No spinwait is used if the lock cannot be acquired immediately. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readUnlock()} will be invoked. + * + * @return {@code true} if acquired, {@code false} if write lock is already held by someone else (or someone is waiting to acquire + * the write lock). + */ + public boolean tryReadLock() { + return currentLock().tryReadLock(); + } + + /** + * Releases the read lock. + * + * <p>In contrast with async locking, invocations of this method MUST be invoked in the same thread in which the corresponding + * {@link #readLock()} is invoked. + * + * @throws IllegalMonitorStateException thrown if the read lock is not acquired by anyone. + */ + public void readUnlock() { + currentLock().readUnlock(); + } + + private void readUnlock(int idx) { + locks[idx].readUnlock(); + } + + /** + * Acquires the write lock waiting, if needed. The thread will block until all other read and write locks are released. + */ + public void writeLock() { + int i = 0; Review Comment: why is this variable declared here and not as a part of the cycle? 
########## modules/core/src/test/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLockTest.java: ########## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static java.lang.Thread.currentThread; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.anyOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.lang.RunnableX; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.thread.IgniteThreadFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum; + +/** + * Tests for {@link StripedVersatileReadWriteLock}. + */ +@Timeout(20) +class StripedVersatileReadWriteLockTest { + private static final IgniteLogger LOG = Loggers.forClass(StripedVersatileReadWriteLockTest.class); + + private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace"; + + private final ExecutorService asyncContinuationExecutor = Executors.newCachedThreadPool( + IgniteThreadFactory.createWithFixedPrefix(ASYNC_CONTINUATION_THREAD_PREFIX, false, LOG) + ); + + /** The lock under test. 
*/ + private final StripedVersatileReadWriteLock lock = new StripedVersatileReadWriteLock(asyncContinuationExecutor); + + /** Executor service used to run tasks in threads different from the main test thread. */ + private final ExecutorService executor = Executors.newCachedThreadPool(); Review Comment: Can we use `ExecutorExtension` here? ########## modules/core/src/main/java/org/apache/ignite/internal/util/StripedVersatileReadWriteLock.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * A versatile read-write lock that can be used both in synchronous and asynchronous contexts. + * Its blocking methods use the spinwait strategy. When they do so, they are not interruptible (that is, they do not break their loop on + * interruption signal). 
+ * + * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire the same lock a few times without releasing it). + * + * <p>Write lock acquire requests are prioritized over read lock acquire requests. That is, if both read and write lock + * acquire requests are received when the write lock is held by someone else, then, on its release, the write lock attempt will be served + * first. + * + * <p>Lock owners are not tracked. + * + * <p>Asynchronous locking methods may complete the futures either in the calling thread (if they were able to immediately acquire + * the requested lock) or in the supplied pool (if they had to wait for a release to happen before being able to satisfy the request). + * + * <p>Asynchronous locking methods never use spin loops. They do use CAS loops, but these are mostly very short. + * + * <p>Synchronous read lock acquisitions and releases MUST be made in the same thread. There is no such requirement for write lock and + * for asynchronous locking. + * + * <p>The lock is striped. It allows to reduce contention (as there are as many contention points as there are stripes). + */ +public class StripedVersatileReadWriteLock { + /** Default concurrency. */ + private static final int DEFAULT_CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + + private final VersatileReadWriteLock[] locks; + + /** Index generator. */ + private static final AtomicInteger INDEX_GENERATOR = new AtomicInteger(); + + /** Index. */ + private static final ThreadLocal<Integer> INDEX = ThreadLocal.withInitial(INDEX_GENERATOR::incrementAndGet); + + /** + * Constructor. + */ + public StripedVersatileReadWriteLock(Executor asyncContinuationExecutor) { + this(asyncContinuationExecutor, DEFAULT_CONCURRENCY); + } + + /** + * Constructor. Review Comment: Why do we need this javadoc? This method is private -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
