Java Broker Design - Flow to Disk has been edited by Martin Ritchie (Feb 02, 2009).

Change summary:

Update to remove suggestion of byte array copies on message delivery.


Content:

Flow to Disk Design

Overview

Currently, the Java Broker can do one of two things with a message it has to deliver:

  1. Keep transient messages in memory until delivered
  2. Write persistent messages to a message store (like BDB) and keep them in memory until delivery is complete or memory is full.

This means that the broker is not able to avoid out-of-memory (OoM) exceptions: send enough messages to the broker, especially when consumers are not active, and it will exhaust its available heap and go down.

This page pulls together the ideas from QPID-949.

Other Implementations

ActiveMQ uses the idea of a message cursor and has a number of different policies for performing 'Message Spooling': see Message Cursors.

Current Functionality

Currently the broker treats persistent and transient messages differently. Persistent messages are written to disk as they are received, and their handles are created as WeakReferenceMessageHandles. This means that when an Out of Memory (OoM) condition occurs, all the persistent message handles are GC'd. Performance then hits the floor, as all messages at the front of the queues must be read back from disk while new messages are kept in memory at the back of the queue.

Transient messages are created as InMemoryMessageHandles and so cannot be purged from memory. When an OoM condition occurs the broker cannot recover.

Design

Several areas of the broker that are in need of improvement could be affected by this implementation:

  1. MessageStore: Currently only persistent messages are written here, along with transactional data. The MessageStore should become a TransactionLog, so messages should not be retrieved from it during normal operation.
  2. Message Reference Counting: To minimise duplication of message data, references are used to record how many queues the message has been enqueued on. This count is currently maintained by the message itself and has been a large cause of runtime problems. If the TransactionLog maintains a list of (Message, Queue) tuples then we can remove the error-prone reference count integer that is currently used.

Approach Overview

The approach here is to reduce the overall complexity of the broker so that it is easier to reason about in smaller chunks. Focusing at the level of the Queue would make life easier as we move towards AMQP 1-0. To facilitate this we should replace our optimisation of having only a single copy of each Message with a copy per Queue. This will allow each Queue to better reason about and act upon its own memory usage. Transient and persistent messages should be handled the same way by the queue, with both flowed to disk if required.

Approach Summary

  1. Remove shared state from the AMQMessage class and move everything into QueueEntry objects (this allows a message to be flowed to disk on one queue while staying in memory on another).
  2. Break apart the MessageStore interface, creating a new TransactionLog interface that covers only the logging of the durable transactional events (message data arriving, enqueues, dequeues).
  3. Move reference counting into the TransactionLog.
    At this point we will have removed our current (fragile) flow-to-disk capability for persistent messages, and all messages will be held in memory while live.
  4. Add new properties to queues to keep track of memory usage.
  5. Create QueueBacking to enable storage and retrieval of flowed messages.
  6. Update QueueEntries / AMQMessage to use QueueBacking for flow to disk.
  7. Add capabilities to queues to shrink their in-memory profile by flowing queue entries to disk (from the tail upward) until they are under a given notional memory usage.
  8. Add a check on message enqueue to ensure the queue size does not grow beyond a defined limit. When that occurs, mark the queue as flowed, immediately flow new messages on that queue to disk, and potentially start a Purger thread.
  9. Add a check on message send to potentially start an Inhaler thread to restore flowed messages.
  10. Add properties to QueueDeclare for the flow to disk control extensions as defined in the C++ broker.

Future Enhancements

  1. Enable the flow to disk of the queue structure. This will remove the final constraints on memory and only limit the broker to the amount of disk space available.

Approach Details

To highlight the changes that will be required, let's look at the processing that is performed on an incoming message:

AMQChannel

When a persistent message is received, its headers and chunks are recorded in the new TransactionLog. The remainder of the current MessageStore will be moved to a new RoutingTable interface.

+- TransactionLog -+
| enqueue          |
| dequeue          |
| storeHeader      |
| storeChunk       |
|                  |
| startTransaction |
| commit           |
| abort            |
+------------------+

+--RoutingTable--+
| createQueue    |
| createExchange |
| createBinding  |
|                |
| deleteQueue    |
| deleteExchange |
| deleteBinding  |
+----------------+
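The two boxes above can be read as Java interfaces. The sketch below is an assumption throughout: the page lists only operation names, so every parameter list here is hypothetical. The AppendOnlyTransactionLog keeps its records in a list purely to illustrate the write-ahead, append-only idea (a real log would write to disk); it never answers lookups.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical signatures: only the operation names come from the design page.
interface TransactionLog {
    void startTransaction();
    void commit();
    void abort();
    void storeHeader(long messageId, byte[] header);
    void storeChunk(long messageId, int index, byte[] chunk);
    void enqueue(String queueName, long messageId);
    void dequeue(String queueName, long messageId);
}

interface RoutingTable {
    void createQueue(String queueName);
    void createExchange(String exchangeName);
    void createBinding(String exchangeName, String queueName, String routingKey);
    void deleteQueue(String queueName);
    void deleteExchange(String exchangeName);
    void deleteBinding(String exchangeName, String queueName, String routingKey);
}

// Append-only sketch: every call is recorded in arrival order and nothing is looked up.
// Records are held in memory here only so the ordering can be inspected.
class AppendOnlyTransactionLog implements TransactionLog {
    private final List<String> records = new ArrayList<>();

    public void startTransaction() { records.add("BEGIN"); }
    public void commit() { records.add("COMMIT"); }
    public void abort() { records.add("ABORT"); }

    public void storeHeader(long messageId, byte[] header) {
        records.add("HEADER " + messageId + " " + header.length);
    }

    public void storeChunk(long messageId, int index, byte[] chunk) {
        records.add("CHUNK " + messageId + " " + index + " " + chunk.length);
    }

    public void enqueue(String queueName, long messageId) {
        records.add("ENQUEUE " + queueName + " " + messageId);
    }

    public void dequeue(String queueName, long messageId) {
        records.add("DEQUEUE " + queueName + " " + messageId);
    }

    // Read-only view, used only to replay the log (for example on recovery).
    List<String> records() { return Collections.unmodifiableList(records); }
}
```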

The TransactionLog is a distilled version of the current MessageStore interfaces. The log is the persistent record of the state of the broker; on start-up it is used to restore the routing and message state. It is not to be used as a lookup mechanism: the queues must now be responsible for remembering all the enqueued messages and must not rely on the previous MessageStore. As no random access to the log file is needed, it can be implemented as a write-ahead log. It can periodically clean up old state by writing a new log, but as its primary function is to ensure that state is persisted to disk it need not maintain maps of the data, which simplifies its implementation. The responsibility for remembering the message data is delegated to the Queue.

The TransactionLog shall absorb the current reference counting code and be responsible for deciding when to recoverably delete a message. Currently the reference counting is spread across a number of different classes and has a couple of serious problems. The TransactionLog will record a series of Queue/Message tuples so that it can pair enqueue/dequeue calls; when no references to a message remain, it knows the message is no longer needed. Because it uses a list of tuples rather than an integer count, the TransactionLog can safely interleave transactions, as there is no shared count value.
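A minimal sketch of tuple-based reference tracking, assuming a hypothetical MessageReferences helper inside the TransactionLog. Each (queue, message) pair is recorded explicitly, so a dequeue can only remove the pair it matches: a duplicate or unknown dequeue is detected instead of silently driving a shared counter wrong. The method-level locks only protect the map itself.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: tracks (queue, message) tuples instead of an integer reference count.
class MessageReferences {
    private final Map<Long, Set<String>> queuesByMessage = new HashMap<>();

    // Records that messageId is enqueued on queueName; returns false if the tuple already existed.
    synchronized boolean enqueue(String queueName, long messageId) {
        return queuesByMessage.computeIfAbsent(messageId, id -> new HashSet<>()).add(queueName);
    }

    // Removes the matching tuple. Returns true only when the last reference is gone,
    // i.e. when the message data can be deleted from the log.
    synchronized boolean dequeue(String queueName, long messageId) {
        Set<String> queues = queuesByMessage.get(messageId);
        if (queues == null || !queues.remove(queueName)) {
            throw new IllegalStateException(
                    "No enqueue recorded for message " + messageId + " on " + queueName);
        }
        if (queues.isEmpty()) {
            queuesByMessage.remove(messageId);
            return true;
        }
        return false;
    }

    synchronized boolean isReferenced(long messageId) {
        return queuesByMessage.containsKey(messageId);
    }
}
```

With an integer count, a stray second dequeue on q1 would look just like a legitimate dequeue on q2; with tuples it is rejected.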

TransactionLog Recovery

Currently the MessageStore is responsible for providing unique MessageIDs. This is not strictly part of a TransactionLog, so it would not make sense to include it in the interface. Instead, we recommend unifying message creation as part of removing the *MessageHandle objects. Messages recovered from the store currently create the *MessageHandle directly with a Message ID, while messages delivered via the wire ask the MessageStore for an ID before creating an IncomingMessage, which in turn creates the *MessageHandle. As we will be removing the *MessageHandle objects, it makes sense to unify message creation through a MessageFactory, which can then be responsible for the sequence of IDs. When recovery is in progress a call to createMessage(id) will take place and the factory need only:

  1. ensure the id is unique
  2. record the highest value seen to seed its sequence of IDs handed out by createMessage()

+--MessageFactory---+
| createMessage(id) |
| createMessage()   |
+-------------------+
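The two recovery duties above can be sketched as follows. Message is a hypothetical stand-in rather than the broker's AMQMessage, and the sketch assumes recovery finishes before createMessage() is first called, so wire-delivered IDs always continue above the highest recovered one.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the broker's message class.
final class Message {
    final long id;
    Message(long id) { this.id = id; }
}

// Sketch of a MessageFactory that owns the ID sequence.
class MessageFactory {
    private final Set<Long> recoveredIds = new HashSet<>();
    private long highestId = 0; // highest ID seen during recovery or handed out since

    // Recovery path: the ID comes from the TransactionLog.
    synchronized Message createMessage(long id) {
        if (!recoveredIds.add(id)) {
            throw new IllegalArgumentException("Duplicate message id " + id);
        }
        highestId = Math.max(highestId, id); // seed the sequence
        return new Message(id);
    }

    // Wire path: continue the sequence after the highest recovered ID.
    synchronized Message createMessage() {
        highestId++;
        return new Message(highestId);
    }
}
```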
    

AMQQueue/QueueEntry

When a message has been fully received it is then routed to the required Queues as before. Only persistent messages that are routed to persistent queues are written to the TransactionLog which is then responsible for the ultimate deletion of persistent message data.

For this to occur the existing model needs to be updated. The *Handle objects we currently have need to be merged into AMQMessage, and all the state about the message needs to be moved to the QueueEntry. This will allow us to null the AMQMessage reference as and when the message is flowed to disk. The QueueEntry interface will be augmented to allow the Queue to flow the data when required. When the data is recovered, no attempt is made to restore the single shared instance of the message. For example, if a message is sent to 10 queues there will initially be one AMQMessage and one copy of the data. When a queue is flowed it loses its reference to that message, so on recovery a new message with a copy of the data is created solely for that queue's use.

+-QueueEntry-+
| flow       |
| isFlowed   |
| recover    |
+------------+
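A minimal sketch of the QueueEntry operations above, assuming the message data is a byte array and using a hypothetical InMemoryBacking map as a stand-in for the per-queue disk store. Flowing drops only this entry's reference; recovery returns a private copy for this queue and does not restore the shared instance.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-queue disk store; it stores copies in a map.
class InMemoryBacking {
    private final Map<Long, byte[]> stored = new HashMap<>();
    void write(long id, byte[] data) { stored.put(id, data.clone()); }
    byte[] read(long id) { return stored.get(id).clone(); }
    void delete(long id) { stored.remove(id); }
}

// Sketch of a QueueEntry that can release its message data when flowed.
class QueueEntry {
    private final long messageId;
    private final InMemoryBacking backing;
    private byte[] data; // null while the entry is flowed

    QueueEntry(long messageId, byte[] sharedData, InMemoryBacking backing) {
        this.messageId = messageId;
        this.data = sharedData; // initially shared with other queues' entries
        this.backing = backing;
    }

    synchronized void flow() {
        if (data != null) {
            backing.write(messageId, data);
            data = null; // drop this queue's reference so the GC can reclaim it
        }
    }

    synchronized boolean isFlowed() { return data == null; }

    // Recovery yields a copy owned by this queue only.
    synchronized byte[] recover() {
        if (data == null) {
            data = backing.read(messageId);
        }
        return data;
    }
}
```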

Our existing Queue needs to be updated to record the additional state of its QueueEntry objects. Currently we have queueCount and queueSize, which represent the count and data size used by the queue. The Queue needs to have queueMaxMemory, queueMinMemory, queueInMemory and isFlowed added. It is proposed that only the data size is used for flow to disk calculations, as counting messages will not give us the control that we need over memory usage.

These new variables will be used to control two new threads: the Inhaler and the Purger.

Purger

When queueMaxMemory is reached the queue is set to flowed and all new messages on the queue are sent straight to disk. When messages are sent to a subscriber while the queue is in the flowed state, there are a couple of possibilities:

  1. The messages are also flowed to disk.
  2. A number or percentage of the queueMaxMemory could be kept to handle rollbacks.

Using the first mechanism we do not need a Purger thread for the simple queue case. However, for the second case, and to handle queues where the position of the incoming message is not known, the Purger thread will be required. The Purger simply needs to start at the front of the queue and record the amount of data still in memory on the Queue; once queueMaxMemory is reached, all subsequent messages are flowed.
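A single-threaded sketch of the Purger pass described above. Entry is a hypothetical minimal type, and setting its flowed flag stands in for writing the data to the QueueBacking and dropping the reference. The pass walks from the front, keeps entries while the running total stays within queueMaxMemory, and flows every in-memory entry after the limit is first reached.

```java
import java.util.List;

// Hypothetical minimal entry type for this sketch.
class Entry {
    final long size;
    boolean flowed;
    Entry(long size, boolean flowed) { this.size = size; this.flowed = flowed; }
}

class Purger {
    private final List<Entry> queue; // index 0 is the front of the queue
    private final long queueMaxMemory;

    Purger(List<Entry> queue, long queueMaxMemory) {
        this.queue = queue;
        this.queueMaxMemory = queueMaxMemory;
    }

    // Returns the number of bytes left in memory after the pass.
    long purge() {
        long inMemory = 0;
        boolean limitReached = false;
        for (Entry e : queue) {
            if (e.flowed) continue; // already on disk
            if (!limitReached && inMemory + e.size <= queueMaxMemory) {
                inMemory += e.size; // still within budget: keep in memory
            } else {
                limitReached = true; // every later entry is flowed too
                e.flowed = true;     // stand-in for writing to disk
            }
        }
        return inMemory;
    }
}
```

Keeping the front of the queue in memory means the next deliveries need no disk reads.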

Inhaler

The Inhaler is an optimisation to ensure that the broker returns to peak performance after a flow to disk event. Lazily loading messages on demand would be quite slow, so on delivery to a subscriber a check can be performed to see if the current queueInMemory is less than queueMinMemory, which would indicate that there is room to reload older messages. The Inhaler can then begin to load messages from disk until queueMaxMemory has been reached or all messages are back in memory. If all the messages are back in memory then the queue's flowed state can be reset, allowing incoming messages to stay in memory.
NOTE: as there are no locks, a second check by the Inhaler is required to ensure that a message was not flowed between the last load and the change of queue state.
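A sketch of the Inhaler pass, including the second check from the NOTE. Entry, the AtomicLong counter and the AtomicBoolean flag are hypothetical stand-ins for the queue state; clearing an entry's flowed flag stands in for reading its data back from disk. The pass loads flowed entries from the front while they fit under queueMaxMemory. Only if everything is back in memory does it clear the queue flag, and then it re-scans, because an enqueue that saw the old flag may have flowed a message in the meantime.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal entry type for this sketch.
class Entry {
    final long size;
    volatile boolean flowed;
    Entry(long size, boolean flowed) { this.size = size; this.flowed = flowed; }
}

class Inhaler {
    private final Iterable<Entry> queue; // front of the queue first
    private final AtomicLong queueInMemory;
    private final AtomicBoolean queueFlowed;
    private final long queueMaxMemory;

    Inhaler(Iterable<Entry> queue, AtomicLong queueInMemory,
            AtomicBoolean queueFlowed, long queueMaxMemory) {
        this.queue = queue;
        this.queueInMemory = queueInMemory;
        this.queueFlowed = queueFlowed;
        this.queueMaxMemory = queueMaxMemory;
    }

    void inhale() {
        for (Entry e : queue) {
            if (!e.flowed) continue; // already in memory and already counted
            if (queueInMemory.get() + e.size > queueMaxMemory) {
                return; // no room left: the queue stays flowed
            }
            e.flowed = false; // stand-in for loading the data from disk
            queueInMemory.addAndGet(e.size);
        }
        // Everything is in memory: reset the queue state...
        queueFlowed.set(false);
        // ...then check again, since a message may have been flowed meanwhile.
        for (Entry e : queue) {
            if (e.flowed) {
                queueFlowed.set(true);
                return;
            }
        }
    }
}
```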

Delivery to the queue and to the subscriber is expected to change in the following ways:

Pseudo-Code - Delivery to Subscriber
while (message in queue)
  subscriber.deliver(message)

  if (flowed)
    flowToDisk(message)

  if (_queueInMemory < _queueMinMemory)
    startInhaler

Pseudo-Code - Delivery to Queue
addToQueue(message)

if (flowed)
  flowToDisk(message)
else
  if (_queueInMemory > _queueMaxMemory)
    setFlowed
    startPurger
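The two pseudo-code paths can be translated into a small single-threaded Java sketch. Everything here is hypothetical: Msg only tracks a size and a flowed flag, and startPurger/startInhaler are replaced by counters instead of real threads. The sketch assumes option 1 from the Purger section, so a message delivered while the queue is flowed is also flowed until it is acknowledged. Resetting the flowed flag is left to the Inhaler.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class FlowControlledQueue {
    static final class Msg {
        final long size;
        boolean flowed; // true while the data lives only on disk
        Msg(long size) { this.size = size; }
    }

    private final Deque<Msg> entries = new ArrayDeque<>();
    private final long queueMinMemory;
    private final long queueMaxMemory;
    private long queueInMemory;
    private boolean flowed;
    int purgerStarts;  // hypothetical hooks: counted instead of starting threads
    int inhalerStarts;

    FlowControlledQueue(long queueMinMemory, long queueMaxMemory) {
        this.queueMinMemory = queueMinMemory;
        this.queueMaxMemory = queueMaxMemory;
    }

    // Delivery to Queue.
    void addToQueue(Msg m) {
        entries.addLast(m);
        queueInMemory += m.size;
        if (flowed) {
            flowToDisk(m);
        } else if (queueInMemory > queueMaxMemory) {
            flowed = true;  // setFlowed
            purgerStarts++; // startPurger
        }
    }

    // Delivery to Subscriber, one message per call; returns false when empty.
    boolean deliverNext(Consumer<Msg> subscriber) {
        Msg m = entries.pollFirst();
        if (m == null) return false;
        if (m.flowed) { // must be read back before it can be sent
            m.flowed = false;
            queueInMemory += m.size;
        }
        subscriber.accept(m);
        if (flowed) flowToDisk(m); // keep the unacknowledged copy on disk only
        if (queueInMemory < queueMinMemory) inhalerStarts++; // startInhaler
        return true;
    }

    // Acknowledgement frees any memory the delivered message still holds
    // (a flowed message would instead be deleted from the QueueBacking).
    void acknowledge(Msg m) {
        if (!m.flowed) queueInMemory -= m.size;
    }

    long queueInMemory() { return queueInMemory; }
    boolean isFlowed() { return flowed; }

    private void flowToDisk(Msg m) {
        if (!m.flowed) {
            m.flowed = true;
            queueInMemory -= m.size;
        }
    }
}
```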

The additional state checks are performed after the message deliveries have completed and are simple calculations compared to the existing message flow paths. As a result, performance in the non-flowed state should not be affected.

QueueBacking

The flow to disk implementation will be performed on a queue by queue basis so a new QueueBacking will be created to handle the flowing of messages to and from disk.

+-QueueBacking-+
| flow         |
| recover      |
| delete       |
+--------------+

When a message is dequeued it must also be removed from the QueueBacking.
NOTE: care must be taken here in NO_ACK mode, as the dequeue is performed before delivery; the message must be back in memory before that occurs or its data will be lost.
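A minimal file-based QueueBacking sketch using java.nio.file.Files. The one-file-per-message layout inside a directory owned by a single queue, the ".msg" suffix and the recoverAndDelete helper are all assumptions for illustration. recoverAndDelete reflects the NO_ACK note: the data is read back into memory before the backing file is removed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical layout: one file per flowed message in a directory owned by one queue.
class FileQueueBacking {
    private final Path directory;

    FileQueueBacking(Path directory) throws IOException {
        this.directory = Files.createDirectories(directory);
    }

    private Path fileFor(long messageId) {
        return directory.resolve(messageId + ".msg");
    }

    void flow(long messageId, byte[] data) throws IOException {
        Files.write(fileFor(messageId), data); // creates or truncates the file
    }

    byte[] recover(long messageId) throws IOException {
        return Files.readAllBytes(fileFor(messageId));
    }

    // Called on dequeue; a message that was never flowed has no file, which is fine.
    void delete(long messageId) throws IOException {
        Files.deleteIfExists(fileFor(messageId));
    }

    // NO_ACK: the dequeue happens before delivery, so read the data back first.
    byte[] recoverAndDelete(long messageId) throws IOException {
        byte[] data = recover(messageId);
        delete(messageId);
        return data;
    }
}
```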

Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org
