Network IO Interface

Objective and Scope

Overview

The broker is prone to heap exhaustion, leading to OutOfMemory errors. The server's main memory consumer is the messages stored on queues; producer flow control is needed to manage that area and is out of scope for this work.

The other major source of memory consumption is the network buffers used to hold unprocessed AMQP frames once they have been read from the socket. These are currently able to grow uninhibited, and there is no means available to control them. Further, the transport layer is poorly implemented and difficult to work with. Improving encapsulation is an explicit goal of this work.

Problem Statement

When the broker is unable to process frames as quickly as they are being sent, these buffers begin to fill up and the broker has no way to limit them. For the broker to effectively manage its memory usage, it needs to be able to at least place an upper bound on the size of its network buffers. At present it also has no way to know how large those buffers are.

1. High message throughput on one publishing connection

This is where the publishing client sends a consistently sustained high rate of messages to the broker. It is more likely to happen where some of the messages are persistent.

Data from the client leaves the client-side buffers and enters the broker-side buffers. The broker processes messages onto the queues as fast as it can, but gets backed up, and the broker-side buffers grow until eventually the broker runs out of memory, the heap filled by the MINA buffers along with the data on the queues.

2. Medium-to-high throughput on several publishing connections

This is where there are multiple connections sending data to the broker at varying rates, from medium to high (in Qpid terms, greater than the average/maximum throughputs measured in test). The broker threads are scheduled by the JVM, yielding to each other in an unpredictable way.

As for case 1 above, with the caveat that it is far harder to predict how long it would take to happen and how TCP socket-level behaviour will impact the client. It is also a more likely real-world scenario: more than one connection publishing (for MDS publication, for example) would result in a set of growing buffers, eventually resulting in OOM.

3. Environmental issue prevents broker from processing buffer data

This is where the broker runs out of some resource (for example SAN space, disk for logging, or possibly CPU) and so cannot process the messages out of the buffers and onto the queues.

As for case 1, assuming that the broker only needs to be up for data to be buffered, which previous tests certainly indicate: a publisher can happily pump data into a disabled broker for some time before the connection gets killed.

4. Slow client causes broker network buffers to grow

This is where the client is not reading data as fast as the broker is sending it. The data will be buffered on the broker, increasing memory usage. This is particularly problematic when sending messages to the client since the payload will then be in both the message store and the network buffers at the same time, doubling usage per message.

Current Architecture

  1. The current MINA networking uses unbounded buffers.
  2. We replace over a dozen MINA classes, none of which have any unit test coverage. We failed to get our patches upstream and haven't attempted since then.
  3. Existing unit test coverage is minimal (approx 30%)
  4. Improving unit test coverage is difficult due to poor encapsulation
  5. Poor encapsulation has led to tight coupling of MINA to the server
  6. The current behaviour of send() leaves the potential for message loss when not using transactions and violates the JMS spec. Persistent messages which are held in either the client's or the server's buffers before being written to disk can be lost.
  7. MINA's internal state is currently a black box, leaving no way to determine how much memory is being used by an individual client connection.
  8. The way that we use MINA is suboptimal for our purposes but is difficult to change due to the tight coupling
  9. Supporting alternative transport layers is impossible due to tight coupling of MINA (OSI layer 4) with the AMQP handlers (OSI layer 7).

Inside Qpid, data is read from a socket and placed in a buffer. A separate thread then takes this buffer and attempts to parse it as an AMQP command. This AMQP command is then put on a second buffer. Finally a third thread reads the command and processes it.

Currently the two buffers between these three threads are unbounded. This means that data is read from the network as fast as possible with no regard as to whether the broker has the capacity to process it.
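
As a minimal sketch of the alternative, assuming a hypothetical BoundedFrameQueue class (this is not existing Qpid code): with a bounded queue between the two threads, the reading thread blocks when the processor falls behind, stops draining the socket, and standard TCP flow control pushes back on the sender.

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Hypothetical bounded job queue between the socket-reading thread and the
    // frame-processing thread; illustrative only.
    public class BoundedFrameQueue {
        private final BlockingQueue<ByteBuffer> frames;

        public BoundedFrameQueue(int capacity) {
            this.frames = new ArrayBlockingQueue<ByteBuffer>(capacity);
        }

        // Called by the IO thread after a frame has been read from the socket.
        // Blocks when the processor is behind, so the reader stops draining the
        // socket and TCP flow control throttles the sender.
        public void enqueue(ByteBuffer frame) throws InterruptedException {
            frames.put(frame);
        }

        // Called by the processing thread; blocks when there is nothing to do.
        public ByteBuffer dequeue() throws InterruptedException {
            return frames.take();
        }
    }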

Queues are themselves a kind of buffer between client applications.

From a queue the message can be assigned to be sent to a client. At this point a delivery command is placed in another buffer awaiting sending on the network. When received by the client, a similar process to receiving on the broker occurs.

The whole process looks something like this:

Client App sends message -> (MINA Buffer)
-> MINA Thread takes message and sends to TCP -> (TCP Buffer)
-> TCP places bytes on wire ->
~~~~ Network ~~~~
-> TCP reads from wire -> (TCP Buffer)
-> MINA Reads from TCP -> (MINA Buffer)
-> Bytes parsed and converted into AMQP Command -> (Job Queue Buffer)
-> AMQP Command processed, message placed on Queue -> (Queue - which is a buffer)
-> Message taken from queue and delivery command created -> (MINA Buffer)
-> MINA Thread takes message and sends to TCP -> (TCP Buffer)
-> TCP places bytes on wire ->
~~~~ Network ~~~~
-> TCP reads from wire -> (TCP Buffer)
-> MINA Reads from TCP -> (MINA Buffer)
-> Bytes parsed and converted into AMQP Command -> (Job Queue Buffer)
-> AMQP Command processed, message placed on Delivery Queue -> (Delivery Queue Buffer)
-> Message received by client application code

Of all the buffers above, only the TCP buffers are bounded (the Delivery Queue Buffer in the client is potentially bounded by prefetch, although prefetch is set not in bytes but in messages, which may be of arbitrary size); every other buffer is a potential source of out of memory exceptions.

From the above we can see that there are many potential sources of OutOfMemoryExceptions. We need to consider where we may get unbounded growth, what scenarios will cause that, and what other ways we have to mitigate those risks.

In general we get growth of the IO (MINA) buffers when sender and receiver are operating at mismatched rates (i.e. the Client and Broker). We will get unbounded growth of the queue if the sending client is producing at a faster rate than the receiving client can process.

Exclusions / Assumptions

  1. No AMQP semantics are involved. The aim of this work is purely to limit the size of the network buffers between the client producing AMQP frames and the broker processing them. It does not involve any protocol-specific work. In OSI terms, this work is aimed at layer 4, not layer 7.
  2. Higher level information should be determined by the broker itself. No policy will be applied beyond blocking reads if the buffer is full.
  3. Buffers are sized uniformly across all connections
  4. Buffers are fixed at startup and do not change
  5. Standard TCP flow control is the only mechanism used to signal to the client that it should cease to send data.
  6. It is better for the client to block further writes to the socket than to allow memory consumption to grow unimpeded
  7. The broker should not block

High-Level Technical Architecture

Functional Requirements

  1. Buffer size control - all buffers have an upper size limit other than the queue itself
  2. TCP options: nodelay, keepalive, window size etc.
  3. SSL: link level encryption, do we want to consider things like certificate validation etc here or at a higher level? Consult with RHS
  4. Signal on idle requires timer support
  5. Need to be notified when socket has been closed
  6. The broker needs to know that the transport layer is full and the write did not (or would not) succeed - "don't send any more just now until I clear this Future"?
  7. Non-TCP transports such as InVM, infiniband.
  8. Ability to switch off buffer bounding is important. A key area is the sizing of buffers. I think it'll be very hard to get that right for all existing applications and we might find some projects which previously had no issues with OOM start to have undesirable client impact from fixed buffer bounding (see later). Being able to switch back to unbounded buffers at startup should mitigate this risk.
  9. Logging - I understand that we don't know when TCP flow control is happening, but we need to have some diagnostics to allow users to figure out why their send() is behaving differently on upgrade. I'd proposed a trigger/log on buffer threshold being reached or some stats on the per connection buffer size. Goal is to be able to see where slowdown/block is - client, network, broker, network, client.
  10. Bound Changes - I'd think even where an app is going along fine with a static buffer size, they may need to be able to say 'all connections from here on get this new value'. I'd discussed with Aidan perhaps using a % figure to calculate buffer sizes so that the broker can size them up/down intermittently depending on the number of connections, i.e. the broker checks how many connections it has and then recalculates the static figure for any connections after that point. So, if you have 10-20 connections they all get (for example) 5% of the total buffer space; if you have 100, they all get 0.5%. (A sizing sketch follows this list.)
  11. Rate statistics need to be available, including total throughput and average time for send() to complete. See Java+Broker+Design+-+Operational+Logging for details
  12. send() should (optionally) fail after a configurable timeout rather than block forever
  13. Need to be able to change buffer sizes on new connections at runtime, existing connections can remain unchanged
  14. Send and receive buffers should be independently sized
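
A rough sketch of the percentage-based sizing idea from requirement 10; the class and method are illustrative, not an existing or proposed Qpid API, and the exact scaling rule would need tuning.

    // Hypothetical helper: divides a fixed total buffer budget among connections.
    public class BufferSizer {
        private final long totalBufferSpace;           // total bytes the broker will spend on network buffers
        private final double maxPercentPerConnection;  // e.g. 5.0 = at most 5% of the total each

        public BufferSizer(long totalBufferSpace, double maxPercentPerConnection) {
            this.totalBufferSpace = totalBufferSpace;
            this.maxPercentPerConnection = maxPercentPerConnection;
        }

        // Recalculated intermittently; connections accepted after that point get
        // the new value, existing connections keep the size they were given
        // (see requirement 13).
        public int bufferSizeFor(int connectionCount) {
            double share = Math.min(maxPercentPerConnection / 100.0,
                                    1.0 / Math.max(connectionCount, 1));
            return (int) (totalBufferSpace * share);
        }
    }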

Non Functional Requirements

  1. Startup loading of transport plugins
  2. User can select specific transport to use
  3. Peer A running transport A can talk to Peer B running transport B
  4. Connections do not require a thread each (broker only, client can probably live with that)
  5. The semantics of org.apache.qpid.BasicMessageProducer.send() need to change. It may now block if there isn't enough free space to write the entire message out. The change to this method's semantics needs to be considered in the light of the stated JMS semantics and the change to support acknowledgement of publishes in AMQP 0-10 and higher. (A sketch of the blocking/timeout behaviour follows this list.)
  6. Need to document relative impact of buffer sizes
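
The send() change in item 5 above (and requirement 12 of the functional list) could be realised with a bounded write queue. The sketch below is illustrative only, with hypothetical class and field names; it is not the proposed implementation.

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical bounded write path behind send().
    public class BoundedWriter {
        private final BlockingQueue<ByteBuffer> writeQueue;
        private final long sendTimeoutMillis;   // <= 0 means block indefinitely

        public BoundedWriter(int capacity, long sendTimeoutMillis) {
            this.writeQueue = new ArrayBlockingQueue<ByteBuffer>(capacity);
            this.sendTimeoutMillis = sendTimeoutMillis;
        }

        // Called from send(): blocks while the bounded buffer is full, or fails
        // after the configured timeout (functional requirement 12).
        public void write(ByteBuffer frame) throws InterruptedException {
            if (sendTimeoutMillis <= 0) {
                writeQueue.put(frame);
            } else if (!writeQueue.offer(frame, sendTimeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("send() timed out after " + sendTimeoutMillis + " ms");
            }
        }
    }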

Architecture Design

Common should have an interface which all transport plugins can implement and which the server and client can use. The interface would include a means to set the standard socket options and to limit its total memory usage.

TCP itself has a flow control mechanism which kicks in when the receiver of data cannot read from the socket as fast as data is being sent. TCP sockets use send and receive buffers to attempt to smooth the flow from application to network and maximize network performance. By limiting the rate at which the application reads from the network to the rate at which it can process the data, the sender of the data is throttled to that production rate.
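
The sketch below illustrates that mechanism with plain java.nio (it is not Qpid or MINA code): when a fixed-size application buffer fills, read interest is deregistered, the kernel receive buffer fills, the advertised TCP window shrinks, and the sender is throttled.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;

    public class ThrottledReader {
        private final SocketChannel channel;
        private final SelectionKey key;
        private final ByteBuffer appBuffer;   // fixed-size application-side buffer

        public ThrottledReader(SocketChannel channel, SelectionKey key, int bufferSize) {
            this.channel = channel;
            this.key = key;
            this.appBuffer = ByteBuffer.allocate(bufferSize);
        }

        // Called on the selector thread when the channel is readable.
        public void onReadable() throws IOException {
            channel.read(appBuffer);
            if (!appBuffer.hasRemaining()) {
                // Buffer full: stop reading. The kernel receive buffer fills, the
                // advertised TCP window shrinks, and the sender is throttled.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
            }
        }

        // Called once the processing thread has drained the buffer.
        public void onDrained() {
            appBuffer.clear();
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            key.selector().wakeup();   // re-run the select loop with the new interest set
        }
    }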

Overview of Design

Common will hold a transport layer interface to which the existing MINA transport will be ported. We will also port the 0-10 client o.a.q.transport.network.io package to that interface. This interface should be quite simple.

Methods to send, receive, flush, close, open and listen, plus a method to set TCP options, are likely to be sufficient. These would operate on a QpidByteBuffer (essentially MINA's ByteBuffer) to avoid having to fix our use of expanding buffers at the same time.
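
A purely illustrative sketch of what such an interface might look like; the names and signatures are hypothetical, not the final design, and java.nio.ByteBuffer stands in for QpidByteBuffer so the sketch is self-contained.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;

    // Illustrative only: the real interface would operate on QpidByteBuffer.
    public interface NetworkTransport {
        // Open an outgoing connection (client side).
        void open(InetSocketAddress remote) throws IOException;

        // Accept incoming connections (broker side).
        void listen(InetSocketAddress local) throws IOException;

        // Write a frame; may block, or fail after a timeout, while the bounded
        // send buffer is full.
        void send(ByteBuffer frame) throws IOException;

        // Read the next frame; blocks until one is available.
        ByteBuffer receive() throws IOException;

        // Flush any buffered outgoing data to the socket.
        void flush() throws IOException;

        // Apply the standard TCP options (nodelay, keepalive, buffer sizes).
        void setTcpNoDelay(boolean on) throws IOException;
        void setKeepAlive(boolean on) throws IOException;

        void close() throws IOException;
    }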

The server and client both use common for their network layer, and will need to be updated to use the new interface. They will need to pass through the configured socket options.
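
For a plain TCP transport, passing the options through would amount to something like the following (standard java.net.Socket calls; the helper class name is illustrative and the option values are whatever has been configured):

    import java.io.IOException;
    import java.net.Socket;

    // Illustrative helper showing how configured options map onto java.net.Socket.
    public final class SocketOptionConfigurer {
        private SocketOptionConfigurer() { }

        public static void configure(Socket socket,
                                     boolean tcpNoDelay,
                                     boolean keepAlive,
                                     int sendBufferSize,
                                     int receiveBufferSize) throws IOException {
            socket.setTcpNoDelay(tcpNoDelay);               // disable Nagle for low latency
            socket.setKeepAlive(keepAlive);                 // detect dead peers
            socket.setSendBufferSize(sendBufferSize);       // hint for the kernel send buffer
            socket.setReceiveBufferSize(receiveBufferSize); // hint for the kernel receive buffer
        }
    }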

When processing the incoming data, one frame at a time will be processed and that frame's processing will be completed before the next one is read. There will be no other data structures used to hold unprocessed frames.

The server will need to be substantively modified to push the MINA specific parts into the appropriate plugin.

Implementing fixed size IO buffers would require replacing MINA with an alternative implementation of the transport layer. The first step in any such process would be to clearly encapsulate the concept of a transport layer, using the existing MINA code as the initial implementation. The next step would be to adapt the already used 0-10 transport to sit cleanly behind the same interface. Finally, a network transport more attuned to the needs of a broker (supporting large numbers of incoming connections) could be developed from the base of the existing 0-10 transport.

The changes would impact the client and broker where they interface with the transport layer. In the first phase the client and broker would be altered to use this implementation-independent interface. Once this work had been completed, the client and broker could be tested and released using the MINA implementation, proving that no adverse impact from the encapsulation of the transport layer had occurred.

Once the first phase of the work has been completed, alternative transport implementations could be developed. This would require no code changes to the client or broker, but would require system testing to prove that behaviour was correct when using the alternative transport implementation.

Implementing a bounded buffer transport without implementing AMQP flow control incurs no extra cost (although it obviously only addresses the issues solved by buffer bounding).

Bounding the buffers attempts to address the issue of regulating incoming data flow to the rate at which it can be processed by the receiver. This will generally matter when the sender is capable of sending bursts of data at a high rate. It is most evident with persistent messages, where the rate at which messages can be persisted to disk is much lower than the rate at which they can be sent over the network. Fundamentally, if Qpid is not able to process messages at the rate at which they are being sent, it should not accept them; pretending to do so gives the application a false impression of what Qpid is doing, and potentially only defers an issue to a later point when there will be a great deal of message loss. Furthermore, the behaviour is non-JMS compliant (JMS expects publishing to be a synchronous activity).

Breakdown of component parts

  1. Common
    1. The interface for common needs to be designed, implemented and tested.
    2. Timer support for heartbeats
    3. "I'm full, hold on" support
  2. Server
    1. Existing code needs to be refactored to remove dependence on MINA
    2. Existing code ported to use new common interface
    3. Configuration to select a transport
  3. Client
    1. Existing code needs to be refactored to remove dependence on MINA
    2. Existing code ported to use new common interface
    3. Changes to send() semantics need to be considered and documented
    4. Configuration to select a transport
  4. Tests
    1. Representative workload tests need to be developed and put into perftests.

Testing

Testing under load and handling of error conditions (unexpected disconnection etc.) will need to be carried out. New load tests which accurately simulate application workloads need to be developed so that we can provide accurate configuration guidance. This needs to be carried out on Windows, Linux and Solaris, in all permutations of client/server OS, as their network behaviour differs significantly.

New unit tests will need to be written to cover the transport plugins and the new interfaces. Existing test coverage in this area is minimal.

Impact

There is a potential effect upon performance; we will need to measure this once it has been implemented to quantify what effect, if any, it has had.

Compatibility / Migration Implications

  1. Older clients connected to a new broker may suffer OOM when TCP flow control kicks in. This seems preferable to the broker suffering OOM.
  2. Clients which upgrade their library may experience a change in behaviour of the send() method, since it may now block if the client's network buffer is full. This needs to be appropriately communicated. It should not be significantly different in behaviour from using transactions in the producer session, however.

Risks

  1. MINA is quite deeply embedded in the server and will require some work to excise it fully. This is somewhat mitigated by the decision to import mina.ByteBuffer and continue using that.
  2. Differences in behaviour of transport layer may expose other bugs in the broker which were being hidden before.
  3. Inadequate test coverage, in particular the lack of representative application workloads in the performance test suite.
