> On July 9, 2012, 6:08 p.m., Andrew Stitcher wrote: > > I haven't looked at this change in detail, what I've looked at seems good. > > > > One comment though, I'd strongly recommend breaking this change up into > > smaller changesets - This will make it much easier to review I think. > > > > For example I can see that one (seemingly) separate change here is changing > > the interface to Codec::encode. This change looks (to me) to be self > > evidently what the interface should be and so should be applied separately > > - I'd say it's simple enough just to make that change without further > > review. There may well be other similar changes here. > > > > Gordon Sim wrote: > Yes, the Codec change should not be in this patch. The cluster changes > are also irrelevant. I'll have a scan through and get rid of these and others > for the next patch. However I think the reality is that this will be a very > large patch, simply due to the ripple effect of the central change to the way > messages are encapsulated. (E.g. try deleting QueuedMessage and editing > Message to not be RefCounted and see the impact of simply getting that to > compile, let alone run and pass all the tests!)
It certainly will be large, so taking extraneous changes from consideration will make it easier to review that's all. - Andrew ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/5833/#review8977 ----------------------------------------------------------- On July 9, 2012, 12:31 p.m., Gordon Sim wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/5833/ > ----------------------------------------------------------- > > (Updated July 9, 2012, 12:31 p.m.) > > > Review request for qpid, Alan Conway and Kenneth Giusti. > > > Description > ------- > > == Background == > > I've been looking at what would be required to get AMQP 1.0 support in > the qpidd broker (using proton-c). In that context I felt there was a > need to refactor the broker code, particularly that part that would be > shared between different protocol versions. Part of the motivation was > clearer separation of 0-10 specific logic, so that 1.0 logic could be > introduced as an alternative. However part of it was also simply the > recognition of some long-standing problems that we have never stopped > to address. > > So, here is a patch representing my ideas on what is needed. This is > already a little stale (patch was generated against r1331342) and > needs a rebase. However I think the basic ideas involved are clear > enough that it would be worth getting some early feedback. > > == Key Changes == > > qpid::broker::Message > > This is now supposed to be a protocol neutral representation of a > message. It no longer exposes qpid::framing::FrameSet. It can be based > on data received in different encodings (this patch only includes the > existing 0-10 encoding). > > The immutable, sharable state is separated from the mutable > queue-specific state. Messages themselves are no longer held through a > shared pointer but are passed by reference or copied if needed. The > immutable state (essentially the data as received) *is* still shared > and referenced internally through an intrusive pointer. There is no > longer a message level lock. A message instance is 'owned' by > someother entity (usually the queue it is on) which controls > concurrent access/modification if necessary. > > The persistence context is a separate part of the message > also. Currently that can be shared between two message instances if > desired. > > qpid::broker::Messages > > Switched from using qpid::broker::QueuedMessage (which relied on > shared pointer to message itself and made sequence number the explicit > - and only - way to refer to a specific message) to using modified > Message class directly and a new qpid::broker::QueueCursor. > > The cursor is opaque outside the Messages implementation to which it > relates. It provides a way to refer to a specific message (without > directly using sequence number, though at present that is what is used > 'under the covers') and/or to track progress through a sequence of > messages (for consumers or other iterating entities). > > I.e. its an iterator to a Message within its containing Messages > instance that is not invalidated by changes to that container. > > A Messages instance *owns* the Message instances within it. Other > classes access this through a reference or (raw) pointer, or if needed > copy it (the immutable part can be - and is - safely shared). > > The codepath for browse/consume is a lot more unified now. You use a > cursor and call Messages::next() in each case. This also lays the > foundation for selectors. > > The simplified Messages interface led to a simplied > MessageDistributor. There is still a little more to do to clarify > these separate roles (or indeed perhaps unify them?) but more on that > later. > > qpid::broker::amqp_0_10::MessageTransfer > > This represents the familiar 0-10 encoding of a message. This class is > broadly similar to the old Message class, based on a FrameSet. However > it represents the shared and essentially immutable state. The > sendHeader() method now explicitly takes a copy of the original > headers and adds to it or otherwise modifies it if needed (e.g. for > redelivered flag, ttl, annotations etc). > > [Ideally I'd like to move more of the 0-10 specific classes out of > qpid::broker and into qpid::broker::amqp_0_10, but that has no > functional relevance so I've left existing classes alone for now.] > > qpid::broker::Consumer > > The deliver() method now takes a QueueCursor (representing a 'handle' > to this message for use in subsequent operations such as accept, > relese etc) and a *constant reference* to the Message itself > (i.e. consumers can't alter the state of the message on the queue > directly, but only through operations on the queue itself). > > qpid::broker::QueueRegistry > > The actual queue creation has been pulled out into a base class, > QueueFactory. The actual class of the Queue returned can now be varied > and there are two subclasses in the current patch. The first is a > replacement for the ring policy logic, whereby messages are removed > from the queue in order to keep the queue from growing above a > configured limit. The second is for last value queues and simply pulls > the special case behaviour out of the common code path. > > The handling of queue configuration has also been made cleaner and > more uniform, based on the QueueSettings class. > > qpid::broker::QueuePolicy > > This class has been removed. There is a new QueueDepth utility used > for configuring limits, tracking current depth and testing the latter > against the former. This is used directly by Queue. The behaviour at > the limit can be varied by subclassing queue. > > == Limitations etc == > > clustering > > This breaks clustering. Indeed it will not compile unless clustering > is disabled (--without-cpg in configure). Keeping the cluster code in > sync was distracting me from the core goal, given its entanglement > with the broker code. > > My assumption is that the new ha code will eventually replace the > cluster anyway and the amount of change that would be required to get > the cluster working with this refactor may not be worth it and may in > fact undermine its stability anyway (which seem the only good argument > for using it). > > I don't believe there is anything insurmountable to do to re-enable > cluster if that was desired however. > > old & nasty features removed > > I have removed support for flow to disk, the legacy version of lvq > with two modes (the updated version of lvq is of course still > functional), the last-man-standing persistence in clustering and the > old async queue replication. They are really quite horribly > implemented and/or are no longer necessary in my view. > > == Still To Do == > > * rebase again to latest trunk > > * update cmake build (and test on windows) > > * fix xml exchange > > * c++ unit test cleanup (remove or re-enable those tests that have > been commented out) > > * test against store(s), consider how this might merge with store > interface redesign > > * rethink/clarify role of MessageDistributor v. Messages? > > * can QueueCursors be made more like std iterators? > > > Diffs > ----- > > /trunk/qpid/cpp/examples/messaging/spout.cpp 1331342 > /trunk/qpid/cpp/src/CMakeLists.txt 1331342 > /trunk/qpid/cpp/src/Makefile.am 1331342 > /trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h 1331342 > /trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Broker.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Broker.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/Consumer.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Deliverable.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/DtxAck.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Exchange.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Exchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/Fairshare.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/LossyQueue.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/Lvq.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/Lvq.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/MapHandler.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/Message.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Message.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageDeque.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageMap.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/Messages.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Persistable.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/Queue.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/Queue.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueCursor.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueDepth.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueEvents.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueFactory.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueObserver.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/QueueSettings.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/SecureConnection.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/SecureConnection.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/SemanticState.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/SessionHandler.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/SessionState.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/SessionState.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxAccept.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxBuffer.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxOp.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxPublish.h 1331342 > /trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp 1331342 > /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h PRE-CREATION > /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp PRE-CREATION > /trunk/qpid/cpp/src/qpid/client/TCPConnector.h 1331342 > /trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/Cluster.h 1331342 > /trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/Connection.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h 1331342 > /trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h 1331342 > /trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h 1331342 > /trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp 1331342 > /trunk/qpid/cpp/src/qpid/ha/Backup.cpp 1331342 > /trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp 1331342 > /trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 1331342 > /trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 1331342 > /trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 1331342 > /trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp 1331342 > /trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h 1331342 > /trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp 1331342 > /trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h 1331342 > /trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h 1331342 > /trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h 1331342 > /trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp 1331342 > /trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h 1331342 > /trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp 1331342 > /trunk/qpid/cpp/src/qpid/replication/constants.h 1331342 > /trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp 1331342 > /trunk/qpid/cpp/src/qpid/sys/Codec.h 1331342 > /trunk/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h 1331342 > /trunk/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp 1331342 > /trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp 1331342 > /trunk/qpid/cpp/src/replication.mk 1331342 > /trunk/qpid/cpp/src/tests/ClientSessionTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/ExchangeTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/Makefile.am 1331342 > /trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/MessageTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/QueueDepth.cpp PRE-CREATION > /trunk/qpid/cpp/src/tests/QueueEvents.cpp 1331342 > /trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/QueueTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/ReplicationTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/TxMocks.h 1331342 > /trunk/qpid/cpp/src/tests/TxPublishTest.cpp 1331342 > /trunk/qpid/cpp/src/tests/ha_tests.py 1331342 > /trunk/qpid/cpp/src/tests/test_store.cpp 1331342 > /trunk/qpid/specs/management-schema.xml 1331342 > /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py > 1331342 > /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py 1331342 > /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py 1331342 > /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py 1331342 > /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py 1331342 > /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py 1331342 > > Diff: https://reviews.apache.org/r/5833/diff/ > > > Testing > ------- > > With clustering disabled, make check passes for me. However I have just > realised I've not got the xml exchange enabled > either - though that was inadvertant - and there is some work needed there > (similar to what was done in the HeadersExchange). > > > Thanks, > > Gordon Sim > >