[ https://issues.apache.org/jira/browse/QPID-2921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12927088#action_12927088 ]
Alan Conway commented on QPID-2921:
-----------------------------------

Sketch of a solution:

Async completion abstracted as a separate class (currently embedded in Message):

/**
 * Class to implement asynchronous completion of AMQP commands.
 *
 * The initiating thread is the thread that initiates the action,
 * i.e. the connection read thread.
 *
 * A completing thread is any thread that contributes to completion,
 * e.g. a store thread that does an async write.
 * There may be more than one completing thread.
 */
class Completion {
  private:
    AtomicValue<uint32_t> completionsNeeded;

  protected:
    /** Called when the action is completed.
     * @param initiating: true if the calling thread is the initiating thread.
     */
    virtual void doAction(bool initiating) = 0;

  public:
    virtual ~Completion() {}

    /** Called in the initiating thread to increase the count of completions
     * expected. E.g. called when initiating an async store operation.
     */
    void addCompleter() { ++completionsNeeded; }

    /** Called in the initiating thread to indicate all completers have been added.
     * E.g. would be called after routing a message to all queues.
     */
    void completersDone() { if (completionsNeeded == 0) doAction(true); }

    // Decrease the completion count, e.g. called when an async write completes.
    // Called in the completing thread.
    void complete() { if (--completionsNeeded == 0) doAction(false); }
};

Replace SessionState::IncompleteMessageList with IncompleteCommandList:
 - it holds completions for all commands that can be async
   (transfer, accept, create/destroy/bind);
 - completions are associated with a command, not a message.

class IncompleteCommand : public Completion {
    void doAction(bool initiating) {
        if (initiating)  // We're in the connection thread.
            update_session_state_directly();
        else             // Completing thread: hand the update to the connection thread.
            connection.requestIOProcessing(update_session_state);
    }
};

Remove the blocking wait for sync bit commands:
 - in IncompleteMessageList, store a note of sync bit commands and complete
   them when the preceding commands complete.
Point of concern: the IncompleteCommandList is on the critical path, so it needs to be cheap to process.

> c++ broker: Improvements to asynchronous completion
> ---------------------------------------------------
>
>                 Key: QPID-2921
>                 URL: https://issues.apache.org/jira/browse/QPID-2921
>             Project: Qpid
>          Issue Type: Improvement
>          Components: C++ Broker
>    Affects Versions: 0.8
>            Reporter: Alan Conway
>            Assignee: Alan Conway
>
> ** Overview
> Asynchronous completion means that command execution is initiated in one
> thread (a client connection thread) and completed in a different thread.
> When the async store is loaded, message.transfer commands are
> completed by a store thread that does the async write.
>
> ** Issues with asynchronous completion code as of revision r1029686
> *** Not really asynchronous
> IncompleteMessageList::process blocks the connection thread until all
> outstanding async commands (messages) for the session are complete.
> With the new cluster, this could deadlock since it is blocking a Poller
> thread.
> *** Race condition for message.transfer
>
> Sketch of the current code:
> // Called in connection thread
> PersistableMessage::enqueueAsync { ++counter; }
> // Called in store thread once message is written.
> PersistableMessage::enqueueComplete { if (--counter == 0) notifyCompleted(); }
> The intent is that notify be called once per message, after the
> message has been written to each queue it was routed to.
> However, if a message is routed to N queues, it's possible for
> notifyCompleted to be called up to N times. The store thread could
> call notifyCompleted for the first queue before the connection thread
> has called enqueueAsync for the second queue, and so on.
> *** No asynchronous completion for message.accept
> We do not currently delay completion of message.accept until the
> message is deleted from the async store.
> This could cause duplicate delivery if the broker crashes after
> completing the message but before it is removed from the store.
> There is code in PersistableMessage to maintain a counter for dequeues
> analogous to the async enqueue code, but this is incorrect.
> Completion of transfer is triggered when all enqueues for a message are
> complete. Completion of accept is triggered for *each* dequeue from a
> queue independently.
> Furthermore, a single accept can reference many messages, so it can't be
> associated with a message.
>
> ** New requirements
> The new cluster design will need to participate in async completion, e.g.
> an accept cannot be completed until the message is
> - removed from store (if present) AND
> - replicated to the cluster (if present) as dequeued
> The new cluster also needs to asynchronously complete binding commands
> (declare, bind, delete) when they are replicated to the cluster.