Current Architecture has been edited by Aidan Skinner (Jul 10, 2009).


Current implementation

Inside Qpid, data is read from a socket and placed in a buffer. A separate thread then takes this buffer and attempts to parse it as an AMQP command. This AMQP command is then put on a second buffer. Finally, a third thread reads the command and processes it.

Currently the two buffers between these three threads are unbounded. This means that data is read from the network as fast as possible with no regard as to whether the broker has the capacity to process it.
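The hand-off between the three threads can be sketched as below. This is an illustrative sketch only, with hypothetical class and method names rather than Qpid's actual code; the point is that neither intermediate buffer has a capacity bound:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch of the three-stage hand-off described above; the
// class and method names are hypothetical, not Qpid's actual code.
public class UnboundedPipeline {
    // Both hand-off buffers are unbounded: put() never blocks for
    // capacity, so nothing pushes back on the network reader.
    final LinkedBlockingQueue<byte[]> ioBuffer = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();

    // Stage 1: the IO thread enqueues raw bytes as fast as TCP delivers them.
    void onSocketRead(byte[] bytes) throws InterruptedException {
        ioBuffer.put(bytes);
    }

    // Stage 2: the parser thread turns buffered bytes into an AMQP command.
    void parseOne() throws InterruptedException {
        jobQueue.put(new String(ioBuffer.take()));
    }

    // Stage 3: the processing thread executes the command.
    String processOne() throws InterruptedException {
        return jobQueue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        UnboundedPipeline p = new UnboundedPipeline();
        p.onSocketRead("basic.publish".getBytes());
        p.parseOne();
        System.out.println("Processing: " + p.processOne());
    }
}
```

Because both queues accept writes unconditionally, a reader thread that outpaces the parser or processor simply grows them until the heap is exhausted.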

Queues are themselves a kind of buffer between client applications.

From a queue, the message can be assigned to be sent to a client. At this point a delivery command is placed in another buffer awaiting sending on the network. When received by the client, a similar process to receiving on the broker occurs.

The whole process looks something like this:

Client App sends message -> (MINA Buffer)
-> MINA Thread takes message and sends to TCP -> (TCP Buffer)
-> TCP places bytes on wire ->
~~~~~ Network ~~~~~
-> TCP reads from wire -> (TCP Buffer)
-> MINA Reads from TCP -> (MINA Buffer)
-> Bytes parsed and converted into AMQP Command -> (Job Queue Buffer)
-> AMQP Command processed, message placed on Queue -> (Queue - which is a buffer)
-> Message taken from queue and delivery command created -> (MINA Buffer)
-> MINA Thread takes message and sends to TCP -> (TCP Buffer)
-> TCP places bytes on wire ->
~~~~~ Network ~~~~~
-> TCP reads from wire -> (TCP Buffer)
-> MINA Reads from TCP -> (MINA Buffer)
-> Bytes parsed and converted into AMQP Command -> (Job Queue Buffer)
-> AMQP Command processed, message placed on Delivery Queue -> (Delivery Queue Buffer)
-> Message received by client application code

Or, pictorially:
 

Of all the buffers above, only the TCP buffers are bounded. (The Delivery Queue Buffer in the client is potentially bounded by prefetch, but prefetch is counted in messages, which may be of arbitrary size, rather than in bytes.) Every other buffer is a potential source of out-of-memory errors.
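Because prefetch limits the number of messages rather than the number of bytes, the client-side bound is only as good as the largest message. A back-of-the-envelope illustration (both figures are hypothetical, not Qpid defaults):

```java
public class PrefetchBound {
    public static void main(String[] args) {
        // Hypothetical figures: a prefetch of 5000 messages combined with
        // 1 MiB messages still admits roughly 5 GiB of buffered data.
        long prefetchMessages = 5000;
        long messageBytes = 1024L * 1024;   // messages may be arbitrarily large
        long worstCaseBytes = prefetchMessages * messageBytes;
        System.out.println(worstCaseBytes / (1024 * 1024) + " MiB buffered");
    }
}
```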

From the above we can see that there are many potential sources of OutOfMemoryErrors. We need to consider where unbounded growth may occur, which scenarios cause it, and what other means we have to mitigate those risks.

In general, the IO (MINA) buffers grow when sender and receiver operate at mismatched rates (i.e. the client and broker). The queue itself will grow without bound if the sending client produces messages faster than the receiving client can process them.
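The second case is easy to quantify: when the producer sustains a higher rate than the consumer, the backlog grows linearly with time. The rates below are purely illustrative:

```java
public class QueueGrowth {
    // Backlog (in messages) after a given number of seconds when the
    // producer outpaces the consumer. Rates are hypothetical examples.
    static long backlog(long produceRate, long consumeRate, long seconds) {
        return Math.max(0, (produceRate - consumeRate) * seconds);
    }

    public static void main(String[] args) {
        // 1000 msg/s in, 600 msg/s out: the queue gains 400 messages
        // every second and never drains while the mismatch persists.
        System.out.println(backlog(1000, 600, 60) + " messages after one minute");
    }
}
```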

Issues

  1. The current MINA networking uses unbounded buffers.
  2. We replace over a dozen MINA classes, none of which have any unit test coverage. We failed to get our patches accepted upstream and have not attempted to since.
  3. Existing unit test coverage is minimal (approximately 30%).
  4. Improving unit test coverage is difficult due to poor encapsulation.
  5. Poor encapsulation has led to tight coupling of MINA to the server.
  6. The current behaviour of send() leaves the potential for message loss when not using transactions, and violates the JMS specification: persistent messages held in either the client's or the server's buffers before being written to disk can be lost.
  7. MINA's internal state is currently a black box, leaving no way to determine how much memory is being used by an individual client connection.
  8. The way that we use MINA is suboptimal for our purposes but is difficult to change due to the tight coupling.
  9. Supporting alternative transport layers is impossible due to tight coupling of MINA (OSI layer 4) with the AMQP handlers (OSI layer 7).
