Author: kgiusti Date: Tue Sep 27 19:20:49 2011 New Revision: 1176538 URL: http://svn.apache.org/viewvc?rev=1176538&view=rev Log: QPID-3346: incorporate more review feedback
Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h Removed: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp Modified: qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Modified: qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt?rev=1176538&r1=1176537&r2=1176538&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt Tue Sep 27 19:20:49 2011 @@ -975,7 +975,7 @@ set (qpidbroker_SOURCES qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueListeners.cpp - qpid/broker/MessageAllocator.cpp + qpid/broker/FifoAllocator.cpp qpid/broker/MessageGroupManager.cpp qpid/broker/PersistableMessage.cpp qpid/broker/Bridge.cpp Modified: qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am?rev=1176538&r1=1176537&r2=1176538&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am Tue Sep 27 19:20:49 2011 @@ -672,7 +672,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Vhost.cpp \ qpid/broker/Vhost.h \ qpid/broker/MessageAllocator.h \ - qpid/broker/MessageAllocator.cpp \ + qpid/broker/FifoAllocator.h \ + qpid/broker/FifoAllocator.cpp \ qpid/broker/MessageGroupManager.cpp \ 
qpid/broker/MessageGroupManager.h \ qpid/management/ManagementAgent.cpp \ Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp?rev=1176538&view=auto ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp (added) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp Tue Sep 27 19:20:49 2011 @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "qpid/broker/Queue.h" +#include "qpid/broker/FifoAllocator.h" + +using namespace qpid::broker; + +FifoAllocator::FifoAllocator(Messages& container) + : messages(container) {} + +bool FifoAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next ) +{ + if (!messages.empty()) { + next = messages.front(); // by default, consume oldest msg + return true; + } + return false; +} + +bool FifoAllocator::allocate(const std::string&, const QueuedMessage& ) +{ + // by default, all messages present on the queue may be allocated as they have yet to + // be acquired. 
+ return true; +} + +bool FifoAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + +void FifoAllocator::query(qpid::types::Variant::Map&) const +{ + // nothing to see here.... +} + Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h?rev=1176538&view=auto ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h (added) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h Tue Sep 27 19:20:49 2011 @@ -0,0 +1,58 @@ +#ifndef _broker_FifoAllocator_h +#define _broker_FifoAllocator_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** Simple MessageAllocator for FIFO Queues - the HEAD message is always the next + * available message for consumption. 
+ */ + +#include "qpid/broker/MessageAllocator.h" + +namespace qpid { +namespace broker { + +class Messages; + +class FifoAllocator : public MessageAllocator +{ + public: + FifoAllocator(Messages& container); + + /** Locking Note: all methods assume the caller is holding the Queue::messageLock + * during the method call. + */ + + /** MessageAllocator interface */ + + bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + bool allocate(const std::string& consumer, const QueuedMessage& target); + bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + void query(qpid::types::Variant::Map&) const; + + private: + Messages& messages; +}; + +}} + +#endif Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h?rev=1176538&r1=1176537&r2=1176538&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h Tue Sep 27 19:20:49 2011 @@ -22,7 +22,9 @@ * */ -/* Used by queues to allocate the next "most desirable" message to a consuming client */ +/** Abstraction used by Queue to determine the next "most desirable" message to provide to + * a particular consuming client + */ #include "qpid/broker/Consumer.h" @@ -30,48 +32,43 @@ namespace qpid { namespace broker { -class Queue; struct QueuedMessage; class MessageAllocator { - protected: - Queue *queue; public: - MessageAllocator( Queue *q ) : queue(q) {} virtual ~MessageAllocator() {}; - // Note: all methods taking a mutex assume the caller is holding the - // Queue::messageLock during the method call. + /** Locking Note: all methods assume the caller is holding the Queue::messageLock + * during the method call. 
+ */ /** Determine the next message available for consumption by the consumer - * @param next set to the next message that the consumer may acquire. - * @return true if message is available + * @param consumer the consumer that needs a message to consume + * @param next set to the next message that the consumer may consume. + * @return true if message is available and next is set */ virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer, - QueuedMessage& next, - const sys::Mutex::ScopedLock& lock); + QueuedMessage& next ) = 0; + + /** Allow the consumer to take ownership of the given message. + * @param consumer the name of the consumer that is attempting to acquire the message + * @param target the message to be acquired, previously returned from nextConsumableMessage() + * @return true if ownership is permitted, false if ownership cannot be assigned. + */ + virtual bool allocate( const std::string& consumer, + const QueuedMessage& target) = 0; /** Determine the next message available for browsing by the consumer + * @param consumer the consumer that is browsing the queue * @param next set to the next message that the consumer may browse. - * @return true if a message is available + * @return true if a message is available and next is returned */ virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer, - QueuedMessage& next, - const sys::Mutex::ScopedLock& lock); - - /** check if a message previously returned via next*Message() may be acquired. - * @param consumer name of consumer that is attempting to acquire the message - * @param qm the message to be acquired - * @param messageLock - ensures caller is holding it! - * @return true if acquire is permitted, false if acquire is no longer permitted. 
- */ - virtual bool acquirable( const std::string&, - const QueuedMessage&, - const sys::Mutex::ScopedLock&); + QueuedMessage& next ) = 0; /** hook to add any interesting management state to the status map */ - virtual void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const; + virtual void query(qpid::types::Variant::Map&) const = 0; }; }} Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1176538&r1=1176537&r2=1176538&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Tue Sep 27 19:20:49 2011 @@ -61,7 +61,7 @@ void MessageGroupManager::enqueued( cons GroupState &state(messageGroups[group]); state.members.push_back(qm.position); uint32_t total = state.members.size(); - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << group << " total=" << total ); if (total == 1) { // newly created group, no owner @@ -81,7 +81,7 @@ void MessageGroupManager::acquired( cons assert( gs != messageGroups.end() ); GroupState& state( gs->second ); state.acquired += 1; - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << group << " acquired=" << state.acquired ); } @@ -99,11 +99,11 @@ void MessageGroupManager::requeued( cons assert( state.acquired != 0 ); state.acquired -= 1; if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << gs->first); disown(state); } - QPID_LOG( trace, "group queue " << 
queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": requeued message to group id=" << group << " acquired=" << state.acquired ); } @@ -138,16 +138,16 @@ void MessageGroupManager::dequeued( cons if (!state.owned()) { // unlikely, but need to remove from the free list before erase unFree( state ); } - QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first); + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); messageGroups.erase( gs ); } else { if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << gs->first); disown(state); } } - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": dequeued message from group id=" << group << " total=" << total ); } @@ -155,7 +155,7 @@ void MessageGroupManager::consumerAdded( { assert(consumers.find(c.getName()) == consumers.end()); consumers[c.getName()] = 0; // no groups owned yet - QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() ); + QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); } void MessageGroupManager::consumerRemoved( const Consumer& c ) @@ -172,20 +172,17 @@ void MessageGroupManager::consumerRemove if (state.owner == name) { --count; disown(state); - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " released group id=" << gs->first); } } consumers.erase( consumer ); - QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name ); + QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); } -bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, - const 
qpid::sys::Mutex::ScopedLock& ) +bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - Messages& messages(queue->getMessages()); - if (messages.empty()) return false; @@ -220,8 +217,7 @@ bool MessageGroupManager::nextConsumable } -bool MessageGroupManager::acquirable(const std::string& consumer, const QueuedMessage& qm, - const qpid::sys::Mutex::ScopedLock&) +bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage std::string group( getGroupId(qm) ); @@ -231,16 +227,22 @@ bool MessageGroupManager::acquirable(con if (!state.owned()) { own( state, consumer ); - QPID_LOG( trace, "group queue " << queue->getName() << + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << consumer << " has acquired group id=" << gs->first); return true; } return state.owner == consumer; } +bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + // browse: allow access to any acquired msg, regardless of group ownership (?ok?) + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} -void MessageGroupManager::query(qpid::types::Variant::Map& status, - const qpid::sys::Mutex::ScopedLock&) const +void MessageGroupManager::query(qpid::types::Variant::Map& status) const { /** Add a description of the current state of the message groups for this queue. 
FORMAT: @@ -276,7 +278,8 @@ void MessageGroupManager::query(qpid::ty } -boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, +boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName, + Messages& messages, const qpid::framing::FieldTable& settings ) { boost::shared_ptr<MessageGroupManager> empty; @@ -285,16 +288,14 @@ boost::shared_ptr<MessageGroupManager> M std::string headerKey = settings.getAsString(qpidMessageGroupKey); if (headerKey.empty()) { - QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); + QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); return empty; } unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); - boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) ); - - q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) ); + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); - QPID_LOG( debug, "Configured Queue '" << q->getName() << + QPID_LOG( debug, "Configured Queue '" << qName << "' for message grouping using header key '" << headerKey << "'" << " (timestamp=" << timestamp << ")"); return manager; @@ -346,7 +347,7 @@ void MessageGroupManager::getState(qpid: } state.setArray(GROUP_STATE, groupState); - QPID_LOG(debug, "Queue \"" << queue->getName() << "\": replicating message group state, key=" << groupIdHeader); + QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); } @@ -363,7 +364,7 @@ void MessageGroupManager::setState(const bool ok = state.getArray(GROUP_STATE, groupState); if (!ok) { QPID_LOG(error, "Unable to find message group state information for queue \"" << - queue->getName() << "\": cluster inconsistency error!"); + qName << "\": cluster inconsistency error!"); return; } @@ -373,13 +374,13 @@ void 
MessageGroupManager::setState(const ok = framing::getEncodedValue<FieldTable>(*g, group); if (!ok) { QPID_LOG(error, "Invalid message group state information for queue \"" << - queue->getName() << "\": table encoding error!"); + qName << "\": table encoding error!"); return; } MessageGroupManager::GroupState state; if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { QPID_LOG(error, "Invalid message group state information for queue \"" << - queue->getName() << "\": fields missing error!"); + qName << "\": fields missing error!"); return; } state.group = group.getAsString(GROUP_NAME); @@ -389,7 +390,7 @@ void MessageGroupManager::setState(const ok = group.getArray(GROUP_POSITIONS, positions); if (!ok) { QPID_LOG(error, "Invalid message group state information for queue \"" << - queue->getName() << "\": position encoding error!"); + qName << "\": position encoding error!"); return; } @@ -404,5 +405,5 @@ void MessageGroupManager::setState(const } } - QPID_LOG(debug, "Queue \"" << queue->getName() << "\": message group state replicated, key =" << groupIdHeader) + QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) } Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1176538&r1=1176537&r2=1176538&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Tue Sep 27 19:20:49 2011 @@ -38,6 +38,8 @@ class MessageGroupManager : public State { const std::string groupIdHeader; // msg header holding group identifier const unsigned int timestamp; // mark messages with timestamp if set + Messages& messages; // parent Queue's in memory message container + const 
std::string qName; // name of parent queue (for logs) struct GroupState { typedef std::list<framing::SequenceNumber> PositionFifo; @@ -93,11 +95,14 @@ class MessageGroupManager : public State public: - static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings ); - - MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 ) - : StatefulQueueObserver(std::string("MessageGroupManager:") + header), MessageAllocator(q), - groupIdHeader( header ), timestamp(_timestamp) {} + static boost::shared_ptr<MessageGroupManager> create( const std::string& qName, + Messages& messages, + const qpid::framing::FieldTable& settings ); + + MessageGroupManager(const std::string& header, const std::string& _qName, + Messages& container, unsigned int _timestamp=0 ) + : StatefulQueueObserver(std::string("MessageGroupManager:") + header), + groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} void enqueued( const QueuedMessage& qm ); void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); @@ -107,12 +112,12 @@ class MessageGroupManager : public State void getState(qpid::framing::FieldTable& state ) const; void setState(const qpid::framing::FieldTable&); - bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, - const sys::Mutex::ScopedLock&); - // uses default nextBrowsableMessage() - bool acquirable(const std::string& consumer, const QueuedMessage& msg, - const sys::Mutex::ScopedLock&); - void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const; + // MessageAllocator iface + bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + bool allocate(const std::string& c, const QueuedMessage& qm); + bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + void query(qpid::types::Variant::Map&) const; + bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; }; Modified: 
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1176538&r1=1176537&r2=1176538&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 27 19:20:49 2011 @@ -33,7 +33,7 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" -#include "qpid/broker/MessageAllocator.h" +#include "qpid/broker/FifoAllocator.h" #include "qpid/broker/MessageGroupManager.h" #include "qpid/StringUtils.h" @@ -115,7 +115,7 @@ Queue::Queue(const string& _name, bool _ deleted(false), barrier(*this), autoDeleteTimeout(0), - allocator(new MessageAllocator( this )) + allocator(new FifoAllocator( *messages )) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -249,7 +249,7 @@ bool Queue::acquire(const QueuedMessage& assertClusterSafe(); QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - if (!allocator->acquirable( consumer, msg, locker )) { + if (!allocator->allocate( consumer, msg )) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); return false; } @@ -300,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMes Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (!allocator->nextConsumableMessage(c, msg, locker)) { // no next available + if (!allocator->nextConsumableMessage(c, msg)) { // no next available QPID_LOG(debug, "No messages available to dispatch to consumer " << c->getName() << " on queue '" << name << "'"); listeners.addListener(c); @@ -319,7 +319,7 @@ Queue::ConsumeCode Queue::consumeNextMes if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { - bool ok = allocator->acquirable( c->getName(), msg, locker ); // 
inform allocator + bool ok = allocator->allocate( c->getName(), msg ); // inform allocator (void) ok; assert(ok); ok = acquire( msg.position, msg, locker); (void) ok; assert(ok); @@ -346,7 +346,7 @@ bool Queue::browseNextMessage(QueuedMess Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (!allocator->nextBrowsableMessage(c, msg, locker)) { // no next available + if (!allocator->nextBrowsableMessage(c, msg)) { // no next available QPID_LOG(debug, "No browsable messages available for consumer " << c->getName() << " on queue '" << name << "'"); listeners.addListener(c); @@ -990,22 +990,27 @@ void Queue::configureImpl(const FieldTab if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages )); } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages )); } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages )); } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; + allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages )); QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); } else { // default (FIFO) queue type // override default message allocator if message groups configured. 
- boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); - if (ma) { - allocator = ma; + boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings)); + if (mgm) { + allocator = mgm; + addObserver(mgm); } } } @@ -1300,7 +1305,7 @@ void Queue::query(qpid::types::Variant:: { Mutex::ScopedLock locker(messageLock); /** @todo add any interesting queue state into results */ - if (allocator) allocator->query( results, messageLock ); + if (allocator) allocator->query(results); } void Queue::setPosition(SequenceNumber n) { --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org