Java Broker Design - Flow to Disk
Java Broker Design - Flow to Disk has been edited by Martin Ritchie (Feb 02, 2009).
Change summary: Update to remove suggestion of byte array copies on message delivery.

Flow to Disk Design

Overview

Currently, the Java Broker can do one of two things with a message it has to deliver:
This means that the broker cannot avoid out-of-memory (OoM) exceptions: send enough messages to the broker, especially while consumers are inactive, and it will exhaust its available heap and fail.

This page pulls together the ideas from QPID-949.

Other Implementations

ActiveMQ uses the idea of a message cursor and has a number of different policies for performing 'Message Spooling' (see Message Cursors).

Current Functionality

Currently the broker treats persistent and transient messages differently. Persistent messages are written to disk as they are received, and their handles are created as WeakReferenceMessageHandles. This means that when an OoM condition occurs, all the persistent message handles are garbage collected. Performance then collapses, because every message at the front of the queues must be read back from disk while new messages are kept in memory, but at the back of the queue. Transient messages are created as InMemoryMessageHandles and so cannot be purged from memory; when an OoM condition occurs the broker cannot recover.

Design

There are areas of the broker in need of improvement that could be affected by this implementation:
Approach Overview

The approach here is to reduce the overall complexity of the broker so that it is easier to reason about in smaller chunks. Focusing on the level of individual Queues would make life easier as we move towards AMQP 1-0. To facilitate this we should replace our optimisation of having only a single copy of the Message with a copy per Queue. This will allow each Queue to reason about, and act upon, its own memory usage. Transient and persistent messages should be handled the same way by the queue, with both flowed to disk if required.

Approach Summary
Future Enhancements
Approach Details

To highlight the changes that will be required, let's look at the processing that is performed on an incoming message:

AMQChannel

When a persistent message is received, the headers and chunks are recorded in the new TransactionLog; the remainder of the current MessageStore will be moved to a new RoutingTable interface.

    +- TransactionLog -+
    | enqueue          |
    | dequeue          |
    | storeHeader      |
    | storeChunk       |
    |                  |
    | startTransaction |
    | commit           |
    | abort            |
    +------------------+

    +--RoutingTable--+
    | createQueue    |
    | createExchange |
    | createBinding  |
    |                |
    | deleteQueue    |
    | deleteExchange |
    | deleteBinding  |
    +----------------+

The TransactionLog is a distilled version of the current MessageStore interfaces. The log is the persistent record of the state of the broker; on start up it is used to restore the routing and message states. It is not to be used as a lookup mechanism: the queues must now be responsible for remembering all of their enqueued messages rather than relying on the previous MessageStore. As no random access to the log file is needed, it can be implemented as a write-ahead log. It can periodically clean up old state by writing a new log, but as its primary function is to ensure state is persisted to disk, it need not maintain maps of the data, which simplifies its implementation. The responsibility for remembering the message data is delegated to the Queue.

The TransactionLog shall absorb the current reference counting code and be responsible for deciding when to recoverably delete a message. Currently the reference counting is still spread across a number of different classes and has a couple of serious problems. The TransactionLog will record a series of Queue/Message tuples so that it can pair enqueue/dequeue calls. When there are no more references to a message, it can safely conclude that the message is no longer needed.
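The two boxes above could be expressed as Java interfaces roughly as follows. Only the method names come from the design; the parameter types are assumptions, and MessageReferences is a hypothetical helper (transactions omitted) showing how pairing enqueue and dequeue tuples lets the log tell when a message's data is no longer referenced.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Method names follow the TransactionLog box; parameter types are assumptions.
interface TransactionLog {
    void enqueue(String queue, long messageId);
    void dequeue(String queue, long messageId);
    void storeHeader(long messageId, byte[] header);
    void storeChunk(long messageId, int chunkIndex, byte[] chunk);
    void startTransaction();
    void commit();
    void abort();
}

// Method names follow the RoutingTable box; parameter types are assumptions.
interface RoutingTable {
    void createQueue(String queue);
    void createExchange(String exchange, String type);
    void createBinding(String exchange, String queue, String routingKey);
    void deleteQueue(String queue);
    void deleteExchange(String exchange);
    void deleteBinding(String exchange, String queue, String routingKey);
}

// Hypothetical helper: the Queue/Message tuple bookkeeping a TransactionLog
// implementation could use in place of a shared integer reference count.
class MessageReferences {
    // messageId -> queues that currently hold an enqueue tuple for it
    private final Map<Long, Set<String>> tuples = new HashMap<>();

    void enqueue(String queue, long messageId) {
        tuples.computeIfAbsent(messageId, id -> new HashSet<>()).add(queue);
    }

    // Pairs a dequeue with its earlier enqueue. Returns true when the last
    // tuple for the message is gone, i.e. its data may now be deleted.
    boolean dequeue(String queue, long messageId) {
        Set<String> queues = tuples.get(messageId);
        if (queues == null || !queues.remove(queue)) {
            throw new IllegalStateException(
                "no enqueue recorded for " + queue + "/" + messageId);
        }
        if (queues.isEmpty()) {
            tuples.remove(messageId);
            return true;
        }
        return false;
    }
}
```

For example, a message enqueued on two queues is only reported as deletable after both queues have dequeued it, and a dequeue with no matching enqueue is rejected.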
By using a list of tuples rather than an integer count, the TransactionLog can safely interleave transactions, as there is no shared count value.

TransactionLog Recovery

Currently the MessageStore is responsible for providing unique MessageIDs. This is not strictly part of a TransactionLog, so it would not make sense to include it in the interface. Instead, we recommend unifying message creation as part of removing the *MessageHandle objects. Messages recovered directly from the store currently create the *MessageHandle directly with a Message ID, while messages delivered via the wire ask the MessageStore for an ID before creating an IncomingMessage, which in turn creates the *MessageHandle. As we will be removing the *MessageHandle objects, it makes sense to unify message creation through a factory, MessageFactory, which will then be responsible for the sequence of IDs. When recovery is in progress a call to createMessage(id) will take place and the factory need only:
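The MessageFactory described in this section could look something like the Java sketch below. MessageFactory and createMessage(id) are named in the design; SketchMessage is a hypothetical stand-in for AMQMessage, and the recovery behaviour shown (advancing the ID sequence past every recovered ID) is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for AMQMessage once the *MessageHandle classes are merged in.
class SketchMessage {
    final long id;

    SketchMessage(long id) {
        this.id = id;
    }
}

// Sketch of a MessageFactory that owns the message ID sequence.
class MessageFactory {
    private final AtomicLong lastId = new AtomicLong();

    // Messages arriving over the wire get the next ID in the sequence.
    SketchMessage createMessage() {
        return new SketchMessage(lastId.incrementAndGet());
    }

    // Messages restored from the TransactionLog keep their recorded ID.
    // Assumption: the factory advances its sequence past that ID so that
    // IDs handed out after recovery cannot collide with recovered ones.
    SketchMessage createMessage(long id) {
        lastId.accumulateAndGet(id, Math::max);
        return new SketchMessage(id);
    }
}
```

For example, after recovering messages 41 and 7 (in that order), the next message created from the wire would be given ID 42.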
AMQQueue/QueueEntry

When a message has been fully received it is routed to the required Queues as before. Only persistent messages routed to persistent queues are written to the TransactionLog, which is then responsible for the ultimate deletion of persistent message data. For this to occur the existing model needs to be updated: the *Handle objects we currently have need to be merged into AMQMessage, and all the state about the message needs to be moved to the QueueEntry. This will allow us to null the AMQMessage reference as and when the message is flowed to disk. The QueueEntry interface will be augmented to allow the Queue to flow the data when required.

When the data is recovered, no attempt is made to restore the single instance of the message. For example, if a single message is sent to 10 queues, initially there will be one AMQMessage and one copy of the data. When a queue is flowed it loses its reference to that message, so on recovery a new message with a copy of the data will be created solely for that queue's use.

    +-QueueEntry-+
    | flow       |
    | isFlowed   |
    | recover    |
    +------------+

Our existing Queue needs to be updated to record the additional state of its QueueEntry objects. Currently we have queueCount and queueSize, which represent the message count and data size of the queue. The Queue needs the additional fields queueMaxMemory, queueMinMemory, queueInMemory and isFlowed. It is proposed that only the data size is used for flow to disk calculations, as counting messages will not give us the control that we need over memory usage. These new variables will control two new threads, the Inhaler and the Purger.

Purger

When queueMaxMemory is reached, the queue is set to flow and all new messages on to the queue are sent straight to disk. As messages are sent to a subscriber there are a couple of possibilities when the queue is in a flowed state:
Using the first mechanism we do not need a Purger thread for the simple queue case. However, for the second case, and to handle queues where the position of the incoming message is not known, the Purger thread will be required. The Purger simply needs to start at the front of the queue and record the amount of data still in memory on the Queue; once queueMaxMemory is reached, all subsequent messages are flowed.

Inhaler

The Inhaler is an optimisation to ensure that the broker returns to peak performance after a flow to disk event. Lazily loading messages on demand would be quite slow, so on delivery to a subscriber a check can be performed to see whether the current queueInMemory is less than queueMinMemory, which would indicate that there is room to reload older messages. The Inhaler can then begin to load messages from disk until queueMaxMemory has been reached or all messages are back in memory. If all the messages are back in memory then the queue's flow state can be reset, allowing incoming messages to stay in memory.

The delivery paths to the queue and to the subscriber are expected to be updated in the following ways:

Pseudo-Code - Delivery to Subscriber

    while (message in queue)
        subscriber.deliver(message)
        if (flowed)
            flowToDisk(message)
        if (_queueInMemory < _queueMinMemory)
            startInhaler

Pseudo-Code - Delivery to Queue

    addToQueue(message)
    if (flowed)
        flowToDisk(message)
    else if (_queueInMemory > _queueMaxMemory)
        setFlowed
        startPurger

The additional state checks are performed after the message deliveries and are simple calculations compared with the existing message flow paths. As a result, performance in the non-flowed state should not be affected.

QueueBacking

The flow to disk implementation will be performed on a queue-by-queue basis, so a new QueueBacking will be created to handle flowing messages to and from disk.
    +-QueueBacking-+
    | flow         |
    | recover      |
    | delete       |
    +--------------+

When a message is dequeued it must also be removed from the QueueBacking.
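Pulling the QueueEntry, Purger, Inhaler and QueueBacking pieces together, the per-queue memory accounting could be sketched in Java as below. This is a single-threaded illustration under stated assumptions, not the broker's implementation: FlowEntry, FlowingQueue and MapQueueBacking are hypothetical names, a HashMap stands in for disk, the Purger and Inhaler run inline instead of as threads, and delivery is treated as an immediate dequeue, so the flowToDisk step after subscriber delivery in the pseudo-code is not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical QueueBacking: a map stands in for the queue's on-disk store.
class MapQueueBacking {
    private final Map<Long, byte[]> store = new HashMap<>();
    void flow(long id, byte[] data) { store.put(id, data); }
    byte[] recover(long id) { return store.get(id); }
    void delete(long id) { store.remove(id); }
    int size() { return store.size(); }
}

// Hypothetical QueueEntry owning this queue's copy of the message data.
class FlowEntry {
    final long id;
    final int size;
    byte[] data; // null while the entry is flowed to disk
    FlowEntry(long id, byte[] data) { this.id = id; this.size = data.length; this.data = data; }
    boolean isFlowed() { return data == null; }
}

// Single-threaded sketch: Purger and Inhaler run inline, delivery == dequeue.
class FlowingQueue {
    private final Deque<FlowEntry> entries = new ArrayDeque<>();
    private final MapQueueBacking backing = new MapQueueBacking();
    private final long minMemory, maxMemory; // queueMinMemory, queueMaxMemory
    private long inMemory = 0;               // queueInMemory, in bytes
    private boolean flowed = false;

    FlowingQueue(long minMemory, long maxMemory) { this.minMemory = minMemory; this.maxMemory = maxMemory; }

    // Delivery to queue.
    void add(FlowEntry e) {
        entries.addLast(e);
        inMemory += e.size;
        if (flowed) {
            flowToDisk(e);
        } else if (inMemory > maxMemory) {
            flowed = true;
            purge();
        }
    }

    // Delivery to subscriber: returns the next message's data, or null if empty.
    byte[] deliver() {
        FlowEntry e = entries.pollFirst();
        if (e == null) return null;
        byte[] data;
        if (e.isFlowed()) {
            data = backing.recover(e.id); // slow path: read straight from disk
        } else {
            data = e.data;
            inMemory -= e.size;
        }
        backing.delete(e.id); // a dequeued message must also leave the QueueBacking
        if (flowed && inMemory < minMemory) inhale();
        return data;
    }

    // Purger: keep data in memory from the front until maxMemory is reached,
    // then flow every later entry to disk.
    private void purge() {
        long kept = 0;
        boolean full = false;
        for (FlowEntry e : entries) {
            if (e.isFlowed()) continue;
            if (!full && kept + e.size <= maxMemory) {
                kept += e.size;
            } else {
                full = true;
                flowToDisk(e);
            }
        }
    }

    // Inhaler: reload flowed entries from the front until maxMemory would be
    // exceeded; if everything is back in memory, clear the flowed state.
    private void inhale() {
        for (FlowEntry e : entries) {
            if (!e.isFlowed()) continue;
            if (inMemory + e.size > maxMemory) return; // some data stays on disk
            e.data = backing.recover(e.id);
            inMemory += e.size;
        }
        flowed = false;
    }

    private void flowToDisk(FlowEntry e) {
        backing.flow(e.id, e.data);
        e.data = null;
        inMemory -= e.size;
    }

    long inMemory() { return inMemory; }
    boolean isFlowed() { return flowed; }
    int onDisk() { return backing.size(); }
}
```

For example, with queueMinMemory = 10 and queueMaxMemory = 20 bytes, adding four 8-byte messages keeps the first two in memory and flows the last two. Delivering the first message drops queueInMemory below the minimum, so the Inhaler reloads one message; delivering the second lets it reload the last one and clear the flowed state.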