Objective and Scope
Overview
The broker is prone to heap exhaustion, leading to OutOfMemory errors. One major source of memory consumption is the set of buffers used to hold unprocessed AMQP frames once they have been read from the socket. These buffers can currently grow without bound, and there is no means available to control them. Further, the transport layer is poorly implemented and difficult to work with. Improving its encapsulation is an explicit goal of this work.
For more information on the current design, please see Current Architecture.
Problem Statement
When the broker is unable to process frames as quickly as they are being sent, these buffers begin to fill up, and the broker has no way to limit their growth or even to know how large they are. For the broker to manage its memory usage effectively, it needs at a minimum to be able to place an upper bound on the size of its network buffers.
1. High message throughput on one publishing connection
This is where the publishing client sends messages to the broker at a consistently sustained high rate. It is more likely to happen where some of the messages are persistent.
Data moves out of the client-side buffers and into the broker-side buffers. The broker processes messages onto the queues as fast as it can, but gets backed up, and the broker-side buffers grow until the broker eventually runs out of memory, its heap filled by the MINA buffers along with the data on the queues.
2. Medium-to-high throughput on several publishing connections
This is where there are multiple connections sending data to the broker at varying rates, from medium to high (in Qpid terms, greater than the average/maximum throughputs measured in test). The broker threads are scheduled by the JVM, i.e. they yield processor time to each other in an unpredictable way.
The outcome is as for case 1 above, with the caveat that it is far harder to predict how long it would take to happen and how the TCP socket-level behaviour would affect the clients. It is also a more likely real-world scenario: more than one connection publishing (MDS publication, for example) would produce a set of growing buffers, again ending in OOM.
3. Environmental issue prevents broker from processing buffer data
This is where the broker runs out of some environmental resource (SAN capacity, disk for logging, possibly CPU, etc.) and so cannot process the messages out of the buffers and onto the queues.
The outcome is as for case 1, assuming the broker merely needs to be up for data to be buffered. Previous tests certainly indicate this: a publisher can happily pump data into a disabled broker for some time before the connection gets killed.
4. Slow client causes broker network buffers to grow
This is where the client is not reading data as fast as the broker is sending it. The data will be buffered on the broker, increasing memory usage. This is particularly problematic when sending messages to the client since the payload will then be in both the message store and the network buffers at the same time, doubling usage per message.
Exclusions / Assumptions
- No AMQP semantics are involved. The aim of this work is purely to limit the size of the network buffers between the client producing AMQP frames and the broker processing them. It does not involve any protocol-specific work. In OSI terms, this work is aimed at layer 4, not layer 7.
- Higher level information should be determined by the broker itself. No policy will be applied beyond blocking reads if the buffer is full.
- Buffers are sized uniformly across all connections
- Buffer sizes are fixed when a connection is created and do not change for the life of that connection
- Standard TCP flow control is the only mechanism used to signal to the client that it should cease to send data.
- It is better for the client to block further writes to the socket than to allow memory consumption to grow unimpeded
- The broker should not block
Functional Requirements
- Buffer size control - all buffers other than the queue itself have an upper size limit
- TCP options: SO_KEEPALIVE, OOBINLINE, SO_RCVBUF, SO_REUSEADDR, SO_SNDBUF, SO_LINGER, SO_TIMEOUT, TCP_NODELAY (see the sketch following this list)
- SSL: link-level encryption. Do we want to consider things like certificate validation here or at a higher level? Consult with RHS
- Signal on idle (requires timer support)
- Need to be notified when socket has been closed
- The broker needs to know when the transport layer is full and a write would not (or did not) succeed - "don't send any more just now until I clear this Future"?
- Non-TCP transports such as InVM and InfiniBand
- Network buffers can optionally be configured to be of unlimited size
- Rate statistics need to be available, including total throughput and average time for send() to complete. See Java Broker Design - Operational Logging for details
- send() should (optionally) fail after a configurable timeout rather than block forever
- Need to be able to change buffer sizes on new connections at runtime, existing connections can remain unchanged
- Send and receive buffers should be independently sized
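For illustration, the TCP options listed above map directly onto the standard java.net.Socket setters. A transport implementation might apply configured values along these lines; the values shown are placeholders, not recommendations.

    import java.io.IOException;
    import java.net.Socket;

    // Sketch: applying the configurable TCP options from the requirements
    // to a java.net.Socket. The values used here are placeholders only.
    void applyTcpOptions(Socket socket) throws IOException
    {
        socket.setKeepAlive(true);           // SO_KEEPALIVE
        socket.setOOBInline(false);          // OOBINLINE
        socket.setReceiveBufferSize(65536);  // SO_RCVBUF
        socket.setReuseAddress(true);        // SO_REUSEADDR
        socket.setSendBufferSize(65536);     // SO_SNDBUF
        socket.setSoLinger(true, 10);        // SO_LINGER, in seconds
        socket.setSoTimeout(30000);          // SO_TIMEOUT, in milliseconds
        socket.setTcpNoDelay(true);          // TCP_NODELAY
    }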
Non-Functional Requirements
- Startup loading of transport plugins
- User can select specific transport to use
- Peer A running transport A can talk to Peer B running transport B
- Connections do not require a thread each (broker only, client can probably live with that)
- The semantics of org.apache.qpid.BasicMessageProducer.send() need to change: it may now block if there is not enough free space to write the entire message out. The change to this method's semantics needs to be considered in the light of the stated JMS semantics and the change to support acknowledgement of publishes in AMQP 0-10 and higher (a sketch of the new behaviour follows this list).
- Need to document relative impact of buffer sizes
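As a sketch of the bounded, timeout-capable send behaviour referred to above, the outbound buffer could be modelled on a bounded queue. Everything here is hypothetical: the class name, the choice to bound by frame count rather than bytes, and the timeout handling are illustrative only.

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of a bounded outbound buffer: send() blocks while
    // the buffer is full and, if a timeout is configured, fails after that
    // timeout rather than blocking forever. A real implementation would
    // bound by bytes rather than by frame count.
    public final class BoundedSendBuffer
    {
        private final BlockingQueue<ByteBuffer> pending;
        private final long timeoutMillis; // <= 0 means block indefinitely

        public BoundedSendBuffer(int capacity, long timeoutMillis)
        {
            this.pending = new ArrayBlockingQueue<ByteBuffer>(capacity);
            this.timeoutMillis = timeoutMillis;
        }

        public void send(ByteBuffer frame) throws InterruptedException
        {
            if (timeoutMillis <= 0)
            {
                pending.put(frame); // block until the writer thread frees space
            }
            else if (!pending.offer(frame, timeoutMillis, TimeUnit.MILLISECONDS))
            {
                throw new IllegalStateException(
                        "send() timed out after " + timeoutMillis + "ms");
            }
        }
    }

This is the behaviour a client would observe through BasicMessageProducer.send() once the network buffer is bounded: a full buffer delays the call instead of growing the heap.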
Architecture Design
Common should have an interface which all transport plugins can implement and which the server and client can use. The interface would include a means to set the standard socket options and to limit its total memory usage.
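As a rough illustration, such an interface might look like the following. All names and signatures here are hypothetical, not a settled API; the method set anticipates the design overview below.

    import java.io.IOException;

    // Hypothetical sketch of the common transport interface. QpidByteBuffer
    // and TcpConfig are placeholder types for the purposes of this sketch.
    public interface NetworkTransport
    {
        void open(String host, int port) throws IOException; // outbound connection
        void listen(int port) throws IOException;            // accept inbound connections
        void send(QpidByteBuffer data) throws IOException;   // may block when buffers are full
        void flush() throws IOException;
        void close();

        void setTcpOptions(TcpConfig options);  // SO_SNDBUF, SO_RCVBUF, TCP_NODELAY, etc.
        void setMaxBufferedBytes(long bytes);   // upper bound on unprocessed buffered data
    }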
TCP itself has a flow control mechanism which kicks in when the receiver of data cannot read from the socket as fast as data is being sent. TCP sockets use send and receive buffers to smooth the flow from application to network and maximize network performance. By limiting the rate at which the application reads from the network to the rate at which it can process the data, the sender of the data is throttled to that processing rate.
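A minimal sketch of this throttling effect in plain Java follows; processFrame() is a hypothetical method that fully handles one frame before returning.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;

    // Sketch: reading no faster than we can process lets ordinary TCP flow
    // control throttle the sender, with no broker-side queueing at all.
    void readLoop(Socket socket) throws IOException
    {
        InputStream in = socket.getInputStream();
        byte[] chunk = new byte[socket.getReceiveBufferSize()];
        int read;
        while ((read = in.read(chunk, 0, chunk.length)) != -1)
        {
            // While processFrame() runs we are not calling read(), so the
            // kernel receive buffer fills, the advertised TCP window shrinks
            // to zero, and the peer's writes eventually block at its socket.
            processFrame(chunk, read);
        }
    }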
Overview of Design
- Common will hold a transport layer interface to which the existing MINA transport will be ported. We will also port the 0-10 client's o.a.q.transport.network.io package to that interface. This interface should be quite simple.
- Methods to send, receive, flush, close, open and listen, plus a method to set TCP options, are likely to be sufficient. These would operate on a QpidByteBuffer, essentially a MinaByteBuffer, to avoid having to fix our use of expanding buffers at the same time (a sketch of such a wrapper follows this list).
- The server and client both use common for their network layer, and will need to be updated to use the new interface. They will need to pass through the configured socket options.
- When processing the incoming data, one frame at a time will be processed, and that frame's processing will be completed before the next one is read. There will be no other data structures used to hold unprocessed frames. This means the sender becomes aware of variations in the receiver's processing speed much sooner than is currently the case. Slowdowns or pauses in processing incoming frames will cause the buffer to fill up and flow control to kick in. This can be mitigated, if desired, by increasing the relevant buffer sizes.
- The server will need to be substantively modified to push the MINA specific parts into the appropriate plugin. This primarily involves replacing the MINA ByteBuffer with a QpidByteBuffer and refactoring the MINA specific parts of the protocol handlers.
- Implementing fixed-size IO buffers would require replacing MINA with an alternative implementation of the transport layer. The first step in any such process would be to clearly encapsulate the concept of a transport layer, using the existing MINA code as the initial implementation. The next step would be to adapt the already used 0-10 transport to sit cleanly behind the same interface. Finally, a network transport more attuned to the needs of a broker (supporting large numbers of incoming connections) could be developed from the base of the existing 0-10 transport.
- The changes would impact the client and broker where they interface with the transport layer. In the first phase the client and broker would be altered to use this implementation-independent interface. Once this work had been completed, the client and broker could be tested and released using the MINA implementation, proving that no adverse impact from the encapsulation of the transport layer had occurred.
- Once the first phase of the work has been completed, alternative transport implementations could be developed. This would require no code changes to the client or broker, but would require system testing to prove that behaviour was correct when using the alternative transport implementation.
- Bounding the buffers attempts to address the issue of regulating incoming data flow to the rate at which it can be processed by the receiver. This will generally occur when the sender is capable of sending bursts of data at a high rate. It is most evident with persistent messages, where the rate at which messages can be persisted to disk is much lower than the rate at which they can be sent over the network. Fundamentally, if Qpid is not able to process messages at the rate at which they are being sent, it should not accept them; pretending to do so gives the application a false impression of what Qpid is doing, and potentially only defers the issue to a later point when there would be a great deal of message loss. Furthermore, the behaviour is non-JMS-compliant (JMS expects publishing to be a synchronous activity).
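The QpidByteBuffer wrapper mentioned in the list above might, as a first cut, simply delegate to the MINA buffer so that callers see no MINA types. This sketch assumes MINA 1.x's org.apache.mina.common.ByteBuffer as the delegate and shows only a few delegating methods; the real method set would mirror actual usage.

    // Hypothetical sketch: QpidByteBuffer as a thin wrapper over the MINA
    // buffer. Keeping MINA behind this type is what later allows the
    // transport implementation to be swapped without touching callers.
    public final class QpidByteBuffer
    {
        private final org.apache.mina.common.ByteBuffer delegate;

        public QpidByteBuffer(org.apache.mina.common.ByteBuffer delegate)
        {
            this.delegate = delegate;
        }

        public QpidByteBuffer put(byte b)  { delegate.put(b); return this; }
        public byte get()                  { return delegate.get(); }
        public QpidByteBuffer flip()       { delegate.flip(); return this; }
        public int remaining()             { return delegate.remaining(); }

        // Escape hatch to the underlying java.nio.ByteBuffer where needed.
        public java.nio.ByteBuffer asNioBuffer() { return delegate.buf(); }
    }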
Breakdown of work
- Encapsulate existing networking layer better
- Current and proposed network interfaces
- Port server to new interface
- Port client to new interface
- Remove Job/Event
- Bind network buffers
- Tests
- Representative workload tests need to be developed and put into perftests.
Testing
Testing under load and handling error conditions (unexpected disconnection etc) will need to be carried out.
New load tests which simulate application workloads need to be developed so that we can provide accurate configuration guidance. These tests then need to be carried out on Windows, Linux and Solaris in all permutations of client/server.
New unit tests will need to be written to cover the transport plugins and the new interfaces. Existing test coverage in this area is minimal.
Impact
There is a potential effect upon performance; we will need to measure it once the work has been implemented to quantify what effect, if any, it has had.
Compatibility / Migration Implications
- Older clients connected to a new broker may suffer OOM when TCP flow control kicks in. This seems preferable to the broker suffering OOM.
- Clients which upgrade their library may experience a change in behaviour of the send() method, since it may now block if the client's network buffer is full. This needs to be appropriately communicated. It should not be significantly different in behaviour from using transactions in the producer session, however.
Risks
- MINA is quite deeply embedded in the server and will require some work to excise it fully. This is somewhat mitigated by the decision to import mina.ByteBuffer and continue using that.
- Differences in behaviour of transport layer may expose other bugs in the broker which were being hidden before.
- Inadequate test coverage, in particular the lack of representative application workloads in the performance test suite.