Hi Viktor,

I hope you're doing well. I apologize for reaching out again so soon, but after 
further reflection, I realized that my previous explanation regarding the issue 
might have been inaccurate.

In my earlier email, I suggested that the problem lay in the timing between
decrementing count and sending the signal, but that does not seem to be the
cause. I'd like to restate the situation under the same assumption: multiple
threads (T1 to T10) are waiting in Condition::await() while the queue is full.

Here's a clearer outline of the scenario:

1. Threads T1 to T10 are waiting in Condition::await() because the queue is
   full.
2. T11 calls take() and holds the lock via lock.lockInterruptibly().
3. T12 calls queue.put() and joins the lock's wait queue in
   lock.lockInterruptibly(). (As I understand it, the ReentrantLock and the
   Condition have separate wait queues.)
4. T11 decrements the count, sends a signal, and releases the lock.
5. T1 receives the signal and is moved to the tail of the lock's wait queue.
   Since the ReentrantLock is in fair mode, T12 (which was already waiting)
   gets priority, and T1 will acquire the lock later.
6. T12 acquires the lock and successfully enqueues its item.
7. When T1 finally acquires the lock, the queue is full again, so it goes back
   to await().

In short, if other threads are already queued for the ReentrantLock while
take() is running, the thread that receives the signal may be placed behind
them in the ReentrantLock queue. This is the situation I suspect might be
happening, although I believe it would occur only very rarely.
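
The lock-queue ordering described above can be checked with a small, self-contained sketch that uses a fair ReentrantLock and one Condition directly instead of ArrayBlockingQueue. It is an illustration under stated assumptions, not a reproducer from this thread: the class FairSignalOrderDemo, its run(boolean) method and the spaceAvailable flag are hypothetical names, and the expected ordering assumes the JDK's AbstractQueuedSynchronizer behavior, where signal() appends the waiter to the tail of the lock's queue. The calling thread plays T11 (holding the lock inside take()), T1 waits on the condition, and T12 blocks in lock() either before or after the signal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are illustrative, not taken from the JDK.
final class FairSignalOrderDemo {
    private final ReentrantLock lock = new ReentrantLock(true); // fair mode
    private final Condition notFull = lock.newCondition();
    private final List<String> order = new CopyOnWriteArrayList<>();
    private boolean spaceAvailable; // guarded by lock; stands in for "count < items.length"

    // producerQueuesFirst == true: T12 blocks in lock() before T11 signals T1.
    static List<String> run(boolean producerQueuesFirst) throws InterruptedException {
        FairSignalOrderDemo d = new FairSignalOrderDemo();
        Thread t1 = new Thread(d::waiter, "T1");
        Thread t12 = new Thread(d::producer, "T12");
        t1.start();
        d.awaitWaiterInCondition();

        d.lock.lock(); // the calling thread plays T11, holding the lock inside take()
        try {
            if (producerQueuesFirst) {
                startAndAwaitQueued(d.lock, t12);
            }
            d.spaceAvailable = true;
            d.notFull.signal(); // moves T1 to the tail of the lock's wait queue
            if (!producerQueuesFirst) {
                startAndAwaitQueued(d.lock, t12);
            }
        } finally {
            d.lock.unlock();
        }
        t1.join();
        t12.join();
        return d.order;
    }

    private static void startAndAwaitQueued(ReentrantLock lock, Thread t) {
        t.start();
        while (!lock.hasQueuedThread(t)) {
            Thread.onSpinWait();
        }
    }

    private void waiter() { // T1: a put() caller waiting for space
        lock.lock();
        try {
            while (!spaceAvailable) {
                notFull.awaitUninterruptibly();
            }
            order.add("T1");
        } finally {
            lock.unlock();
        }
    }

    private void producer() { // T12: a put() caller arriving at the lock
        lock.lock();
        try {
            order.add("T12");
        } finally {
            lock.unlock();
        }
    }

    // Spin until T1 has released the lock inside await() and sits on the condition.
    private void awaitWaiterInCondition() {
        while (true) {
            lock.lock();
            try {
                if (lock.hasWaiters(notFull)) {
                    return;
                }
            } finally {
                lock.unlock();
            }
            Thread.onSpinWait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(true));  // expected: [T12, T1]
        System.out.println(run(false)); // expected: [T1, T12]
    }
}
```

If this reasoning holds, run(true) returns [T12, T1]: a producer that queued while take() held the lock goes first, and T1 would then find the queue full again. run(false) returns [T1, T12]: a producer that only arrives after the signal cannot barge ahead of the signaled waiter on a fair lock.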

Once again, I greatly appreciate your patience and willingness to listen to my
thoughts, despite my imperfect understanding. Thank you so much for your time
and insights.

Best regards,

Kim Minju


> On Sep 7, 2024, at 1:02 AM, 김민주 <miiiinj...@gmail.com> wrote:
> 
> Dear Viktor,
> 
> I hope this email finds you well. First and foremost, thank you for providing 
> such a detailed explanation of the AQS issue.
> 
> Based on Archie's suggestion, I've taken the liberty of implementing a custom 
> ArrayBlockingQueue3 class and created a test case that attempts to reproduce 
> the scenario you described. I've particularly focused on the potential 
> "barging" behavior you mentioned.
> 
> Following Archie's recommendation, I've added the following implementation to 
> ArrayBlockingQueue3:
> 
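> // Threads currently inside put(); only accessed while holding lock.
> private final HashSet<Thread> puttingThreads = new HashSet<>();
> 
> public void put(E e) throws InterruptedException {
>     Objects.requireNonNull(e);
>     final ReentrantLock lock = this.lock;
>     lock.lockInterruptibly();
>     // Registered while holding the lock. Because await() releases the lock,
>     // isPutting() (which also takes the lock) can observe this entry only
>     // once the caller is inside notFull.await().
>     final boolean added = puttingThreads.add(Thread.currentThread());
>     try {
>         while (count == items.length) {
>             notFull.await();
>         }
>         enqueue(e);
>     } finally {
>         if (added)
>             puttingThreads.remove(Thread.currentThread());
>         lock.unlock();
>     }
> }
> 
> // Test hook: true while the given thread is blocked inside put().
> public boolean isPutting(Thread thread) {
>     final ReentrantLock lock = this.lock;
>     lock.lock();
>     try {
>         return puttingThreads.contains(thread);
>     } finally {
>         lock.unlock();
>     }
> }
>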
> Using this modified implementation, I've written a test case that attempts to 
> simulate the scenario. I've attached the full test code to this email for 
> your reference.
> 
> 
> 
> --------------------------------------------------------
> 
> The issue arises in the brief window after the thread performing take()
> decrements count and just before it sends the signal; during that window,
> another thread calling put() can acquire the lock via
> lock.lockInterruptibly(). I understand that the wait queues for
> Condition.await() and the ReentrantLock are separate, which contributes to
> this subtle timing problem. When a thread waiting in Condition.await() is
> signaled, it is transferred from the Condition's wait queue to the lock
> queue, where it must reacquire the lock before proceeding.
> 
> To better understand the timing of this issue, it may help to review the
> dequeue() method called from queue.take():
> 
> private E dequeue() {
>     // assert lock.isHeldByCurrentThread();
>     // assert lock.getHoldCount() == 1;
>     // assert items[takeIndex] != null;
>     final Object[] items = this.items;
>     @SuppressWarnings("unchecked")
>     E e = (E) items[takeIndex];
>     items[takeIndex] = null;
>     if (++takeIndex == items.length) takeIndex = 0;
>     count--;
>     if (itrs != null)
>         itrs.elementDequeued();
>     notFull.signal();
>     return e;
> }
> --------------------------------------------------------
> Test Code
> 
> package org.main;
> 
> import static org.junit.jupiter.api.Assertions.assertEquals;
> 
> import java.util.ArrayList;
> import java.util.List;
> import org.example.ArrayBlockingQueue3;
> import org.junit.jupiter.api.Test;
> 
> class ThreadStarvationTest {
> 
>     @Test
>     void test() throws InterruptedException {
>         // Set the capacity of the queue
>         int CAPACITY = 10;
>         // Create an instance of ArrayBlockingQueue3 with fairness mode enabled
>         ArrayBlockingQueue3<Integer> queue = new ArrayBlockingQueue3<>(CAPACITY, true);
>         
>         // Preload the queue to its capacity
>         int PRELOAD_VALUE = -1;
>         for (int i = 0; i < CAPACITY; i++) {
>             queue.add(PRELOAD_VALUE);
>         }
> 
>         // Create threads that are already waiting to put elements
>         int ALREADY_PUT_COUNT = 10;
>         int ALREADY_PUT_NUMBER = 10;
>         for (int i = 0; i < ALREADY_PUT_COUNT; i++) {
>             Thread thread = new Thread(() -> {
>                 try {
>                     // put() will block because the queue is full.
>                     // This thread simulates T2, the thread trying to put an
>                     // element while the queue is full.
>                     queue.put(ALREADY_PUT_NUMBER);
>                 } catch (InterruptedException e) {
>                     // ignore
>                 }
>             });
>             thread.start();
> 
>             // Wait until the thread is actually trying to put
>             while (!queue.isPutting(thread)) ;
>         }
> 
>         // Give time for all threads to enter waiting state
>         Thread.sleep(2000);
> 
>         // Set up new producer threads
>         final int PRODUCER_PUT_NUMBER = -999;
>         final int PRODUCER_COUNT = 1000;
> 
>         final Runnable producingJob = () -> {
>             try {
>                 // T3
>                 queue.put(PRODUCER_PUT_NUMBER);
>             } catch (InterruptedException e) {
>                 // ignore
>             }
>         };
>         // Thread to start new producer threads when consumption begins
>         Thread produceWhenConsumingThread = new Thread(() -> {
>             for (int i = 0; i < PRODUCER_COUNT; i++) {
>                 Thread thd = new Thread(producingJob);
>                 thd.start();
>             }
>         });
>         
>         ArrayList<Integer> result = new ArrayList<>();
> 
>         // Start new producer threads simultaneously with consumption
>         produceWhenConsumingThread.start();
> 
>         // Consume elements from the queue
>         for (int i = 0; i < CAPACITY + ALREADY_PUT_COUNT; i++) {
>             // Can be T1
>             Integer take = queue.take();
>             result.add(take);
>         }
> 
>         // Expected result
> 
>         List<Integer> expected = new ArrayList<>();
>         // First 10 elements should be -1 (preloaded values)
>         for (int i = 0; i < CAPACITY; i++) {
>             expected.add(PRELOAD_VALUE);
>         }
>         // Next 10 elements should be 10 (from already waiting threads)
>         for (int i = 0; i < ALREADY_PUT_COUNT; i++) {
>             expected.add(ALREADY_PUT_NUMBER);
>         }
> 
>         // Assert that the actual result matches the expected result
>         assertEquals(expected, result);
>     }
> 
> }
> 
> Expected :[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
> Actual   :[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 10, 10, 10, 10, -999, -999, 10, -999, -999, 10]
> 
> The test aims to recreate the following sequence:
> 
> 1. The queue is full, and T2 is executing put() and waiting in
>    Condition.await().
> 2. T1 calls queue.take(), removes an item from the queue, and decrements the
>    count just before signaling. At this point, the queue is no longer full
>    (count != items.length).
> 3. T3 acquires the lock via lock.lockInterruptibly().
> 4. T1 then calls Condition.signal(), while T2 is still waiting to reacquire
>    the lock.
> 5. T3 successfully enqueues an item because the while (count == items.length)
>    condition is not satisfied. T3 then increments the count, making the
>    queue full again (count == items.length).
> 6. T2 finally reacquires the lock, but since the queue is full again, it hits
>    the while (count == items.length) condition and goes back to
>    Condition.await().
> This sequence illustrates a potential race condition in which T3 acquires the
> lock just after T1 decrements the count but before T1 sends the signal. Even
> though T2 is waiting in Condition.await() and receives the signal, the wait
> queues for Condition.await() and ReentrantLock are, as I understand it,
> separate, so I believe T3 can still acquire the lock while T2 is in await().
> As a result, T3 can proceed, enqueue an item, and refill the queue, so that
> T2, once it reacquires the lock, finds the queue full again and must
> re-enter await(). This race seems possible even with a fair lock because of
> the small timing gap between T1 lowering the count and sending the signal.
> 
> Since this test involves very subtle timing, I found it challenging to
> reproduce the issue consistently using only three threads. I therefore
> designed the test around the expectation that threads that had already called
> put() and were waiting should be guaranteed to enqueue before any threads
> that arrive during consumption.
> 
> I would be incredibly grateful for your insights on whether this test 
> accurately represents the scenario you described. I'm particularly interested 
> in your opinion on whether it might help identify any unexpected behavior.
> 
> It is possible that this situation does not correspond to the unbounded
> unfairness you mentioned, because it occurs only in extremely rare and
> extreme cases. I'm unsure whether this is the intended behavior or whether
> there might be an issue here, so your expertise on this matter would be
> invaluable.
> 
> If you need any clarification or if you think the test should be modified in 
> any way, please don't hesitate to let me know. I'm more than happy to make 
> any necessary adjustments based on your feedback.
> 
> Thank you once again for your time and patience in helping me understand this 
> complex issue.
> 
> Best regards,
> 
> Kim Minju
> 
> 
>> On Sep 6, 2024, at 8:47 PM, Viktor Klang <viktor.kl...@oracle.com> wrote:
>> 
>> Hi Kim,
>> 
>> The recent update to AQS reacquisition has to do with the behavior if, for
>> some reason, an exception is thrown (think SOE or OOM, or something like
>> that), so it isn't really applicable in this case.
>> 
>> 1. The queue is full, and T2 is executing put() and is waiting in
>>    Condition.await().
>> 2. T1 calls queue.take(), removes an item from the queue, and is about to
>>    send a signal().
>> 3. T3 is about to call put() and is just before lock.lockInterruptibly().
>> 4. T1 decreases the count and sends a signal(), indicating that space is
>>    available in the queue.
>> 5. T3 acquires the lock via lock.lockInterruptibly(), successfully enqueues
>>    an item because the count condition is satisfied, and releases the lock.
>> 6. T2 receives the signal and wakes up, but since T3 already enqueued an
>>    item, the count has increased, and T2 must wait again in await().
>> 
>> I've re-read ReentrantLock and AQS, and from my understanding of the logic,
>> the Condition's place in the wait queue should be maintained, which means
>> that T3 shouldn't be able to "barge" (tryLock() is documented to allow
>> barging).
>> 
>> Let us know if you can come up with a reproducer which says otherwise. 👍
>> 
>> Cheers,
>> √
>> 
>> 
>> Viktor Klang
>> Software Architect, Java Platform Group
>> Oracle
>> 
>> From: 김민주 <miiiinj...@gmail.com>
>> Sent: Friday, 6 September 2024 04:43
>> To: Viktor Klang <viktor.kl...@oracle.com>
>> Cc: Archie Cobbs <archie.co...@gmail.com>; Daniel FUCHS 
>> <daniel.fu...@oracle.com>; core-libs-dev@openjdk.org 
>> <core-libs-dev@openjdk.org>
>> Subject: Re: [External] : Re: [POTENTIAL BUG] Potential FIFO violation in 
>> BlockingQueue under high contention and suggestion for fair mode in 
>> ArrayBlockingQueue and LinkedBlockingQueue
>>  
>> Hi Archie,
>> Thanks to your valuable suggestions, I was able to come up with a much more
>> appropriate test, and I’ve learned a great deal in the process. I truly
>> appreciate your insights! While this approach is clearly a significant
>> improvement over the previous one, I still have some concerns about the
>> atomicity between notFull.await() and signaling from other threads, but I
>> can’t deny that this new approach provides much better guarantees.
>> Your feedback has been incredibly helpful, and I’m very grateful for your
>> time and effort. Thank you again!
>> 
>> 
>> 
>> 
>> Hi Viktor,
>> Thank you for sharing your thoughts, which have given me much to consider. I
>> have a follow-up question regarding the improvements in AQS that you
>> mentioned. Specifically, I’d like to clarify whether these improvements
>> ensure that, in the fair mode of ReentrantLock, threads waiting on a
>> Condition are placed back in the lock queue without acquiring the lock, or
>> whether the signaled thread can now acquire the lock immediately after being
>> signaled.
>> Initially, my concern was that a thread receiving a signal might still face 
>> starvation if another thread calls put() and acquires the lock before the 
>> signaled thread can do so. Here's an example scenario:
>> 1. The queue is full, and T2 is executing put() and is waiting in
>>    Condition.await().
>> 2. T1 calls queue.take(), removes an item from the queue, and is about to
>>    send a signal().
>> 3. T3 is about to call put() and is just before lock.lockInterruptibly().
>> 4. T1 decreases the count and sends a signal(), indicating that space is
>>    available in the queue.
>> 5. T3 acquires the lock via lock.lockInterruptibly(), successfully enqueues
>>    an item because the count condition is satisfied, and releases the lock.
>> 6. T2 receives the signal and wakes up, but since T3 already enqueued an
>>    item, the count has increased, and T2 must wait again in await().
>> It seems to me that this scenario could occur regardless of whether 
>> ReentrantLock is in fair mode or not. Has the improvement in AQS addressed 
>> this type of contention scenario to prevent such starvation issues, or is 
>> this still a possible outcome?
>> Your insights into "unbounded unfairness" have also provided me with a lot 
>> of food for thought, and I’m grateful for the opportunity to reflect on 
>> these issues. 
>> In your view, would the thread contention scenario I’ve described fall under 
>> the category of unbounded unfairness, or would it be considered an 
>> acceptable level of contention?
>> 
>> Once again, thank you for all the knowledge and understanding I've gained 
>> through your feedback. I'm truly grateful for your time and expertise.
>> 
>> 
>> Best regards,
>> Kim Minju
>> 
>> 
>> On Fri, Sep 6, 2024 at 4:52 AM, Viktor Klang <viktor.kl...@oracle.com> wrote:
>> Archie,
>> 
>> I should've been more specific: Condition-as-implemented-by-ReentrantLock (in
>> fair mode) provides stronger (for some definition of stronger) semantics
>> than the Condition interface specifies.
>> 
>> Since it's related, I've recently integrated a hardening of AQS and AQLS 
>> reacquisition logic in await().
>> 
>> Given what you presented earlier about the detection of "producer parked",
>> it's likely that the conclusion is that ABQ works as expected.
>> 
>> Cheers,
>> √
>> 
>> 
>> Viktor Klang
>> Software Architect, Java Platform Group
>> Oracle
>>
>> From: Archie Cobbs <archie.co...@gmail.com>
>> Sent: Thursday, 5 September 2024 21:23
>> To: Viktor Klang <viktor.kl...@oracle.com>
>> Cc: 김민주 <miiiinj...@gmail.com>; Daniel FUCHS <daniel.fu...@oracle.com>;
>> core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> Subject: Re: [External] : Re: [POTENTIAL BUG] Potential FIFO violation in 
>> BlockingQueue under high contention and suggestion for fair mode in 
>> ArrayBlockingQueue and LinkedBlockingQueue
>>  
>> Apologies in advance if I'm misunderstanding anything...
>> 
>> On Thu, Sep 5, 2024 at 2:05 PM Viktor Klang <viktor.kl...@oracle.com> wrote:
>>  Thread state polling aside, for as long as Condition::await() is allowed to 
>> spuriously wake, FIFO just cannot be "guaranteed".
>>  
>> What about this statement in the Javadoc for ReentrantLock.newCondition():
>> 
>> The ordering of lock reacquisition for threads returning from waiting 
>> methods is the same as for threads initially acquiring the lock, which is in 
>> the default case not specified, but for fair locks favors those threads that 
>> have been waiting the longest.
>> 
>> So what you're saying is that a spurious wakeup on a Condition is not the 
>> same thing as a spurious signal() on a Condition; if it were, then the above 
>> statement would apply and FIFO ordering would be preserved.
>> 
>> Of course, a spurious wakeup would not find the condition being waited on 
>> satisfied unless there was a big coincidence. So an ordering violation that 
>> actually mattered should be exceedingly rare.
>> 
>> Anyway, this does seem to be a glitch in how things are supposed to work. 
>> That is: there can be no guaranteed ordering for Condition waiters when 
>> there can be spurious wakeups.
>> 
>> Maybe this corner case should be documented. Or better yet, fix the bug by
>> requiring Condition to "filter out" spurious wakeups when FIFO ordering is
>> being preserved (it should be possible).
>> 
>> -Archie
>> 
>> --
>> Archie L. Cobbs
> 
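
A related idea, applied at the queue level rather than inside Condition, is to give producers tickets so that FIFO order no longer depends on how wakeups or lock reacquisition are ordered. The sketch below is hypothetical and is not how ArrayBlockingQueue works: TicketFifoQueue, ticketsIssued() and the abandoned-ticket bookkeeping are illustrative assumptions. A producer draws a ticket when it first acquires the lock and may insert only when there is room and its ticket is the oldest one still outstanding, so neither a spurious wakeup nor a producer that reaches the lock first can overtake an older waiter.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not JDK code: a bounded queue whose producers insert in ticket order.
final class TicketFifoQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Set<Long> abandoned = new HashSet<>(); // tickets of interrupted producers
    private long nextTicket; // next ticket to hand out (guarded by lock)
    private long nowServing; // only this ticket's holder may insert (guarded by lock)

    TicketFifoQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        Objects.requireNonNull(e); // reject null before taking a ticket
        lock.lockInterruptibly();
        try {
            final long ticket = nextTicket++;
            try {
                // A spurious wakeup or a newly arrived producer cannot jump the line:
                // proceed only when there is room AND this is the oldest live ticket.
                while (items.size() == capacity || ticket != nowServing) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                retire(ticket); // give up the ticket so later producers are not stuck
                throw ie;
            }
            items.addLast(e);
            retire(ticket);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signalAll(); // all waiting producers re-check; only the ticket holder proceeds
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Test hook: number of tickets handed out so far.
    long ticketsIssued() {
        lock.lock();
        try {
            return nextTicket;
        } finally {
            lock.unlock();
        }
    }

    // Must be called with the lock held.
    private void retire(long ticket) {
        if (ticket != nowServing) {
            abandoned.add(ticket); // skipped once nowServing reaches it
            return;
        }
        nowServing++;
        while (abandoned.remove(nowServing)) {
            nowServing++;
        }
        notFull.signalAll(); // the next ticket holder may now be able to insert
    }
}
```

The cost of this design is that take() uses signalAll(), so every waiting producer wakes and re-checks on each removal, which is expensive under heavy contention. Order here means the order in which producers first acquired the lock, which a fair lock keeps close to arrival order; interrupted producers give up their tickets so that later ones are not blocked forever.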
