Java Broker Design - Flow to Disk has been edited by Martin Ritchie (Feb 01, 2009).

Content:

Flow to Disk Design

Overview

Currently, the Java Broker can do one of two things with a message it has to deliver:

  1. Keep transient messages in memory until delivered
  2. Write persistent messages to a message store (such as BDB) and keep them in memory until delivery is complete or memory is full.

This means that the broker cannot avoid Out of Memory (OoM) exceptions: send enough messages to the broker, especially while your consumers are not active, and you can bring the broker down once it exhausts its available heap.

This page pulls together the ideas from QPID-949.

Other Implementations

ActiveMQ uses the idea of a message cursor and has a number of different policies for performing 'Message Spooling': Message Cursors.

Current Functionality

Currently the broker treats persistent and transient messages differently. Persistent messages are written to disk as they are received and their handles are created as WeakReferenceMessageHandles. This means that when an Out of Memory (OoM) condition occurs all the persistent message handles are GC'd. Performance hits the floor because all messages at the front of the queues must be read from disk, whilst new messages are kept in memory but at the back of the queue.

Transient messages are created as InMemoryMessageHandles and so cannot be purged from memory. When an OoM condition occurs the broker cannot recover.

Design

There are areas of the broker that are in need of improvement that could be affected by this implementation:

  1. MessageStore : Currently only persistent messages are written here, along with transactional data. The MessageStore should become a TransactionLog, so messages should not be retrieved from here during normal operation.
  2. Message Reference Counting : To minimise message data duplication references are used to record how many queues the message has been enqueued on. This is currently maintained by the message but has been a large cause of runtime problems. If the TransactionLog maintains a list of Message,Queue Tuples then we can remove the error prone reference count integer that is currently used.

Approach Overview

The approach here is to reduce the overall complexity of the broker so that it is easier to reason about in smaller chunks. Focusing at the level of Queues would make life easier as we move towards AMQP 1-0. To facilitate this we should replace our optimisation of only having a single copy of the Message with a copy per Queue. This will allow the Queues to better reason about and act upon their memory usage. The type of message transient/persistent should

Approach Summary

  1. Remove shared state from the AMQMessage class, and move everything into QueueEntries (this allows for a message to be flowed to disk on one queue while staying in memory on another).
  2. Break apart the MessageStore interface, creating a new TransactionLog interface that covers only the logging of the durable transactional events (message data arriving, enqueues, dequeues).
  3. Move reference counting into the TransactionLog
    At this point we will have removed our current (fragile) flow-to-disk capabilities on persistent messages... and all messages will be held in memory while live
  4. Add new properties to queues to keep track of memory usage.
  5. Create QueueBacking to enable storage and retrieval of flowed messages.
  6. Update QueueEntries / AMQMessage to use QueueBacking for flow to disk
  7. Add capabilities to queues to shrink their in memory profile by flowing queue-entries to disk (from the tail upward) until they are under a given notional memory usage.
    This would have a performance impact if there were two slow consumers as potentially one queue could purge the data that the second queue needs. However, this can be addressed in Phase 2.
  8. Add a check on message enqueue to ensure queue size does not grow beyond a defined limit. When that occurs, mark the queue as flowed to disk and immediately flow new messages on that queue.
  9. Add check on message send to potentially start an Inhale thread to restore() flowed messages.
  10. Add properties to QueueDeclare for flow to disk control extensions as defined in C++ broker.

Future Enhancements

  1. Enable the flow to disk of the queue structure. This will remove the final constraints on memory and only limit the broker to the amount of disk space available.

Approach Details

To highlight the changes that will be required, let's look at the processing that is performed on an incoming message:

AMQChannel

When a persistent message is received the headers and chunks are recorded in the new TransactionLog:

+- TransactionLog -+
| createQueue      |
| createExchange   |
| createBinding    |
| deleteQueue      |
| deleteExchange   |
| deleteBinding    |
|                  |
| getNewMessageID  |
|                  |
| enqueue          |
| dequeue          |
| storeHeader      |
| storeChunk       |
|                  |
| startTransaction |
| commit           |
| abort            |
+------------------+

The TransactionLog is a distilled version of the current MessageStore interfaces. The log is the persistent record of the state of the broker. On start up this log is used to restore the routing and message states. It is not to be used as a lookup mechanism; the queues must now be responsible for remembering all the enqueued messages and must not rely on the previous MessageStore. As no random access to the log file is needed it can be implemented as a write-ahead log. It can periodically clean up old state by writing a new log, but as its primary function is to ensure state is persisted to disk it need not maintain maps of the data, which simplifies its implementation. The responsibility for remembering the message data is delegated to the Queue.

The TransactionLog shall absorb the current reference counting code and be responsible for deciding when to recoverably delete a message. Currently the reference counting is still spread across a number of different classes and has a couple of serious problems. The TransactionLog will record a series of Queue/Message tuples so that it can pair enqueue/dequeue calls. When there are no more references to a message, it is safe to conclude that the message is no longer needed. By using a list of tuples rather than an integer count the TransactionLog is capable of safely interleaving transactions as there is no shared count value.
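The tuple idea can be illustrated with a minimal in-memory sketch. The method names enqueue, dequeue and getNewMessageID come from the interface listed above; the class name TupleTrackingLog, the parameter types, the isReferenced helper and the use of queue names as keys are assumptions made for illustration only. Nothing is persisted here and transactions are ignored, so this shows only how pairing enqueue/dequeue tuples replaces an integer count.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: tracks (message, queue) tuples instead of an integer count. */
class TupleTrackingLog {
    // messageId -> names of the queues the message is still enqueued on
    private final Map<Long, Set<String>> enqueued = new HashMap<>();
    private long nextMessageId = 0;

    synchronized long getNewMessageID() {
        return nextMessageId++;
    }

    /** Record that the message has been placed on the named queue. */
    synchronized void enqueue(long messageId, String queueName) {
        enqueued.computeIfAbsent(messageId, id -> new HashSet<>()).add(queueName);
    }

    /**
     * Record the dequeue that pairs with an earlier enqueue.
     * Returns true when no tuples remain, i.e. the message data may be deleted.
     */
    synchronized boolean dequeue(long messageId, String queueName) {
        Set<String> queues = enqueued.get(messageId);
        if (queues == null || !queues.remove(queueName)) {
            throw new IllegalStateException(
                "No matching enqueue for message " + messageId + " on " + queueName);
        }
        if (queues.isEmpty()) {
            enqueued.remove(messageId);
            return true;
        }
        return false;
    }

    /** Assumed helper: true while at least one queue still holds the message. */
    synchronized boolean isReferenced(long messageId) {
        return enqueued.containsKey(messageId);
    }
}
```

An unmatched dequeue fails loudly in this sketch, which is the kind of error an integer count would silently absorb.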

AMQQueue/QueueEntry

When a message has been fully received it is then routed to the required Queues. At this point a change to the existing broker is required. A copy of the message shall be placed on each queue rather than a reference to a single data object. This approach not only allows the TransactionLog to be responsible for the ultimate deletion of persistent message data but it allows the Queue to reason more clearly about its memory usage.

For this to occur the existing model needs to be updated. The *Handle objects we currently have need to be merged in to AMQMessage and all the state about the message needs to be moved to the QueueEntry. This will allow us to null the AMQMessage reference as and when the message is flowed to disk. The QueueEntry interface will be augmented to allow the Queue to flow the data when required.

+-QueueEntry-+
| flow       |
| isFlowed   |
| recover    |
+------------+

Our existing Queue needs to be updated to be able to record the additional state of the QueueEntries. Currently we have _queueCount and queueSize that represent the count and data size used by the queue. The Queue needs to have additional queueMaxMemory, queueMinMemory, queueInMemory and isFlowed added. It is proposed that only the data size is used for flow to disk calculations, as counting messages will not give us the control that we need over memory usage.

These new variables will be used to control two new threads, the Inhaler and the Purger.

Purger

When queueMaxMemory is reached the queue is set to flow and all new messages on to the queue are sent straight to disk. As messages are sent to a subscriber there are a couple of possibilities when the queue is in a flowed state:

  1. The messages are also flowed to disk.
  2. A number or percentage of the queueMaxMemory could be kept to handle rollbacks.

Using the first mechanism we do not need to have a Purger thread for the simple queue case. However, for the second case, and to handle queues where the position of the incoming message is not known, the Purger thread will be required. The Purger simply needs to start at the front of the queue and record the amount of data still in memory on the Queue; when queueMaxMemory is reached, all subsequent messages are flowed.
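A single Purger pass might look something like the following sketch. The Entry class is a hypothetical stand-in for a QueueEntry that carries only a size and a flowed flag, and purge is an assumed name; a real implementation would write the message data to disk where the flag is set.

```java
import java.util.List;

class PurgerSketch {
    /** Hypothetical stand-in for a QueueEntry: only a size and a flowed flag. */
    static class Entry {
        final long size;
        boolean flowed;

        Entry(long size) {
            this.size = size;
        }
    }

    /**
     * Walks from the front of the queue, keeping entries in memory until
     * queueMaxMemory would be exceeded, then flows every later entry.
     * Returns the number of bytes still held in memory after the pass.
     */
    static long purge(List<Entry> queue, long queueMaxMemory) {
        long inMemory = 0;
        boolean limitReached = false;
        for (Entry e : queue) {
            if (e.flowed) {
                continue; // already on disk, nothing to count
            }
            if (!limitReached && inMemory + e.size <= queueMaxMemory) {
                inMemory += e.size;
            } else {
                limitReached = true;
                e.flowed = true; // a real entry would write its data to disk here
            }
        }
        return inMemory;
    }
}
```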

Inhaler

The Inhaler is an optimisation to ensure that the broker returns to peak performance after a flow to disk event. Lazily loading messages on demand would be quite slow, so on delivery to a subscriber a check can be performed to see if the current queueInMemory is less than the queueMinMemory, which would indicate that there is room to reload older messages. The Inhaler can then begin to load messages from disk until queueMaxMemory has been reached or all messages are back in memory. If all the messages are back in memory then the queue flow state can be reset, allowing incoming messages to stay in memory. NOTE: as there are no locks, a second check by the Inhaler is required to ensure a message was not flowed between the last load and the change of queue state.
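As a rough illustration, an Inhaler pass could be sketched as below. Entry is again a hypothetical stand-in for a QueueEntry and inhale is an assumed name. The sketch is single-threaded, so the second scan only marks where the re-check described in the NOTE would sit; it does not demonstrate a real concurrent race.

```java
import java.util.List;

class InhalerSketch {
    /** Hypothetical stand-in for a QueueEntry: only a size and a flowed flag. */
    static class Entry {
        final long size;
        boolean flowed;

        Entry(long size, boolean flowed) {
            this.size = size;
            this.flowed = flowed;
        }
    }

    /**
     * Reloads flowed entries from the front of the queue until loading the
     * next one would exceed queueMaxMemory.
     * Returns true if every entry is back in memory, so the queue's flowed
     * state may be reset.
     */
    static boolean inhale(List<Entry> queue, long queueInMemory, long queueMaxMemory) {
        long inMemory = queueInMemory;
        for (Entry e : queue) {
            if (!e.flowed) {
                continue;
            }
            if (inMemory + e.size > queueMaxMemory) {
                return false; // no room left; the queue stays flowed
            }
            e.flowed = false; // a real entry would read its data back from disk here
            inMemory += e.size;
        }
        // Second check before the state change: another thread may have
        // flowed a message since the last load.
        for (Entry e : queue) {
            if (e.flowed) {
                return false;
            }
        }
        return true;
    }
}
```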

Delivery to the queue and to the subscriber is expected to be updated in the following ways:

Pseudo-Code - Delivery to Subscriber
while (message in queue)
  subscriber.deliver(message)

  if (flowed)
    flowToDisk(message)

  if (_queueInMemory < _queueMinMemory)
    startInhaler

Pseudo-Code - Delivery to Queue
addToQueue(message)

if (flowed)
  flowToDisk(message)
else
  if (_queueInMemory > _queueMaxMemory)
    setFlowed
    startPurger

The additional state checks are performed after the message deliveries and are simple calculations compared to the existing message flow paths. As a result the non-flowed state performance should not be affected.
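The two checks in the pseudo-code can be sketched in Java as a small piece of per-queue state. The class name FlowStateSketch and the methods onEnqueue and onDeliver are hypothetical; counters stand in for actually starting the Purger and Inhaler threads, and the Inhaler check is gated on the flowed flag, which is an assumption not stated in the pseudo-code.

```java
class FlowStateSketch {
    private final long queueMinMemory;
    private final long queueMaxMemory;
    private long queueInMemory = 0;
    private boolean flowed = false;

    // Counters stand in for starting the Purger and Inhaler threads.
    int purgerStarts = 0;
    int inhalerStarts = 0;

    FlowStateSketch(long queueMinMemory, long queueMaxMemory) {
        this.queueMinMemory = queueMinMemory;
        this.queueMaxMemory = queueMaxMemory;
    }

    /** Enqueue path: returns true if the message stays in memory. */
    boolean onEnqueue(long size) {
        if (flowed) {
            return false; // queue is flowed: the message goes straight to disk
        }
        queueInMemory += size;
        if (queueInMemory > queueMaxMemory) {
            flowed = true;
            purgerStarts++;
        }
        return true;
    }

    /** Delivery path: called after an in-memory message of this size was delivered. */
    void onDeliver(long size) {
        queueInMemory -= size;
        // Assumption: only a flowed queue has anything for the Inhaler to reload.
        if (flowed && queueInMemory < queueMinMemory) {
            inhalerStarts++;
        }
    }

    boolean isFlowed() {
        return flowed;
    }

    long inMemory() {
        return queueInMemory;
    }
}
```

Both paths are a comparison and an addition or subtraction, which is consistent with the expectation that the non-flowed path is barely affected.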

QueueBacking

The flow to disk implementation will be performed on a queue by queue basis so a new QueueBacking will be created to handle the flowing of messages to and from disk.

+-QueueBacking-+
| flow         |
| recover      |
| delete       |
+--------------+

When a message is dequeued it must also be removed from the QueueBacking.
NOTE: care must be taken here for the NO_ACK mode as the dequeue is performed before delivery so the message must be in memory before that occurs or the data will be lost.
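One possible shape for a QueueBacking is a directory per queue with one file per message, sketched below. The operation names flow, recover and delete come from the box above; the class name FileQueueBackingSketch, the contains helper, the use of a message id as the key and the one-file-per-message layout are all assumptions for illustration, not the proposed on-disk format.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical file-per-message backing for a single queue. */
class FileQueueBackingSketch {
    private final Path directory;

    FileQueueBackingSketch(Path directory) {
        this.directory = directory;
    }

    private Path fileFor(long messageId) {
        return directory.resolve(messageId + ".msg");
    }

    /** Write the message data to disk so the in-memory copy can be released. */
    void flow(long messageId, byte[] data) {
        try {
            Files.write(fileFor(messageId), data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Read the message data back into memory. */
    byte[] recover(long messageId) {
        try {
            return Files.readAllBytes(fileFor(messageId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Remove the flowed copy, for example when the message is dequeued. */
    void delete(long messageId) {
        try {
            Files.deleteIfExists(fileFor(messageId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Assumed helper: true while a flowed copy exists on disk. */
    boolean contains(long messageId) {
        return Files.exists(fileFor(messageId));
    }
}
```

For the NO_ACK case noted above, a caller of this sketch would need to call recover before delete, since the dequeue happens ahead of delivery.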

--------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org