Qpid Design - Queue ImplementationPage edited by Aidan SkinnerStrict OrderingThe fundamental principal of the Queuing model is that the queue provides a strict order on the messages being enqueued. Furthermore that order is maintained through the lifetime of the entries on the queue: thus if a message is returned (e.g. the prefetched messages being released upon the consumer closing) the order of that message with respect to other messages on the queue is maintained. The strict ordering is enforced by the use of a queue data-structure. In order for this to be performant, the data structure uses a lockless thread-safe designed based around the same algorithm used in the java.util.concurrent.ConcurrentLinkedList (more precisely it is based on the public domain implementation in the backport util concurrent project). See the section on Concurrent List implementations for more details. Each subscription keeps a "pointer" into the list denoting the point at which that particular subscription has reached. A particular subscription will only deliver a message if it is the next AVAILABLE entry on the queue after the pointer which it maintains which matches any selection criteria the subscription may have. Thread safety is maintained by using the thread-safe atomic compare-and-swap operations for maintaining queue entry state (as described above) and also for updating the pointer on the subscription. The queue is written so that many threads may be simultaneously attempting to perform deliveries simultaneously on the same messages and/or subscriptions. EnqueingWhen a message is enqueued (using the enqueue() method on the AMQQueue implementation) it is first added to the tail of the list. Then the code iterates over the subscriptions (starting at the last subscription the queue was known to have delivered for reasons of fairness). For each subscription found it attempts delivery (details describe below). If the message cannot be delivered to any subscription then the "immediate" flag on the message is inspected. If the message required immediate delivery then the message is immediately dequeued, otherwise an asynchronous job is created to attempt delivery at a later point. (Note there is a "shortcut" path for queues which have an exclusive subscriber. In this case we know there is one and only one subscriber and so we can go directly to trying to deliver to it without worrying about iterators, etc.) Potential Issue: Looking at the code which performs the check of the immediate flag I believe there is a race condition:
if (entry.immediateAndNotDelivered())
{
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
Change Notification Preferences
View Online
|
View Change
|
Add Comment
|
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence
- [CONF] Apache Qpid > Qpid Design - Queue Implementation confluence