Author: aconway
Date: Mon Nov  1 14:10:29 2010
New Revision: 1029670

URL: http://svn.apache.org/viewvc?rev=1029670&view=rev
Log:
Updates to new cluster design docs.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt?rev=1029670&r1=1029669&r2=1029670&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt Mon Nov  1 14:10:29 2010
@@ -76,22 +76,27 @@ broker code.
 Virtual synchrony delivers all data from all clients in a single
 stream to each broker.  The cluster must play this data thru the full
 broker code stack: connections, sessions etc. in a single thread
-context. The cluster has a pipelined design to get some concurrency
-but this is a severe limitation on scalability in multi-core hosts
-compared to the standalone broker which processes each connection
-in a separate thread context.
+context in order to get identical behavior on each broker. The cluster
+has a pipelined design to get some concurrency but this is a severe
+limitation on scalability in multi-core hosts compared to the
+standalone broker which processes each connection in a separate thread
+context.
 
 ** A new cluster design.
-   
-Maintain /equivalent/ state not /identical/ state on each member. 
 
-Messages from different sources need not be ordered identically on all members.
+Clearly defined interface between broker code and cluster plug-in.
+
+Replicate queue events rather than client data.
+- Broker behavior only needs to match per-queue.
+- Smaller amount of code (queue implementation) that must behave predictably.
+- Events only need be serialized per-queue, allows concurrency between queues
 
-Use a moving queue ownership protocol to agree order of dequeues, rather
-than relying on identical state and lock-step behavior to cause
+Use a moving queue ownership protocol to agree order of dequeues.
+No longer relies on identical state and lock-step behavior to cause
 identical dequeues on each broker.
 
-Clearly defined interface between broker code and cluster plug-in.
+Each queue has an associated thread-context. Events for a queue are executed
+in that queue's context, in parallel with events for other queues.
 
 *** Requirements
 
@@ -104,35 +109,19 @@ The cluster must provide these delivery 
 - client rejects message: message must be dead-lettered or discarded and forgotten.
 - client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.
 
-
 Each guarantee takes effect when the client receives a *completion*
 for the associated command (transfer, acquire, reject, accept)
 
-The cluster must provide this ordering guarantee:
-
-- messages from the same publisher received by the same subscriber
-  must be received in the order they were sent (except in the case of
-  re-queued messages.)
-
 *** Broker receiving messages
      
 On recieving a message transfer, in the connection thread we:
 - multicast a message-received event.
-- enqueue the message on the local queue.
-- asynchronously complete the transfer when the message-received is self-delivered.
+- enqueue and complete the transfer when it is self-delivered.
 
-This like asynchronous completion in the MessageStore: the cluster
-"stores" a message by multicast. We send a completion to the client
-asynchronously when the multicast "completes" by self-delivery. This
-satisfies the "client sends transfer" guarantee, but makes the message
-available on the queue immediately, avoiding the multicast latency.
-
-It also moves most of the work to the client connection thread. The
-only work in the virtual synchrony deliver thread is sending the client
-completion.
+Other brokers enqueue the message when they receive the message-received event.
 
-Other brokers enqueue the message when they recieve the
-message-received event, in the virtual synchrony deliver thread.
+Enqueues are queued up with other queue operations to be executed in the
+thread context associated with the queue.
 
 *** Broker sending messages: moving queue ownership
 
@@ -143,10 +132,6 @@ Periodically the owner hands over owners
 broker, providing time-shared access to the queue among all interested
 brokers.
 
-This means we no longer need identical message ordering on all brokers
-to get consistent dequeuing.  Messages from different sources can be
-ordered differently on different brokers. 
-
 We assume the same IO-driven dequeuing algorithm as the standalone
 broker with one modification: queues can be "locked". A locked queue
 is not available for dequeuing messages and will be skipped by the
@@ -164,14 +149,14 @@ a release-queue event, allowing another 
 ownership.
 
 *** Asynchronous completion of accept
-
+### HERE
 In acknowledged mode a message is not forgotten until it is accepted,
 to allow for requeue on rejection or crash. The accept should not be
 completed till the message has been forgotten.
 
 On receiving an accept the broker:
 - dequeues the message from the local queue
-- multicasts a "dequeue" event
+- multicasts an "accept" event
 - completes the accept asynchronously when the dequeue event is self delivered.
 
 NOTE: The message store does not currently implement asynchronous
@@ -191,16 +176,6 @@ being resolved.
 
 #TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
 
-When a new member (the updatee) joins a cluster it needs to be brought up to date.
-The old cluster design an existing member (the updater) sends a state snapshot.
-
-To ensure consistency of the snapshot both the updatee and the updater
-"stall" at the point of the update. They stop processing multicast
-events and queue them up for processing when the update is
-complete. This creates a back-log of work for each to get through,
-which leaves them lagging behind the rest of the cluster till they
-catch up (which is not guaranteed to happen in a bounded time.)
-
 ** Updating new members
 
 When a new member (the updatee) joins a cluster it needs to be brought
@@ -270,8 +245,8 @@ The cluster interface captures these eve
 
 The cluster will require some extensions to the Queue:
 - Queues can be "locked", locked queues are ignored by IO-driven output.
-- Messages carry a cluster-message-id.
-- messages can be dequeued by cluster-message-id
+- Cluster must be able to apply queue events from the cluster to a queue.
+  These appear to fit into existing queue operations.
 
 ** Maintainability
 
@@ -302,27 +277,27 @@ The only way to verify the relative perf
 to prototype & profile. The following points suggest the new design
 may scale/perform better:
 
-Moving work from virtual synchrony thread to connection threads where
-it can be done in parallel:
+Some work moved from virtual synchrony thread to connection threads:
 - All connection/session logic moves to connection thread.
 - Exchange routing logic moves to connection thread.
-- Local broker does all enqueue/dequeue work in connection thread
-- Enqueue/dequeue are IO driven as for a standalone broker.
+- On local broker dequeueing is done in connection thread
+- Local broker dequeue is IO driven as for a standalone broker.
 
-Optimizes common cases (pay for what you use):
-- Publisher/subscriber on same broker: replication is fully asynchronous, no extra latency.
-- Unshared queue: dequeue is all IO-driven in connection thread.
-- Time sharing: pay for time-sharing only if queue has consumers on multiple brokers.
-
-#TODO Not clear how the time sharing algorithm would compare with the existing cluster delivery algorithm.
-
-Extra decode/encode: The old design multicasts raw client data and
-decodes it in the virtual synchrony thread. The new design would
-decode messages in the connection thread, re-encode them for
-multicast, and decode (on non-local brokers) in the virtual synchrony
-thread. There is extra work here, but only in the *connection* thread:
-on a multi-core machine this happens in parallel for every connection,
-so it probably is not a bottleneck. There may be scope to optimize
+For queues with all consumers on a single node dequeue is all
+IO-driven in connection thread. Pay for time-sharing only if queue has
+consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance: there is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
 decode/re-encode by re-using some of the original encoded data, this
 could also benefit the stand-alone broker.
 
@@ -342,20 +317,15 @@ queue replication', allowing such replic
 on a WAN say) to be initiated after the queue had already been created
 and been in use (one of the key missing features).
 
-** Optimizing the active-passive special case.
-
-In the case where all consumers of a queue are on the same broker, we
-can get better performance because we don't need to transfer ownership
-or information about acquisition. We need to optimize this case to
-perform like an active-passive mode of replication.
-
 ** Increasing Concurrency and load sharing
+
 The current cluster is bottlenecked by processing everything in the
 CPG deliver thread. By removing the need for identical operation on
 each broker, we open up the possiblility of greater concurrency.
 
-Handling multicast enqueue, acquire, accpet, release etc: concurrency per queue.
-Operatons on different queues can be done in different threads.
+Handling multicast enqueue, acquire, accept, release etc: concurrency
+per queue.  Operations on different queues can be done in different
+threads.
 
 The new design does not force each broker to do all the work in the
 CPG thread so spreading load across cluster members should give some
@@ -401,3 +371,25 @@ Clustering and scalability: new design m
 address scalability as part of cluster design. Think about
 relationship to federation and "fragmented queues" idea.
 
+* Design questions/decisions
+** Total ordering.
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Using total order allows other optimizations, see Message Identifiers below.
+
+** Message identifiers.
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
+
+** Message rejection
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+
+New thinking: treat re-queueing after reject as entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.
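
Editorial sketch (not part of the committed change): the "Message identifiers" decision above relies on total ordering. If every broker applies the same totally ordered stream of enqueue events, each broker can compute the same (queue name, position) pair locally, with no separate ID mapping. The class and function names below are hypothetical, and positions starting at 0 is an assumption made only for illustration.

```python
from collections import defaultdict


class ReplicaQueues:
    """Hypothetical per-broker view of replicated queues (illustration only)."""

    def __init__(self):
        self._next_pos = defaultdict(int)  # queue name -> next free position
        self.messages = {}                 # (queue name, position) -> body

    def apply_enqueue(self, queue_name, body):
        # The position is derived purely from the order of events on this queue.
        pos = self._next_pos[queue_name]
        self._next_pos[queue_name] = pos + 1
        self.messages[(queue_name, pos)] = body
        return (queue_name, pos)

    def apply_accept(self, msg_id):
        # An accepted message is forgotten; the ID is just (queue name, position).
        return self.messages.pop(msg_id, None)


# The same totally ordered event stream, as delivered to every broker.
events = [("q1", "a"), ("q2", "b"), ("q1", "c")]


def replay(stream):
    broker = ReplicaQueues()
    ids = [broker.apply_enqueue(q, body) for q, body in stream]
    return broker, ids


broker_a, ids_a = replay(events)
broker_b, ids_b = replay(events)
```

Both replicas end up with identical IDs, so an accept multicast by one broker can name a message by queue and position and every other broker can resolve it directly.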

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt?rev=1029670&r1=1029669&r2=1029670&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt Mon Nov  1 14:10:29 2010
@@ -92,18 +92,18 @@ is omitted.
 
 *** BrokerHandler
 Types:
-- struct QueuedMessage { Message msg; QueueName q; Position pos; }
-- SequenceNumber 64 bit sequence number to identify messages.
-- NodeId 64 bit CPG node-id, identifies member of the cluster.
-- struct MessageId { NodeId node; SequenceNumber seq; }
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct
 
-NOTE: Message ID's identify a QueuedMessage, i.e. a position on a queue.
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.
 
 Members:
-- atomic<SequenceNumber> sequence // sequence number for message IDs.
 - thread_local bool noReplicate // suppress replication.
 - thread_local bool isRouting // suppress operations while routing
-- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence
 
 NOTE: localMessage is also modified by MessageHandler.
 
@@ -118,47 +118,37 @@ routing(msg)
 
 enqueue(qmsg):
   if noReplicate: return
-  if !qmsg.msg.id:
-    seq = sequence++
-    qmsg.msg.id = (self,seq)
-    localMessage[seq] = qmsg
-    mcast create(encode(qmsg.msg),seq)
-  mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+  if routingSequence == 0 # thread local
+    routingSequence = nextRoutingSequence()
+    mcast create(encode(qmsg.msg),routingSeq)
+  mcast enqueue(qmsg.q,routingSeq)
 
 routed(msg):
   if noReplicate: return
-  if msg.id: mcast routed(msg.id.seq)
   isRouting = false
 
 acquire(qmsg):
   if noReplicate: return
   if isRouting: return # Ignore while we are routing a message.
-  if msg.id: mcast acquire(msg.id, q)
+  if msg.id: mcast acquire(qmsg)
 
 release(QueuedMessage) 
   if noReplicate: return
   if isRouting: return # Ignore while we are routing a message.
-  if msg.id: mcast release(id, q)
+  mcast release(qmsg)
 
 accept(QueuedMessage):
   if noReplicate: return
   if isRouting: return # Ignore while we are routing a message.
-  if msg.id: mcast dequeue(msg.id, msg.q)
+  mcast accept(qmsg)
 
 reject(QueuedMessage):
   isRejecting = true
-  if msg.id: mcast reject(msg.id, msg.q)
+  mcast reject(qmsg)
 
-rejected(QueuedMessage):
-  isRejecting = false
-  mcast dequeue(msg.id, msg.q)
-
-dequeue(QueuedMessage) 
-  # No mcast in dequeue, only used for local cleanup of resources. 
-  # E.g. messages that are replaced on an LVQ are dequeued without being
-  # accepted or rejected. dequeue is called with the queue lock held
-  # FIXME revisit - move it out of the queue lock.
-  cleanup(msg)
+# FIXME no longer needed?
+drop(QueuedMessage)
+  cleanup(qmsg)
 
 *** MessageHandler and mcast messages
 Types:
@@ -328,6 +318,21 @@ cancel(q,consumer,consumerCount) - Queue
 
 #TODO: lifecycle, updating cluster data structures when queues are destroyed
 
+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would
+- build messages and associate with cluster IDs.
+- push queue ops to the appropriate PollableQueue to be dispatched in the queue's thread.
+
+Serializing operations on the same queue avoids contention, but takes advantage
+of the independence of operations on separate queues.
+
 *** Re-use of existing cluster code
 - re-use Event
 - re-use Multicaster
@@ -387,10 +392,18 @@ Target broker may not have all messages 
 Need to add callpoints & mcast messages to replicate these?
 
 ** TODO [#B] Flow control for internal queues.
-   
-Need to bound the size of the internal queues holding cluster events & frames.
-- stop polling when we reach bound.
-- start polling when we get back under it.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting, we still have to deal with remote
+multicasting (note existing cluster does not do this.) Something like:
+- when over bounds multicast a flow-control event.
+- on delivery of flow-control all members stop polling to read client connections
+- when back under bounds send flow-control-end, all members resume
+- if flow-controlling member dies others resume
+
 ** TODO [#B] Integration with transactions.
 Do we want to replicate during transaction & replicate commit/rollback
 or replicate only on commit?
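
Editorial sketch (not part of the committed change): the "Increasing concurrency" section of the plan describes a deliver thread that pushes queue operations onto a per-queue PollableQueue. The following is a minimal illustration of that idea using Python's standard library. QueueContext and Dispatcher are hypothetical names, and one worker thread per queue is an assumption; the real PollableQueue in the broker is a different class with a different API.

```python
import queue
import threading


class QueueContext:
    """Hypothetical stand-in for the thread context tied to one AMQP queue."""

    def __init__(self, name):
        self.name = name
        self.applied = []          # operations applied, in order
        self._ops = queue.Queue()  # pending operations for this queue only
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def push(self, op):
        self._ops.put(op)

    def _run(self):
        # Operations on the same queue are applied one at a time, in order.
        while True:
            op = self._ops.get()
            if op is None:  # sentinel: stop the worker
                break
            self.applied.append(op)

    def close(self):
        self._ops.put(None)
        self._thread.join()


class Dispatcher:
    """Plays the role of the single deliver thread: it only routes operations."""

    def __init__(self):
        self._contexts = {}

    def deliver(self, queue_name, op):
        # Called from one thread only, so the dict needs no lock in this sketch.
        ctx = self._contexts.get(queue_name)
        if ctx is None:
            ctx = self._contexts[queue_name] = QueueContext(queue_name)
        ctx.push(op)

    def close(self):
        for ctx in self._contexts.values():
            ctx.close()
        return {name: ctx.applied for name, ctx in self._contexts.items()}


dispatcher = Dispatcher()
for queue_name, op in [("q1", "enqueue 0"), ("q2", "enqueue 0"),
                       ("q1", "acquire 0"), ("q1", "accept 0")]:
    dispatcher.deliver(queue_name, op)
result = dispatcher.close()
```

Order is preserved within each queue, while operations for q1 and q2 may run concurrently; that is the trade-off the section describes.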



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org
