Java Broker Design - Flow to Disk has been edited by Martin Ritchie (Jan 26, 2009).

Change summary:

Initial Version

Content:

Flow to Disk Design

Overview

Currently, the Java Broker can do one of two things with a message it has to deliver:

  1. Keep transient messages in memory until delivered
  2. Write persistent messages to a message store (like BDB) and keep them in memory until delivery completes or memory is full.

This means that the broker cannot avoid Out of Memory (OoM) exceptions: send enough messages to the broker, especially while your consumers are inactive, and you can bring the broker down once it exhausts its available heap.

This page pulls together the ideas from QPID-949.

Other Implementations

ActiveMQ uses the idea of a message cursor and has a number of different policies for performing 'Message Spooling': Message Cursors.

Current Functionality

Currently the broker treats persistent and transient messages differently. Persistent messages are written to disk as they are received, and their handles are created as WeakReferenceMessageHandles. This means that when an Out of Memory (OoM) condition occurs, all the persistent message handles are GC'd. Performance hits the floor, as all messages at the front of the queues must be read from disk whilst new messages are kept in memory but at the back of the queue.

Transient messages are created as InMemoryMessageHandles and so cannot be purged from memory. When an OoM condition occurs the broker cannot recover.
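
The difference between the two handle types can be illustrated with a minimal sketch. The class names below (HandleSketch, InMemoryHandleSketch, WeakHandleSketch) and the reload callback are hypothetical and are not the broker's actual classes; they only show why a weakly referenced body can be reclaimed and re-read, while a strongly referenced one cannot.

```java
import java.lang.ref.WeakReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for a message handle; not the broker's real interface. */
interface HandleSketch {
    byte[] body();
}

/** Transient-style handle: a strong reference, so the GC can never reclaim the body. */
final class InMemoryHandleSketch implements HandleSketch {
    private final byte[] body;

    InMemoryHandleSketch(byte[] body) {
        this.body = body;
    }

    public byte[] body() {
        return body;
    }
}

/** Persistent-style handle: a weak reference, re-read from the store if collected. */
final class WeakHandleSketch implements HandleSketch {
    private WeakReference<byte[]> ref;
    private final Supplier<byte[]> reload; // assumed callback that reads the body from disk

    WeakHandleSketch(byte[] body, Supplier<byte[]> reload) {
        this.ref = new WeakReference<>(body);
        this.reload = reload;
    }

    public byte[] body() {
        byte[] b = ref.get();
        if (b == null) {
            // The GC cleared the reference under memory pressure: pay for a disk read.
            b = reload.get();
            ref = new WeakReference<>(b);
        }
        return b;
    }
}
```

In this sketch the cost of the weak handle is the disk read at the moment of delivery, which matches the performance drop described above once the handles at the front of the queue have been collected.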

Design

There are areas of the broker that are in need of improvement that could be affected by this implementation:

  1. MessageStore : Currently only persistent messages are written here, but the store also acts as a Transaction Log, so the two functions need to be split.
  2. Message Reference Counting : To minimise message data duplication, references are used to record how many queues the message has been enqueued on. This is currently maintained by the message, but really it should be up to the MessageStore to decide when to purge the data, so reference counting should be moved there.
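
As a rough illustration of moving reference counting into the store, the sketch below keeps a per-message count of queues and purges the content when the last reference goes away. RefCountingStoreSketch and its method names are assumptions made for this example, not an existing broker API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store that owns both the message content and its reference count. */
final class RefCountingStoreSketch {
    private final Map<Long, byte[]> content = new HashMap<>();
    private final Map<Long, Integer> refs = new HashMap<>();

    /** Store the content once, however many queues the message is routed to. */
    void store(long id, byte[] body) {
        content.put(id, body);
        refs.put(id, 0);
    }

    /** Called once per queue the message is enqueued on. */
    void enqueued(long id) {
        refs.merge(id, 1, Integer::sum);
    }

    /** Called once per queue the message is dequeued from; purges on the last one. */
    void dequeued(long id) {
        int remaining = refs.merge(id, -1, Integer::sum);
        if (remaining <= 0) {
            refs.remove(id);
            content.remove(id);
        }
    }

    boolean contains(long id) {
        return content.containsKey(id);
    }
}
```

With the count held by the store, the message object no longer needs to know when its data can be discarded.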

The introduction of flow to disk is a large task and potentially carries many risks for the existing code base. To minimise any impact, the implementation can be spread over two initial phases:

Phase 1

This phase introduces flow to disk for transient messages as there is no mechanism currently for clearing transient messages from memory in an OoM condition. This phase looks most like the ActiveMQ section here. The existing persistent message storage is left untouched but transient messages are augmented to be capable of being sent to disk.
In this phase it is proposed that:

  1. The existing MessageStore be renamed TransactionLog and a new MessageStore interface be created solely for storing message content.
    Note: The functionality of the existing MessageStore/TransactionLog would be unaltered. Only transient messages would use the new MessageStore.
  2. Add flow() and restore() methods on QueueEntry[s] that delegate to AMQMessage to purge the Handle from memory.
    This would have a performance impact if there were two slow consumers as potentially one queue could purge the data that the second queue needs. However, this can be addressed in Phase 2.
  3. Add a check on message enqueue to ensure the queue size does not grow beyond a defined limit. When that occurs, mark the queue as flowed to disk and immediately flow new messages on that queue.
  4. Add check on message dequeue to potentially start a prefetch thread to restore() flowed messages.
  5. Add properties to QueueDeclare for flow to disk control extensions as defined in C++ broker.
    Queue should gain a new inMemorySize property, which will highlight the amount of data flowed to disk.
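
Steps 2 to 4 above can be sketched as follows. This is a minimal, single-threaded illustration under stated assumptions: DiskStoreSketch, EntrySketch and FlowingQueueSketch are hypothetical names, a map stands in for the disk, the limit is counted in bytes, and the prefetch runs inline on dequeue rather than on a separate thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the proposed content-only MessageStore; a map plays the role of the disk. */
final class DiskStoreSketch {
    private final Map<Long, byte[]> disk = new HashMap<>();
    void write(long id, byte[] body) { disk.put(id, body); }
    byte[] read(long id) { return disk.get(id); }
}

/** Hypothetical queue entry with the proposed flow() and restore() methods. */
final class EntrySketch {
    final long id;
    final int size;
    private byte[] body; // null while the entry is flowed to disk
    private final DiskStoreSketch store;

    EntrySketch(long id, byte[] body, DiskStoreSketch store) {
        this.id = id;
        this.size = body.length;
        this.body = body;
        this.store = store;
    }

    boolean isFlowed() { return body == null; }
    void flow() { if (body != null) { store.write(id, body); body = null; } }
    void restore() { if (body == null) { body = store.read(id); } }
    byte[] body() { restore(); return body; }
}

/** Hypothetical queue that flows new entries once a byte limit is reached. */
final class FlowingQueueSketch {
    private final Deque<EntrySketch> entries = new ArrayDeque<>();
    private final long limit;
    private long inMemorySize;
    private boolean flowed;

    FlowingQueueSketch(long limit) { this.limit = limit; }

    void enqueue(EntrySketch e) {
        if (flowed || inMemorySize + e.size > limit) {
            flowed = true; // once flowed, every new entry goes straight to disk
            e.flow();
        } else {
            inMemorySize += e.size;
        }
        entries.addLast(e);
    }

    EntrySketch dequeue() {
        EntrySketch head = entries.pollFirst();
        if (head == null) { return null; }
        if (head.isFlowed()) { head.restore(); } else { inMemorySize -= head.size; }
        prefetch();
        return head;
    }

    /** Restore flowed entries in queue order while they fit under the limit. */
    private void prefetch() {
        boolean anyStillFlowed = false;
        for (EntrySketch e : entries) {
            if (!e.isFlowed()) { continue; }
            if (inMemorySize + e.size > limit) { anyStillFlowed = true; break; }
            e.restore();
            inMemorySize += e.size;
        }
        flowed = anyStillFlowed;
    }

    long inMemorySize() { return inMemorySize; }
    boolean isFlowedToDisk() { return flowed; }
}
```

Restoring strictly in queue order keeps the messages nearest the head in memory, which is where the next delivery will come from.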

At this stage the broker should be in a state where persistent messages can be purged and retrieved from the current store (TransactionLog) and transient messages can be flowed via the new MessageStore interface. OoM problems should then occur only when memory has been filled with references to messages, which leaves us with the harder problem of paging the queue content to disk.

Phase 2

Once we have an operational flow to disk mechanism for all message types we can begin consolidating the code base.

  1. Move message data (i.e. everything but the TransactionLog data) to the new MessageStore class, leaving the TransactionLog as just the log of enqueues/dequeues.
  2. Move Reference handling into the unified MessageStore
  3. Consolidate WeakReference and InMemory MessageHandles
    Note: Now all messages should be handled the same
  4. Delegate flow() and restore() methods to MessageStore so that it can purge the message from memory if the message has been flowed on all its enqueued queues.
  5. Augment Virtualhost Housekeeping thread to sweep queues that are over their inMemorySize limit to flow() data.
    This could also be used to prefetch data into the queue as required.
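
Step 4 above, where the store decides when a flowed message may actually leave memory, might look like the sketch below. UnifiedStoreSketch, its maps and the use of queue names as keys are assumptions for illustration only; the housekeeping sweep from step 5 is not shown.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical unified store: purges content only when every queue has flowed it. */
final class UnifiedStoreSketch {
    private final Map<Long, byte[]> memory = new HashMap<>();
    private final Map<Long, byte[]> disk = new HashMap<>(); // a map stands in for the disk
    private final Map<Long, Set<String>> enqueuedOn = new HashMap<>();
    private final Map<Long, Set<String>> flowedOn = new HashMap<>();

    void store(long id, byte[] body) {
        memory.put(id, body);
    }

    void enqueue(long id, String queue) {
        enqueuedOn.computeIfAbsent(id, k -> new HashSet<>()).add(queue);
    }

    /** A queue no longer needs the content in memory. */
    void flow(long id, String queue) {
        Set<String> flowed = flowedOn.computeIfAbsent(id, k -> new HashSet<>());
        flowed.add(queue);
        Set<String> all = enqueuedOn.getOrDefault(id, Collections.emptySet());
        if (flowed.containsAll(all)) {
            // Every queue holding the message has flowed it, so purging is safe.
            byte[] body = memory.remove(id);
            if (body != null) {
                disk.put(id, body);
            }
        }
    }

    /** A queue needs the content back in memory, for example ahead of delivery. */
    void restore(long id, String queue) {
        Set<String> flowed = flowedOn.get(id);
        if (flowed != null) {
            flowed.remove(queue);
        }
        byte[] body = disk.get(id);
        if (body != null) {
            memory.putIfAbsent(id, body);
        }
    }

    boolean isInMemory(long id) {
        return memory.containsKey(id);
    }
}
```

This addresses the Phase 1 concern about two slow consumers: one queue calling flow() cannot remove data that another queue still expects to find in memory.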

Future Phases

  1. Enable the flow to disk of the queue structure. This will remove the final constraint on memory and leave the broker limited only by the amount of disk space available.

--------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org
