Author: aconway
Date: Mon Nov 1 14:10:29 2010
New Revision: 1029670

URL: http://svn.apache.org/viewvc?rev=1029670&view=rev
Log:
Updates to new cluster design docs.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt?rev=1029670&r1=1029669&r2=1029670&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt Mon Nov 1 14:10:29 2010
@@ -76,22 +76,27 @@ broker code.
 Virtual synchrony delivers all data from all clients in a single
 stream to each broker. The cluster must play this data thru the full
 broker code stack: connections, sessions etc. in a single thread
-context. The cluster has a pipelined design to get some concurrency
-but this is a severe limitation on scalability in multi-core hosts
-compared to the standalone broker which processes each connection
-in a separate thread context.
+context in order to get identical behavior on each broker. The cluster
+has a pipelined design to get some concurrency but this is a severe
+limitation on scalability in multi-core hosts compared to the
+standalone broker which processes each connection in a separate thread
+context.

 ** A new cluster design.
-
-Maintain /equivalent/ state not /identical/ state on each member.
-Messages from different sources need not be ordered identically on all members.
+Clearly defined interface between broker code and cluster plug-in.
+
+Replicate queue events rather than client data.
+- Broker behavior only needs to match per-queue.
+- Smaller amount of code (queue implementation) that must behave predictably.
+- Events only need to be serialized per-queue, which allows concurrency between queues.

-Use a moving queue ownership protocol to agree order of dequeues, rather
-than relying on identical state and lock-step behavior to cause
+Use a moving queue ownership protocol to agree order of dequeues.
+No longer relies on identical state and lock-step behavior to cause
 identical dequeues on each broker.

-Clearly defined interface between broker code and cluster plug-in.
+Each queue has an associated thread-context. Events for a queue are executed
+in that queue's context, in parallel with events for other queues.

 *** Requirements
@@ -104,35 +109,19 @@ The cluster must provide these delivery
 - client rejects message: message must be dead-lettered or discarded
   and forgotten.
 - client disconnects/broker crashes: acquired but not accepted
   messages must be re-queued on cluster.
-
 Each guarantee takes effect when the client receives a *completion*
 for the associated command (transfer, acquire, reject, accept)

-The cluster must provide this ordering guarantee:
-
-- messages from the same publisher received by the same subscriber
-  must be received in the order they were sent (except in the case of
-  re-queued messages.)
-
 *** Broker receiving messages

 On receiving a message transfer, in the connection thread we:
 - multicast a message-received event.
-- enqueue the message on the local queue.
-- asynchronously complete the transfer when the message-received is self-delivered.
+- enqueue and complete the transfer when it is self-delivered.

-This like asynchronous completion in the MessageStore: the cluster
-"stores" a message by multicast. We send a completion to the client
-asynchronously when the multicast "completes" by self-delivery. This
-satisfies the "client sends transfer" guarantee, but makes the message
-available on the queue immediately, avoiding the multicast latency.
-
-It also moves most of the work to the client connection thread. The
-only work in the virtual synchrony deliver thread is sending the client
-completion.
+Other brokers enqueue the message when they receive the message-received event.

-Other brokers enqueue the message when they recieve the
-message-received event, in the virtual synchrony deliver thread.
+Enqueues are queued up with other queue operations to be executed in the
+thread context associated with the queue.
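
(Illustrative sketch, not part of the patch.) The flow above (multicast a message-received event from the connection thread, then enqueue and complete the transfer when the event is self-delivered) might look roughly like this in C++; Event, Multicaster and ClusterConnection are hypothetical stand-ins, not the actual Qpid classes:

    #include <deque>
    #include <functional>
    #include <string>
    #include <utility>

    // Hypothetical stand-ins for the real broker/cluster types.
    struct Event { std::string queue; std::string encodedMessage; };

    struct Multicaster {
        std::function<void(const Event&)> send;       // wired to CPG in a real broker
        void mcast(const Event& e) { if (send) send(e); }
    };

    class ClusterConnection {
        Multicaster& mcaster;
        std::deque<std::function<void()>> pendingCompletions;   // FIFO, matches CPG delivery order
    public:
        explicit ClusterConnection(Multicaster& m) : mcaster(m) {}

        // Connection thread: a message transfer arrived from the client.
        void onTransfer(const std::string& queue, const std::string& data,
                        std::function<void()> complete) {
            pendingCompletions.push_back(std::move(complete));  // complete asynchronously later
            mcaster.mcast(Event{queue, data});                  // multicast a message-received event
        }

        // Deliver thread: our own message-received event came back (self-delivery).
        void onSelfDelivered(const Event& /*event*/) {
            // Enqueue on the local queue here, then send the completion to the client.
            pendingCompletions.front()();
            pendingCompletions.pop_front();
        }
    };

In a real broker the pending-completion queue would have to be thread safe, since onTransfer runs in a connection thread and onSelfDelivered runs in the deliver thread.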

 *** Broker sending messages: moving queue ownership
@@ -143,10 +132,6 @@ Periodically the owner hands over owners
 broker, providing time-shared access to the queue among all
 interested brokers.

-This means we no longer need identical message ordering on all brokers
-to get consistent dequeuing. Messages from different sources can be
-ordered differently on different brokers.
-
 We assume the same IO-driven dequeuing algorithm as the standalone
 broker with one modification: queues can be "locked". A locked queue
 is not available for dequeuing messages and will be skipped by the
@@ -164,14 +149,14 @@ a release-queue event, allowing another
 ownership.

 *** Asynchronous completion of accept
-
+### HERE
 In acknowledged mode a message is not forgotten until it is accepted,
 to allow for requeue on rejection or crash.

 The accept should not be completed till the message has been forgotten.
 On receiving an accept the broker:
 - dequeues the message from the local queue
-- multicasts a "dequeue" event
+- multicasts an "accept" event
 - completes the accept asynchronously when the dequeue event is self delivered.

 NOTE: The message store does not currently implement asynchronous
@@ -191,16 +176,6 @@ being resolved.
 #TODO: The only source of dequeue errors is probably an unrecoverable
 journal failure.

-When a new member (the updatee) joins a cluster it needs to be brought up to date.
-The old cluster design an existing member (the updater) sends a state snapshot.
-
-To ensure consistency of the snapshot both the updatee and the updater
-"stall" at the point of the update. They stop processing multicast
-events and queue them up for processing when the update is
-complete. This creates a back-log of work for each to get through,
-which leaves them lagging behind the rest of the cluster till they
-catch up (which is not guaranteed to happen in a bounded time.)
-
 ** Updating new members

 When a new member (the updatee) joins a cluster it needs to be brought
@@ -270,8 +245,8 @@ The cluster interface captures these eve
 The cluster will require some extensions to the Queue:
 - Queues can be "locked", locked queues are ignored by IO-driven output.
-- Messages carry a cluster-message-id.
-- messages can be dequeued by cluster-message-id
+- Cluster must be able to apply queue events from the cluster to a queue.
+  These appear to fit into existing queue operations.

 ** Maintainability
@@ -302,27 +277,27 @@ The only way to verify the relative perf
 to prototype & profile. The following points suggest the new design
 may scale/perform better:

-Moving work from virtual synchrony thread to connection threads where
-it can be done in parallel:
+Some work moved from virtual synchrony thread to connection threads:
 - All connection/session logic moves to connection thread.
 - Exchange routing logic moves to connection thread.
-- Local broker does all enqueue/dequeue work in connection thread
-- Enqueue/dequeue are IO driven as for a standalone broker.
+- On local broker dequeueing is done in connection thread
+- Local broker dequeue is IO driven as for a standalone broker.
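
(Illustrative sketch, not part of the patch.) The "locked" queue extension mentioned above can be as small as a flag checked by the IO-driven output loop; ClusterQueue and dispatchOutput are hypothetical names, not the real broker classes:

    #include <atomic>
    #include <string>
    #include <vector>

    class ClusterQueue {
        std::string name;
        std::atomic<bool> locked{false};   // true while another broker owns the queue
    public:
        explicit ClusterQueue(std::string n) : name(std::move(n)) {}

        void lock()   { locked = true; }   // set when ownership is handed away
        void unlock() { locked = false; }  // set when we take ownership
        bool isLocked() const { return locked; }

        bool dispatchOne() {               // deliver one message to a local consumer
            /* ... normal IO-driven dequeue ... */ return false;
        }
    };

    // IO-driven output loop: locked queues are simply skipped, so only the
    // current owner dequeues from a shared queue.
    void dispatchOutput(std::vector<ClusterQueue*>& active) {
        for (ClusterQueue* q : active) {
            if (q->isLocked()) continue;   // skipped until ownership comes back
            while (q->dispatchOne()) {}
        }
    }

The idea is that only the current owner has the queue unlocked, so ordinary IO-driven dequeuing continues unchanged on the owning broker.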

-Optimizes common cases (pay for what you use):
-- Publisher/subscriber on same broker: replication is fully asynchronous, no extra latency.
-- Unshared queue: dequeue is all IO-driven in connection thread.
-- Time sharing: pay for time-sharing only if queue has consumers on multiple brokers.
-
-#TODO Not clear how the time sharing algorithm would compare with the existing cluster delivery algorithm.
-
-Extra decode/encode: The old design multicasts raw client data and
-decodes it in the virtual synchrony thread. The new design would
-decode messages in the connection thread, re-encode them for
-multicast, and decode (on non-local brokers) in the virtual synchrony
-thread. There is extra work here, but only in the *connection* thread:
-on a multi-core machine this happens in parallel for every connection,
-so it probably is not a bottleneck. There may be scope to optimize
+For queues with all consumers on a single node dequeue is all
+IO-driven in connection thread. Pay for time-sharing only if queue has
+consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance: there is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
 decode/re-encode by re-using some of the original encoded data, this
 could also benefit the stand-alone broker.
@@ -342,20 +317,15 @@ queue replication', allowing such replic
 on a WAN say) to be initiated after the queue had already been
 created and been in use (one of the key missing features).

-** Optimizing the active-passive special case.
-
-In the case where all consumers of a queue are on the same broker, we
-can get better performance because we don't need to transfer ownership
-or information about acquisition. We need to optimize this case to
-perform like an active-passive mode of replication.
-
 ** Increasing Concurrency and load sharing
+
 The current cluster is bottlenecked by processing everything in the
 CPG deliver thread. By removing the need for identical operation on
 each broker, we open up the possibility of greater concurrency.

-Handling multicast enqueue, acquire, accpet, release etc: concurrency per queue.
-Operatons on different queues can be done in different threads.
+Handling multicast enqueue, acquire, accept, release etc: concurrency
+per queue. Operations on different queues can be done in different
+threads.

 The new design does not force each broker to do all the work in the
 CPG thread so spreading load across cluster members should give some
@@ -401,3 +371,25 @@ Clustering and scalability: new design m
 address scalability as part of cluster design. Think about
 relationship to federation and "fragmented queues" idea.

+* Design questions/decisions
+** Total ordering.
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Using total order allows other optimizations, see Message Identifiers below.
+
+** Message identifiers.
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
+
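
(Illustrative sketch, not part of the patch.) The "queue name + queue position" identifier can be as simple as the struct below; the type names here are hypothetical, and the plan further down expresses the same idea with QueueName plus SequenceNumber:

    #include <cstdint>
    #include <string>
    #include <tuple>

    using SequenceNumber = uint64_t;      // position of a message on its queue

    struct MessageId {
        std::string    queue;             // name of the queue holding the message
        SequenceNumber position;          // position on that queue
    };

    // With CPG total ordering every broker sees enqueues in the same order, so
    // each broker assigns the same position to the same message and the pair
    // identifies it cluster-wide without extra ID-mapping tables.
    inline bool operator<(const MessageId& a, const MessageId& b) {
        return std::tie(a.queue, a.position) < std::tie(b.queue, b.position);
    }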
+** Message rejection
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+
+New thinking: treat re-queueing after reject as an entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt?rev=1029670&r1=1029669&r2=1029670&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt Mon Nov 1 14:10:29 2010
@@ -92,18 +92,18 @@ is omitted.
 *** BrokerHandler

 Types:
-- struct QueuedMessage { Message msg; QueueName q; Position pos; }
-- SequenceNumber 64 bit sequence number to identify messages.
-- NodeId 64 bit CPG node-id, identifies member of the cluster.
-- struct MessageId { NodeId node; SequenceNumber seq; }
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct

-NOTE: Message ID's identify a QueuedMessage, i.e. a position on a queue.
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.

 Members:
-- atomic<SequenceNumber> sequence // sequence number for message IDs.
 - thread_local bool noReplicate // suppress replication.
 - thread_local bool isRouting // suppress operations while routing
-- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence

 NOTE: localMessage is also modified by MessageHandler.
@@ -118,47 +118,37 @@ routing(msg)
 enqueue(qmsg):
   if noReplicate: return
-  if !qmsg.msg.id:
-    seq = sequence++
-    qmsg.msg.id = (self,seq)
-    localMessage[seq] = qmsg
-    mcast create(encode(qmsg.msg),seq)
-  mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+  if routingSequence == 0  # thread local
+    routingSequence = nextRoutingSequence()
+    mcast create(encode(qmsg.msg),routingSequence)
+  mcast enqueue(qmsg.q,routingSequence)

 routed(msg):
   if noReplicate: return
-  if msg.id: mcast routed(msg.id.seq)
   isRouting = false

 acquire(qmsg):
   if noReplicate: return
   if isRouting: return  # Ignore while we are routing a message.
-  if msg.id: mcast acquire(msg.id, q)
+  if msg.id: mcast acquire(qmsg)

 release(QueuedMessage)
   if noReplicate: return
   if isRouting: return  # Ignore while we are routing a message.
-  if msg.id: mcast release(id, q)
+  mcast release(qmsg)

 accept(QueuedMessage):
   if noReplicate: return
   if isRouting: return  # Ignore while we are routing a message.
-  if msg.id: mcast dequeue(msg.id, msg.q)
+  mcast accept(qmsg)

 reject(QueuedMessage):
   isRejecting = true
-  if msg.id: mcast reject(msg.id, msg.q)
+  mcast reject(qmsg)

-rejected(QueuedMessage):
-  isRejecting = false
-  mcast dequeue(msg.id, msg.q)
-
-dequeue(QueuedMessage)
-  # No mcast in dequeue, only used for local cleanup of resources.
-  # E.g. messages that are replaced on an LVQ are dequeued without being
-  # accepted or rejected. dequeue is called with the queue lock held
-  # FIXME revisit - move it out of the queue lock.
-  cleanup(msg)
+# FIXME no longer needed?
+drop(QueuedMessage)
+  cleanup(qmsg)
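
(Illustrative sketch, not part of the patch.) A rough C++ shape for the BrokerHandler enqueue/routed path above; Multicaster, mcastCreate, mcastEnqueue and nextRoutingSequence are stand-ins, and the reset of routingSequence in routed() is implied rather than spelled out in the pseudocode:

    #include <atomic>
    #include <cstdint>
    #include <string>

    using SequenceNumber = uint64_t;

    struct QueuedMessage { std::string encodedMsg; std::string queue; SequenceNumber position; };

    struct Multicaster {                                  // CPG multicast in the real broker
        void mcastCreate(const std::string& /*encodedMsg*/, SequenceNumber) {}
        void mcastEnqueue(const std::string& /*queue*/, SequenceNumber) {}
    };

    // Per-thread state, as in the plan: a connection thread routes one message at a time.
    thread_local bool noReplicate = false;                // suppress replication (e.g. during update)
    thread_local bool isRouting = false;                  // suppress operations while routing
    thread_local SequenceNumber routingSequence = 0;

    SequenceNumber nextRoutingSequence() {
        static std::atomic<SequenceNumber> n{0};          // counter; details not given in the plan
        return ++n;
    }

    class BrokerHandler {
        Multicaster& mcaster;
    public:
        explicit BrokerHandler(Multicaster& m) : mcaster(m) {}

        // Called once for each queue the message is routed to.
        void enqueue(const QueuedMessage& qmsg) {
            if (noReplicate) return;
            if (routingSequence == 0) {                   // first enqueue for this message
                routingSequence = nextRoutingSequence();
                mcaster.mcastCreate(qmsg.encodedMsg, routingSequence);
            }
            mcaster.mcastEnqueue(qmsg.queue, routingSequence);
        }

        // Called when routing of the current message has finished.
        void routed() {
            if (noReplicate) return;
            isRouting = false;
            routingSequence = 0;                          // ready for the next message
        }
    };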

 *** MessageHandler and mcast messages

 Types:
@@ -328,6 +318,21 @@ cancel(q,consumer,consumerCount) - Queue
 #TODO: lifecycle, updating cluster data structures when queues are destroyed

+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would
+- build messages and associate with cluster IDs.
+- push queue ops to the appropriate PollableQueue to be dispatched in the queue's thread.
+
+Serializing operations on the same queue avoids contention, but takes advantage
+of the independence of operations on separate queues.
+
 *** Re-use of existing cluster code
 - re-use Event
 - re-use Multicaster
@@ -387,10 +392,18 @@ Target broker may not have all messages
 Need to add callpoints & mcast messages to replicate these?

 ** TODO [#B] Flow control for internal queues.
-
-Need to bound the size of the internal queues holding cluster events & frames.
-- stop polling when we reach bound.
-- start polling when we get back under it.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting; we still have to deal with remote
+multicasting (note the existing cluster does not do this.) Something like:
+- when over bounds multicast a flow-control event.
+- on delivery of flow-control all members stop polling to read client connections
+- when back under bounds send flow-control-end, all members resume
+- if the flow-controlling member dies others resume
+
 ** TODO [#B] Integration with transactions.
 Do we want to replicate during transaction & replicate commit/rollback
 or replicate only on commit?
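
(Illustrative sketch, not part of the patch.) The per-queue thread context described under "*** Increasing concurrency" above amounts to a small per-queue strand: operations on one queue are executed in order by that queue's worker, while different queues proceed in parallel. QueueStrand and onDeliver are hypothetical names; a real implementation would reuse the broker's existing PollableQueue/poller machinery rather than a raw std::thread per queue:

    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <map>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <utility>

    class QueueStrand {
        std::mutex m;
        std::condition_variable cv;
        std::deque<std::function<void()>> ops;   // queue operations, in delivery order
        bool stopped = false;
        std::thread worker{[this]{ run(); }};

        void run() {
            std::unique_lock<std::mutex> l(m);
            while (!stopped || !ops.empty()) {
                if (ops.empty()) { cv.wait(l); continue; }
                auto op = std::move(ops.front());
                ops.pop_front();
                l.unlock();
                op();                            // enqueue/acquire/accept/... on this queue
                l.lock();
            }
        }
    public:
        ~QueueStrand() {
            { std::lock_guard<std::mutex> l(m); stopped = true; }
            cv.notify_one();
            worker.join();
        }
        // Called from the CPG deliver thread: push the operation and return immediately.
        void post(std::function<void()> op) {
            { std::lock_guard<std::mutex> l(m); ops.push_back(std::move(op)); }
            cv.notify_one();
        }
    };

    // CPG deliver thread: decode the event, then hand the work to the
    // strand of the queue it refers to.
    void onDeliver(std::map<std::string, QueueStrand>& strands,
                   const std::string& queueName, std::function<void()> op) {
        strands[queueName].post(std::move(op));
    }

This only shows the serialization property; bounding the size of these per-queue operation queues is related to the flow-control TODO above.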