Qpid Design - Message DeliveryPage added by Aidan SkinnerAsynchronous DeliveryIf there are no subscriptions that can currently take delivery of a message then we need to schedule an asynchronous delivery. While the code is thread safe and could cope with multiple threads performing asynchronous delivery simultaneously, we limit ourselves to only having one asynchronous delivery job scheduled at any one time, so as not to overwhelm the broker:
public void deliverAsync()
{
_stateChangeCount.incrementAndGet(); Runner runner = new Runner(); if (_asynchronousRunner.compareAndSet(null, runner)) { _asyncDelivery.execute(runner); } } Here we first increment our count of "stateChanges". This provides us with a way of knowing between loops of the asynchronous delivery thread whether anything else has happened that makes it worth our while running the asynchronous delivery loop again (in effect it prevents us having to always add another thread to cope with race conditions where we want to start the async delivery just as it is ending). We then create a new instance of the asynchronous delivery "Runner", and attempt to make this instance the current one by means of the ubiquitous compare-and-swap operation. Here we test if we are the thread that moved the queue from having no asynchronous runner to having one; and if so we need to schedule the runner to execute by way of calling _asyncDelivery.execute(runner). The actual work of the asynchronous delivery is done in the processQueue(Runnable runner) method. private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; int extraLoops = 1; int deliveries = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to // move forward in the next iteration if (previousStateChangeCount != stateChangeCount) { extraLoops = 1; } previousStateChangeCount = stateChangeCount; deliveryIncomplete = _subscriptionList.size() != 0; boolean done = true; In this first fragment of the method we see the constraint on how long the asynchronous delivery will keep attempting to deliver more messages. The first constraint "deliveries != 0" is testing a countdown value "deliveries" which is intialised with an initial maximum (currently set to 10): every successful delivery the thread makes decrements this counter. This implements a limit on how long the processQueue method will be allowed to run for, stopping this queue from starving other queues of processor time. At the end, if this countdown was the factor to cause the loop to terminate, the asynchronous delivery is scheduled to run again. The second constraint "((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) " is testing whether there is provably nothing left to do on this queue. The first half tests if there have been any changes since the last iteration that have incremented that state change count (and thus require another loop), the second half says, "even if there haven't been any changes keep looping if last time round we thought there was still more to do". The final constraint "_asynchronousRunner.compareAndSet(null, runner))" is our familiar compare-and-swap operation ensuring that this is the designated instance of the asynchronous processer running. This loop runs, attempting to deliver one message in each iteration: SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer while (subscriptionIter.advance()) { boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); if (sub != null) { Iterate over the subscriptions on the queue... sub.getSendLock(); Lock the subscription so it does not get deleted while attempting to delvier to it.
try
{
QueueEntry node = moveSubscriptionToNextNode(sub);
Find the next node which the subscription should try to deliver by skipping over already acquired entries, if it is null then this subscription is at the tail of the queue. if (node != null && sub.isActive()) { Keep a track of whether this subscription is really active and whether we managed to advance the pointer on this subscription in this loop (these values go into determining if there is anything left to do in a new loop). boolean advanced = false; boolean subActive = false; if (!(node.isAcquired() || node.isDeleted())) { if (!sub.isSuspended()) { The node is not yet acquired or deleted, and we can now be sure the subscription is active. subActive = true; if (sub.hasInterest(node)) { The following code is similar to that in the deliverToSubscription method described previously. It should be possible to factor this out. The primary difference is the behaviour with a browser where need to explicitly note that we have advanced. if (!sub.wouldSuspend(node)) { if (!sub.isBrowser() && !node.acquire(sub)) { sub.restoreCredit(node); } else { deliverMessage(sub, node); deliveries--; if (sub.isBrowser()) { QueueEntry newNode = _entries.next(node); if (newNode != null) { sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); advanced = true; } } } done = false; } else // Not enough Credit for message and wouldSuspend { This case covers the scenario where we are using bytes based flow control, and the currently available credit is less than the size of the next message. We need to wait either for the credit to be increased (which will cause a state change event) or the entry to be picked off by another subscription (which we capture with the state change listener) //QPID-1187 - Treat the subscription as suspended for this message // and wait for the message to be removed to continue delivery. subActive = false; node.addStateChangeListener(new QueueEntryListener(sub, node)); } } else { // this subscription is not interested in this node so we can skip over it QueueEntry newNode = _entries.next(node); if (newNode != null) { sub.setLastSeenEntry(node, newNode); } } } } Here we calculate if there is anything left to do on this particular subscription. If we are at the tail of the subscription, or the subscription is no longer active, then this subscription can be considered done. If all subscriptions are done then we are truly finished. final boolean atTail = (_entries.next(node) == null); done = done && (!subActive || atTail); Here calculate if we need to auto-close the subscription - we do this if we are at the tail of the queue, and we didn't advance in this iteration and this is an auto-close subscription. closeConsumer = (atTail && !advanced && sub.isAutoClose()); } } finally { sub.releaseSendLock(); } if (closeConsumer) { unregisterSubscription(sub); ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); } } This ends the iteration over subscriptions, now we calculate if we believe there we should try iterating over the subscriptions again. We use the value of "done" we calculated while iterating over the subscriptions to determine if we need to loop again. If we believe we are done and we have already used our "extra" loop then we can stop. If we are "done" but we have not yet used the extra loop, then we decrement the extra loop counter (setting it to 0 might be clearer since 0 and 1 are the only valid values) and go round one more time. If we are not done then we restore extraLoops to 1. if (done) { if (extraLoops == 0) { deliveryIncomplete = false; } else { extraLoops--; } } else { extraLoops = 1; } } _asynchronousRunner.set(null); } This ends the the "while(..." loop. The final action in the asynchronous process is to determine if we need to schedule ourselves for another execution: // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner)) { _asyncDelivery.execute(runner); } } SubscriptionsSubscription model the entities created by the receiving of a "Basic.Consume" event in AMQP0-8/0-9. That is they represent a relationship between an AMQP Channel (equivalent to a Java JMS Session) and a queue. As messages are placed on the queue, the queue takes responsibility for as quickly as possible finding a subscriber which is willing to take the message. The subscriber is responsible for delivering the message to the receiving client. As outlined above, a significant change introduced by the refactoring is that the Subscriptions now maintain state representing a pointer into the queue. This pointer represents the current position where the subscription can guarantee that no message prior to that is of interest to it. Generally this pointer only ever moves forward through the queue (see the section on reject and release for the exception to this rule). This is the only dynamic state maintained directly by the subscription. Different subclasses of SubscriptionImpl are used to model the different behviour associated with different acknowledgement modes. The subclasses used are AckSubscription, NoAckSubscription, BrowserSubscription and GetNoAckSubscription. The last of these is a special implementation which is used to model a Basic.Get command as a temporary subscription that can only ever receive one message. Modelling Get in this way mirrors how the same semantics are implemented in 0-10 and removes having two separate ways to dequeue messages from the queue. When a "Basic.Consume" event is processed the subscription is added to the list of subscriptions on the queue, the "pointer" in the subscription is set to point at the head of the list of queue entries, and then an asynchronous job is kicked off to deliver to that subscription as many messages as can be delivered starting at the head. This uses an algorithm almost identical to that described above to asynchronous message delivery, except it only considers the one subscription. This is found within the "flushSubscription" method of the queue (flushing a subscription is a 0-10 concept where you attempt to send as much as possible to a given subscription and then signal completion when either the subscription's credit runs out or there are no more messages on the queue). Future Improvement: Factor out the common code between flushSubscription and processQueue. RemovalA consumer is removed either through the reception of a "Basic.Cancel" event or through the closure of the encapsulating channel. For thread safety, the first action is to remove the subscription from the list of subscriptions that the asynchronous delivery task iterates over. Next the subscription's close() method is called. This takes out a lock on the subscription (to avoid conflicting with any attempt to concurrently send to the subscription) and changes the subscription's state to "closed". The combination of these steps allow us to assert that after that point in time the subscription will not be used by any other threads to attempt to deliver messages. Next the subscription's pointer into the queue is null-ed out in a thread-safe way - this is done to prevent memory leaks due to references being held to points in the queue (due to the way that the concurrent-safe queues work "deleted" elements may not be eligible for garabage collection for some time). Finally, if the queue is of "auto-delete" type and the subscription being removed is the last subscription attached to the queue, then the queue needs to be deleted. Flow ControlThere are now concrete classes modeling the behaviour of the flow control algorithm. These flow control managers are set at the subscription level. For AMQP 0-8 and 0-9 flow control still happens at a per-channel level, so the same instance of the flow control manager is shared between all subscriptions on a channel. For 0-10 implementations we will be able to use the same code to implement the per-subscription flow-control model that it utilizes. AcknowledgementReject and ReleaseMessages delivered to a subscription may subsequently be returned to the queue either explicitly (by use of a reject command) or implicitly (by the closure of the channel). In this case the message must be made available again to subscribers to the queue. The issue here is that the pointers held by the subscriptions are likely to be in advance of the point to which the message is being returned. Thus for each message that is returned we must iterate over all subscribers to the queue, and if their current pointer is in advance of the returned message it must be moved back such that the next entry that that subscriber sees is the returned message. We do not reset the pointer for browsing consumers however as doing so would lead to all the browsed messages that are after the returned message in the queue being redelivered to the browsing subscription.
Change Notification Preferences
View Online
|
Add Comment
|
- [CONF] Apache Qpid > Qpid Design - Message Delivery confluence