This is an automated email from the ASF dual-hosted git repository. benw pushed a commit to branch javax in repository https://gitbox.apache.org/repos/asf/tapestry-5.git
commit 89e191ec7f0b552db6f52cccba5362528766efe3 Author: Ben Weidig <[email protected]> AuthorDate: Sat Apr 4 16:45:56 2026 +0200 TAP5-2820: ConcurrentBarrier remove synchronized blocks Under heavy contention, the synchronized blocks introduced a global monitor on a ThreadLocal that can lead to livelock/starvation. ReentrantReadWriteLock already supports local thread-based locking, so relying on that is the better approach. Furthermore, InterruptedExceptions were silently swallowed without setting the interrupt flag, which is now no longer the case. The existing tests were converted from Spock to Java. --- .../ioc/internal/util/ConcurrentBarrier.java | 223 ++++++++----------- .../groovy/ioc/specs/ConcurrentBarrierSpec.groovy | 147 ------------- .../ioc/internal/util/ConcurrentBarrierTest.java | 245 +++++++++++++++++++++ 3 files changed, 340 insertions(+), 275 deletions(-) diff --git a/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrier.java b/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrier.java index eb37a0825..675f8a500 100644 --- a/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrier.java +++ b/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrier.java @@ -15,7 +15,6 @@ package org.apache.tapestry5.ioc.internal.util; import org.apache.tapestry5.ioc.Invokable; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -25,57 +24,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class ConcurrentBarrier { - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** - * This is, of course, a bit of a problem. We don't have an avenue for ensuring that this ThreadLocal is destroyed - * at the end of the request, and that means a thread can hold a reference to the class and the class loader which - * loaded it. 
This may cause redeployment problems (leaked classes and class loaders). Apparently JDK 1.6 provides - * the APIs to check to see if the current thread has a read lock. So, we tend to remove the TL, rather than set its - * value to false. - */ - private static class ThreadBoolean extends ThreadLocal<Boolean> - { - @Override - protected Boolean initialValue() - { - return false; - } - } - - private final ThreadBoolean threadHasReadLock = new ThreadBoolean(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** * Invokes the object after acquiring the read lock (if necessary). If invoked when the read lock has not yet been * acquired, then the lock is acquired for the duration of the call. If the lock has already been acquired, then the * status of the lock is not changed. + * <p> + * This method is completely re-entrant. If a write lock is already acquired by the current thread, + * this method gracefully bypasses acquiring the read lock to prevent deadlocks. * - * TODO: Check to see if the write lock is acquired and <em>not</em> acquire the read lock in that situation. - * Currently this code is not re-entrant. If a write lock is already acquired and the thread attempts to get the - * read lock, then the thread will hang. For the moment, all the uses of ConcurrentBarrier are coded in such a way - * that reentrant locks are not a problem. - * - * @param <T> - * @param invokable + * @param <T> the type of the return value + * @param invokable the code to execute safely inside the read lock * @return the result of invoking the invokable */ public <T> T withRead(Invokable<T> invokable) { - boolean readLockedAtEntry; - - synchronized (threadHasReadLock) - { - readLockedAtEntry = threadHasReadLock.get(); - } + // If the current thread already holds a read lock, OR if it holds the exclusive write lock, + // it implicitly has read access. We can safely bypass acquiring the read lock again. 
+ boolean readLockedAtEntry = lock.getReadHoldCount() > 0 || lock.isWriteLockedByCurrentThread(); if (!readLockedAtEntry) { lock.readLock().lock(); - - synchronized (threadHasReadLock) - { - threadHasReadLock.set(true); - } } try @@ -84,14 +55,10 @@ public class ConcurrentBarrier } finally { + // Only unlock if we were the ones who actually locked it in this method call. if (!readLockedAtEntry) { lock.readLock().unlock(); - - synchronized (threadHasReadLock) - { - threadHasReadLock.remove(); - } } } } @@ -101,36 +68,39 @@ public class ConcurrentBarrier */ public void withRead(final Runnable runnable) { - Invokable<Void> invokable = new Invokable<Void>() - { - @Override - public Void invoke() - { - runnable.run(); - - return null; - } - }; - - withRead(invokable); + withRead((Invokable<Void>) () -> { + runnable.run(); + return null; + }); } /** * Acquires the exclusive write lock before invoking the Invokable. The code will be executed exclusively, no other - * reader or writer threads will exist (they will be blocked waiting for the lock). If the current thread has a read - * lock, it is released before attempting to acquire the write lock, and re-acquired after the write lock is - * released. Note that in that short window, between releasing the read lock and acquiring the write lock, it is - * entirely possible that some other thread will sneak in and do some work, so the {@link Invokable} object should - * be prepared for cases where the state has changed slightly, despite holding the read lock. This usually manifests - * as race conditions where either a) some parallel unrelated bit of work has occured or b) duplicate work has - * occured. The latter is only problematic if the operation is very expensive. + * reader or writer threads will exist (they will be blocked waiting for the lock). + * <p> + * If the current thread has a read lock, it is released before attempting to acquire the write lock, and + * re-acquired after the write lock is released. 
Note that in that short window, between releasing the read lock + * and acquiring the write lock, it is entirely possible that some other thread will sneak in and do some work, + * so the {@link Invokable} object should be prepared for cases where the state has changed slightly, despite + * holding the read lock. This usually manifests as race conditions where either a) some parallel unrelated + * bit of work has occurred or b) duplicate work has occurred. The latter is only problematic if the operation + * is very expensive. * - * @param <T> - * @param invokable + * @param <T> the type of the return value + * @param invokable the code to execute safely inside the write lock + * @return the result of invoking the invokable */ public <T> T withWrite(Invokable<T> invokable) { - boolean readLockedAtEntry = releaseReadLock(); + boolean readLockedAtEntry = lock.getReadHoldCount() > 0; + + // ReentrantReadWriteLock does NOT allow upgrading a read lock directly to a write lock. + // If we try to get the write lock while holding a read lock, the thread will deadlock forever. + // Therefore, we MUST release our read lock first before asking for the write lock. 
+ if (readLockedAtEntry) + { + lock.readLock().unlock(); + } lock.writeLock().lock(); @@ -140,49 +110,14 @@ public class ConcurrentBarrier } finally { + // Always release the write lock when done lock.writeLock().unlock(); - restoreReadLock(readLockedAtEntry); - } - } - - private boolean releaseReadLock() - { - boolean readLockedAtEntry; - - synchronized (threadHasReadLock) - { - readLockedAtEntry = threadHasReadLock.get(); - } - - if (readLockedAtEntry) - { - lock.readLock().unlock(); - - synchronized (threadHasReadLock) - { - threadHasReadLock.set(false); - } - } - - return readLockedAtEntry; - } - private void restoreReadLock(boolean readLockedAtEntry) - { - if (readLockedAtEntry) - { - lock.readLock().lock(); - - synchronized (threadHasReadLock) - { - threadHasReadLock.set(true); - } - } - else - { - synchronized (threadHasReadLock) + // If we released a read lock at the start of this method, we must restore it here + // so the calling code doesn't break when it assumes it still has a read lock. + if (readLockedAtEntry) { - threadHasReadLock.remove(); + restoreReadLock(); } } } @@ -192,22 +127,14 @@ public class ConcurrentBarrier */ public void withWrite(final Runnable runnable) { - Invokable<Void> invokable = new Invokable<Void>() - { - @Override - public Void invoke() - { - runnable.run(); - - return null; - } - }; - - withWrite(invokable); + withWrite((Invokable<Void>) () -> { + runnable.run(); + return null; + }); } /** - * Try to aquire the exclusive write lock and invoke the Runnable. If the write lock is obtained within the specfied + * Try to acquire the exclusive write lock and invoke the Runnable. If the write lock is obtained within the specified * timeout, then this method behaves as {@link #withWrite(Runnable)} and will return true. If the write lock is not * obtained within the timeout then the runnable is never invoked and the method will return false. 
* @@ -218,34 +145,74 @@ public class ConcurrentBarrier */ public boolean tryWithWrite(final Runnable runnable, long timeout, TimeUnit timeoutUnit) { - boolean readLockedAtEntry = releaseReadLock(); + boolean readLockedAtEntry = lock.getReadHoldCount() > 0; - boolean obtainedLock = false; - - try + // Just like withWrite(), we must temporarily release any read lock we hold + // so we don't deadlock while waiting for the write lock. + if (readLockedAtEntry) { - try - { - obtainedLock = lock.writeLock().tryLock(timeout, timeoutUnit); + lock.readLock().unlock(); + } - if (obtainedLock) runnable.run(); + boolean obtainedLock = false; + try { + try { + obtainedLock = lock.writeLock().tryLock(timeout, timeoutUnit); + if (obtainedLock) + { + runnable.run(); + } } catch (InterruptedException e) { obtainedLock = false; + + // We MUST re-interrupt the thread here so the system knows the thread was asked to stop. + Thread.currentThread().interrupt(); } finally { - if (obtainedLock) lock.writeLock().unlock(); + // Only unlock the write lock if we actually succeeded in getting it + if (obtainedLock) + { + lock.writeLock().unlock(); + } } } finally { - restoreReadLock(readLockedAtEntry); + // Ensure that no matter what happened (timeout, interrupt, error), + // if the thread came in with a read lock, it leaves with a read lock. + if (readLockedAtEntry) + { + restoreReadLock(); + } } return obtainedLock; } + /** + * Helper to safely re-acquire the read lock after it was temporarily dropped. + * Uses a generous timeout to prevent unbounded reader starvation if a massive + * queue of writers is pending. + */ + private void restoreReadLock() + { + try + { + // Give it 30 seconds to re-acquire to prevent infinite starvation. 
+ if (!lock.readLock().tryLock(30, TimeUnit.SECONDS)) + { + throw new IllegalStateException("Unable to re-acquire read lock after 30 seconds due to writer starvation."); + } + } + catch (InterruptedException e) + { + // The thread was interrupted while waiting to get its read lock back. + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while attempting to re-acquire read lock.", e); + } + } } diff --git a/tapestry-ioc/src/test/groovy/ioc/specs/ConcurrentBarrierSpec.groovy b/tapestry-ioc/src/test/groovy/ioc/specs/ConcurrentBarrierSpec.groovy deleted file mode 100644 index 1a4ca016c..000000000 --- a/tapestry-ioc/src/test/groovy/ioc/specs/ConcurrentBarrierSpec.groovy +++ /dev/null @@ -1,147 +0,0 @@ -package ioc.specs - -import org.apache.tapestry5.ioc.test.internal.util.ConcurrentTarget -import org.apache.tapestry5.ioc.test.internal.util.ConcurrentTargetWrapper - -import spock.lang.Specification - -class ConcurrentBarrierSpec extends Specification { - - def target = new ConcurrentTarget() - - static final int THREAD_COUNT = 1000 - - static final int THREAD_BLOCK_SIZE = 50 - - def run(op) { - def threads = [] - def running = [] - - assert target.counter == 0 - - THREAD_COUNT.times { - def t = new Thread(op) - - threads << t - - if (threads.size() >= THREAD_BLOCK_SIZE) { - threads.each { it.start() } - running.addAll threads - threads.clear() - } - } - - running.each { it.join() } - } - - def "acquire write lock"() { - - when: - - run { target.incrementCounter() } - - then: - - target.counter == THREAD_COUNT - } - - def "acquire read lock while holding write lock"() { - - when: - - run { target.incrementCounterHard() } - - then: - - target.counter == THREAD_COUNT - } - - def "upgrade read lock to write lock"() { - when: - - run { target.incrementIfNonNegative() } - - then: - - target.counter == THREAD_COUNT - } - - def "indirection between method with read lock and method that acquires write lock"() { - - when: - - run { 
target.incrementViaRunnable() } - - then: - - target.counter == THREAD_COUNT - } - - def "barriers are independent when multiple are involved"() { - - when: - - run(new ConcurrentTargetWrapper(target)) - - then: - - target.counter == THREAD_COUNT - } - - def "use tryWithWrite() to get write lock if it is available"() { - - when: run { - def good = false - while (!good) { good = target.tryIncrementCounter() } - } - - then: - - target.counter == THREAD_COUNT - } - - def "acquire read lock when inside a tryWithWrite block"() { - - when: - - run { - def good = false - while (!good) { good = target.tryIncrementCounterHard() } - } - - then: - - target.counter == THREAD_COUNT - } - - def "read lock upgrades via tryWriteLock()"() { - - when: - - run { - def good = false - while (!good) { good = target.tryIncrementIfNonNegative() } - } - - then: - - target.counter == THREAD_COUNT - } - - def "write lock timeout inside read lock"() { - when: - - target.withRead { - try { - run { - assert target.tryIncrementIfNonNegative() == false - } - } - catch (InterruptedException e) { } - } - - then: - - target.counter == 0 - } -} diff --git a/tapestry-ioc/src/test/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrierTest.java b/tapestry-ioc/src/test/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrierTest.java new file mode 100644 index 000000000..feec7156b --- /dev/null +++ b/tapestry-ioc/src/test/java/org/apache/tapestry5/ioc/internal/util/ConcurrentBarrierTest.java @@ -0,0 +1,245 @@ +// Copyright 2026 The Apache Software Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.apache.tapestry5.ioc.internal.util; + +import org.apache.tapestry5.ioc.test.internal.util.ConcurrentTarget; +import org.apache.tapestry5.ioc.test.internal.util.ConcurrentTargetWrapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import spock.lang.Issue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ConcurrentBarrierTest { + + private ConcurrentTarget target; + + private static final int THREAD_COUNT = 1000; + private static final int THREAD_BLOCK_SIZE = 50; + + @BeforeEach + void beforeEach() { + target = new ConcurrentTarget(); + } + + /** + * Helper method to execute a runnable across 1000 threads, + * batched and started in blocks of 50. 
+ */ + private void run(Runnable op) throws InterruptedException { + assertEquals(0, target.getCounter(), "Counter should initially be 0"); + + List<Thread> threads = new ArrayList<>(THREAD_COUNT); + List<Thread> running = new ArrayList<>(THREAD_COUNT); + + for (int i = 0; i < THREAD_COUNT; i++) { + Thread t = new Thread(op); + threads.add(t); + + if (threads.size() >= THREAD_BLOCK_SIZE) { + for (Thread thread : threads) { + thread.start(); + } + running.addAll(threads); + threads.clear(); + } + } + + for (Thread thread : running) { + thread.join(); + } + } + + @Test + void acquireWriteLock() throws InterruptedException { + run(target::incrementCounter); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void acquireReadLockWhileHoldingWriteLock() throws InterruptedException { + run(target::incrementCounterHard); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void upgradeReadLockToWriteLock() throws InterruptedException { + run(target::incrementIfNonNegative); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void indirectionBetweenMethodWithReadLockAndMethodThatAcquiresWriteLock() throws InterruptedException { + run(target::incrementViaRunnable); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void barriersAreIndependentWhenMultipleAreInvolved() throws InterruptedException { + run(new ConcurrentTargetWrapper(target)); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void useTryWithWriteToGetWriteLockIfItIsAvailable() throws InterruptedException { + run(() -> { + boolean good = false; + while (!good) { + good = target.tryIncrementCounter(); + } + }); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void acquireReadLockWhenInsideATryWithWriteBlock() throws InterruptedException { + run(() -> { + boolean good = false; + while (!good) { + good = target.tryIncrementCounterHard(); + } + }); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + 
@Test + void readLockUpgradesViaTryWriteLock() throws InterruptedException { + run(() -> { + boolean good = false; + while (!good) { + good = target.tryIncrementIfNonNegative(); + } + }); + + assertEquals(THREAD_COUNT, target.getCounter()); + } + + @Test + void writeLockTimeoutInsideReadLock() { + target.withRead(() -> { + try { + run(() -> { + // With a read lock already held by the main thread, + // the background threads will fail to get the write lock. + assertFalse(target.tryIncrementIfNonNegative()); + }); + } catch (InterruptedException e) { + // Restore interrupt status as best practice + Thread.currentThread().interrupt(); + } + }); + + assertEquals(0, target.getCounter()); + } + + @Issue("TAP5-2820") + @Test + void testInterruptedExceptionIsNotSwallowed() throws InterruptedException { + + ConcurrentBarrier barrier = new ConcurrentBarrier(); + AtomicBoolean threadWasInterrupted = new AtomicBoolean(false); + CountDownLatch backgroundThreadBlocked = new CountDownLatch(1); + + // 1. Main thread takes the write lock + barrier.withWrite(() -> { + Thread backgroundThread = new Thread(() -> { + backgroundThreadBlocked.countDown(); + // 2. Background thread attempts to acquire write lock and blocks + barrier.tryWithWrite(() -> {}, 10, TimeUnit.SECONDS); + + // 3. Check if the interrupt flag was safely restored after the catch block + threadWasInterrupted.set(Thread.currentThread().isInterrupted()); + }); + + backgroundThread.start(); + try { + backgroundThreadBlocked.await(); + Thread.sleep(100); // Ensure it is waiting inside tryLock() + + // 4. 
Send the interrupt signal + backgroundThread.interrupt(); + backgroundThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + + assertTrue(threadWasInterrupted.get(), "Thread interrupt status was swallowed and lost!"); + } + + @Issue("TAP5-2820") + @Test + void testHiddenBarrierLivelock() throws InterruptedException { + + // Previously, the ConcurrentBarrier was synchronized on a ThreadLocal monitor, + // which could cause a livelock/starvation scenario under heavy contention. + + ConcurrentBarrier barrier = new ConcurrentBarrier(); + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(THREAD_COUNT); + + Runnable task = () -> { + try { + startLatch.await(); // Start all threads at the exact same time + + boolean good = false; + int attempts = 0; + // Simulate the while(!good) livelock scenario + while (!good && attempts < 100) { + good = barrier.tryWithWrite(() -> {}, 1, TimeUnit.MILLISECONDS); + attempts++; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); + } + }; + + for (int i = 0; i < THREAD_COUNT; i++) { + executor.submit(task); + } + + long startTime = System.currentTimeMillis(); + startLatch.countDown(); // Unleash the threads + + // Wait for threads to finish, with a max timeout + boolean finishedInTime = doneLatch.await(10, TimeUnit.SECONDS); + long duration = System.currentTimeMillis() - startTime; + + executor.shutdownNow(); + + assertTrue(finishedInTime, "Livelock detected! Threads took too long to complete: " + duration + "ms"); + } +} \ No newline at end of file
