> On July 11, 2012, 7:28 p.m., Kenneth Giusti wrote:
> > /trunk/qpid/cpp/src/qpid/broker/Queue.cpp, line 269
> > <https://reviews.apache.org/r/5833/diff/1/?file=120493#file120493line269>
> >
> > Why pass msg by copy instead of reference?

The intention was to enforce the fact that each 'queue' can only operate on a copy, and not affect the message as passed along to other 'queues'. I could also make it a const reference, which might make more sense as the queue implementation can then copy at the point it wants. (That is currently complicated by the fact that we may want to modify the message before we write it to disk, which happens before we add it to Messages, which is where the 'canonical' copy of the message on this queue will be held. However, copying is at present a relatively cheap thing unless there are very many annotations). I'll see if I can change to a const reference easily. Good comment!


- Gordon


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/5833/#review9080
-----------------------------------------------------------


On July 9, 2012, 12:31 p.m., Gordon Sim wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/5833/
> -----------------------------------------------------------
>
> (Updated July 9, 2012, 12:31 p.m.)
>
>
> Review request for qpid, Alan Conway and Kenneth Giusti.
>
>
> Description
> -------
>
> == Background ==
>
> I've been looking at what would be required to get AMQP 1.0 support in
> the qpidd broker (using proton-c). In that context I felt there was a
> need to refactor the broker code, particularly that part that would be
> shared between different protocol versions. Part of the motivation was
> clearer separation of 0-10 specific logic, so that 1.0 logic could be
> introduced as an alternative. However part of it was also simply the
> recognition of some long-standing problems that we have never stopped
> to address.
>
> So, here is a patch representing my ideas on what is needed.
> This is already a little stale (patch was generated against r1331342) and
> needs a rebase. However I think the basic ideas involved are clear
> enough that it would be worth getting some early feedback.
>
> == Key Changes ==
>
> qpid::broker::Message
>
> This is now supposed to be a protocol neutral representation of a
> message. It no longer exposes qpid::framing::FrameSet. It can be based
> on data received in different encodings (this patch only includes the
> existing 0-10 encoding).
>
> The immutable, sharable state is separated from the mutable
> queue-specific state. Messages themselves are no longer held through a
> shared pointer but are passed by reference or copied if needed. The
> immutable state (essentially the data as received) *is* still shared
> and referenced internally through an intrusive pointer. There is no
> longer a message level lock. A message instance is 'owned' by
> some other entity (usually the queue it is on) which controls
> concurrent access/modification if necessary.
>
> The persistence context is a separate part of the message
> also. Currently that can be shared between two message instances if
> desired.
>
> qpid::broker::Messages
>
> Switched from using qpid::broker::QueuedMessage (which relied on
> shared pointer to message itself and made sequence number the explicit
> - and only - way to refer to a specific message) to using modified
> Message class directly and a new qpid::broker::QueueCursor.
>
> The cursor is opaque outside the Messages implementation to which it
> relates. It provides a way to refer to a specific message (without
> directly using sequence number, though at present that is what is used
> 'under the covers') and/or to track progress through a sequence of
> messages (for consumers or other iterating entities).
>
> I.e. it's an iterator to a Message within its containing Messages
> instance that is not invalidated by changes to that container.
>
> A Messages instance *owns* the Message instances within it.
> Other classes access this through a reference or (raw) pointer, or if needed
> copy it (the immutable part can be - and is - safely shared).
>
> The codepath for browse/consume is a lot more unified now. You use a
> cursor and call Messages::next() in each case. This also lays the
> foundation for selectors.
>
> The simplified Messages interface led to a simplified
> MessageDistributor. There is still a little more to do to clarify
> these separate roles (or indeed perhaps unify them?) but more on that
> later.
>
> qpid::broker::amqp_0_10::MessageTransfer
>
> This represents the familiar 0-10 encoding of a message. This class is
> broadly similar to the old Message class, based on a FrameSet. However
> it represents the shared and essentially immutable state. The
> sendHeader() method now explicitly takes a copy of the original
> headers and adds to it or otherwise modifies it if needed (e.g. for
> redelivered flag, ttl, annotations etc).
>
> [Ideally I'd like to move more of the 0-10 specific classes out of
> qpid::broker and into qpid::broker::amqp_0_10, but that has no
> functional relevance so I've left existing classes alone for now.]
>
> qpid::broker::Consumer
>
> The deliver() method now takes a QueueCursor (representing a 'handle'
> to this message for use in subsequent operations such as accept,
> release etc) and a *constant reference* to the Message itself
> (i.e. consumers can't alter the state of the message on the queue
> directly, but only through operations on the queue itself).
>
> qpid::broker::QueueRegistry
>
> The actual queue creation has been pulled out into a base class,
> QueueFactory. The actual class of the Queue returned can now be varied
> and there are two subclasses in the current patch. The first is a
> replacement for the ring policy logic, whereby messages are removed
> from the queue in order to keep the queue from growing above a
> configured limit.
> The second is for last value queues and simply pulls
> the special case behaviour out of the common code path.
>
> The handling of queue configuration has also been made cleaner and
> more uniform, based on the QueueSettings class.
>
> qpid::broker::QueuePolicy
>
> This class has been removed. There is a new QueueDepth utility used
> for configuring limits, tracking current depth and testing the latter
> against the former. This is used directly by Queue. The behaviour at
> the limit can be varied by subclassing Queue.
>
> == Limitations etc ==
>
> clustering
>
> This breaks clustering. Indeed it will not compile unless clustering
> is disabled (--without-cpg in configure). Keeping the cluster code in
> sync was distracting me from the core goal, given its entanglement
> with the broker code.
>
> My assumption is that the new ha code will eventually replace the
> cluster anyway and the amount of change that would be required to get
> the cluster working with this refactor may not be worth it and may in
> fact undermine its stability anyway (which seems the only good argument
> for using it).
>
> I don't believe there is anything insurmountable to do to re-enable
> cluster if that was desired however.
>
> old & nasty features removed
>
> I have removed support for flow to disk, the legacy version of lvq
> with two modes (the updated version of lvq is of course still
> functional), the last-man-standing persistence in clustering and the
> old async queue replication. They are really quite horribly
> implemented and/or are no longer necessary in my view.
>
> == Still To Do ==
>
> * rebase again to latest trunk
>
> * update cmake build (and test on windows)
>
> * fix xml exchange
>
> * c++ unit test cleanup (remove or re-enable those tests that have
> been commented out)
>
> * test against store(s), consider how this might merge with store
> interface redesign
>
> * rethink/clarify role of MessageDistributor v. Messages?
>
> * can QueueCursors be made more like std iterators?
>
>
> Diffs
> -----
>
>   /trunk/qpid/cpp/examples/messaging/spout.cpp 1331342
>   /trunk/qpid/cpp/src/CMakeLists.txt 1331342
>   /trunk/qpid/cpp/src/Makefile.am 1331342
>   /trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h 1331342
>   /trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Broker.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Broker.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Consumer.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Deliverable.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/DtxAck.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Exchange.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Exchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Fairshare.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/LossyQueue.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/Lvq.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/Lvq.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/MapHandler.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/Message.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Message.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageDeque.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageMap.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Messages.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Persistable.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Queue.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/Queue.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueCursor.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueDepth.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueEvents.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueFactory.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueObserver.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/QueueSettings.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SecureConnection.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SecureConnection.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SemanticState.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SessionHandler.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SessionState.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/SessionState.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxAccept.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxBuffer.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxOp.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxPublish.h 1331342
>   /trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/client/TCPConnector.h 1331342
>   /trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/Cluster.h 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/Connection.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h 1331342
>   /trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/ha/Backup.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 1331342
>   /trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h 1331342
>   /trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h 1331342
>   /trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h 1331342
>   /trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h 1331342
>   /trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h 1331342
>   /trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/replication/constants.h 1331342
>   /trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/sys/Codec.h 1331342
>   /trunk/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h 1331342
>   /trunk/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp 1331342
>   /trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp 1331342
>   /trunk/qpid/cpp/src/replication.mk 1331342
>   /trunk/qpid/cpp/src/tests/ClientSessionTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/ExchangeTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/Makefile.am 1331342
>   /trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/MessageTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/QueueDepth.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/tests/QueueEvents.cpp 1331342
>   /trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/QueueTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/ReplicationTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/TxMocks.h 1331342
>   /trunk/qpid/cpp/src/tests/TxPublishTest.cpp 1331342
>   /trunk/qpid/cpp/src/tests/ha_tests.py 1331342
>   /trunk/qpid/cpp/src/tests/test_store.cpp 1331342
>   /trunk/qpid/specs/management-schema.xml 1331342
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py 1331342
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py 1331342
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py 1331342
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py 1331342
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py 1331342
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py 1331342
>
> Diff: https://reviews.apache.org/r/5833/diff/
>
>
> Testing
> -------
>
> With clustering disabled, make check passes for me. However I have just
> realised I've not got the xml exchange enabled
> either - though that was inadvertent - and there is some work needed there
> (similar to what was done in the HeadersExchange).
>
>
> Thanks,
>
> Gordon Sim
>
>
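
For anyone following the thread without the diff open, the cursor idea described under qpid::broker::Messages can be illustrated with a small standalone sketch. This is not code from the patch. Everything in the `sketch` namespace is a hypothetical stand-in, and the use of std::map keyed by sequence number is an assumption made only because it keeps the example short. It shows a container that owns its messages, takes them by const reference and copies at the point of insertion, and hands out an opaque cursor that stays valid when other messages are removed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

namespace sketch {  // hypothetical names, not the classes in the patch

// Stand-in for a message; the real class carries far more state.
struct Message {
    std::string body;
};

class Messages;

// Opaque outside Messages: callers can hold and copy a cursor, nothing more.
class Cursor {
    friend class Messages;
    // Assumption: the last sequence number visited; 0 means "before the first".
    std::uint64_t position = 0;
};

// Owns its Message instances, keyed by sequence number.
class Messages {
  public:
    // Takes a const reference and copies at the point it chooses.
    std::uint64_t publish(const Message& m) {
        const std::uint64_t seq = ++lastSequence;
        messages.emplace(seq, m);
        return seq;
    }

    // Advances the cursor and returns the next message, or nullptr at the end.
    // The caller gets read-only access, as with a const-reference deliver().
    const Message* next(Cursor& c) const {
        auto it = messages.upper_bound(c.position);
        if (it == messages.end()) return nullptr;
        c.position = it->first;
        return &it->second;
    }

    // Removes the message the cursor currently refers to, if it is still there.
    bool remove(const Cursor& c) { return messages.erase(c.position) == 1; }

    std::size_t size() const { return messages.size(); }

  private:
    std::map<std::uint64_t, Message> messages;
    std::uint64_t lastSequence = 0;
};

// One cursor browses while a second cursor removes a message ahead of it.
inline std::string browseWhileRemoving() {
    Messages q;
    q.publish(Message{"a"});
    q.publish(Message{"b"});
    q.publish(Message{"c"});

    Cursor browser;
    std::string seen;
    const Message* m = q.next(browser);  // browser now refers to "a"
    assert(m != nullptr);
    seen += m->body;

    Cursor consumer = browser;  // cursors are cheap to copy
    q.next(consumer);           // consumer now refers to "b"
    q.remove(consumer);         // "b" is gone; browser is unaffected

    while ((m = q.next(browser)) != nullptr) seen += m->body;
    return seen;  // "b" was removed before the browser reached it
}

}  // namespace sketch
```

Because the cursor records a position rather than holding a container iterator, removing a message never leaves it dangling; the next call simply finds the first message past that position. The patch itself adds an IndexedDeque.h, and this sketch makes no attempt to reproduce whatever that class does.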