Author: kgiusti
Date: Tue Sep 27 19:20:49 2011
New Revision: 1176538

URL: http://svn.apache.org/viewvc?rev=1176538&view=rev
Log:
QPID-3346: incorporate more review feedback

Added:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h
Removed:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt
    qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp

Modified: qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt?rev=1176538&r1=1176537&r2=1176538&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt Tue Sep 27 19:20:49 2011
@@ -975,7 +975,7 @@ set (qpidbroker_SOURCES
      qpid/broker/Queue.cpp
      qpid/broker/QueueCleaner.cpp
      qpid/broker/QueueListeners.cpp
-     qpid/broker/MessageAllocator.cpp
+     qpid/broker/FifoAllocator.cpp
      qpid/broker/MessageGroupManager.cpp
      qpid/broker/PersistableMessage.cpp
      qpid/broker/Bridge.cpp

Modified: qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am?rev=1176538&r1=1176537&r2=1176538&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am Tue Sep 27 19:20:49 2011
@@ -672,7 +672,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Vhost.cpp \
   qpid/broker/Vhost.h \
   qpid/broker/MessageAllocator.h \
-  qpid/broker/MessageAllocator.cpp \
+  qpid/broker/FifoAllocator.h \
+  qpid/broker/FifoAllocator.cpp \
   qpid/broker/MessageGroupManager.cpp \
   qpid/broker/MessageGroupManager.h \
   qpid/management/ManagementAgent.cpp \

Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp?rev=1176538&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.cpp Tue Sep 
27 19:20:49 2011
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/FifoAllocator.h"
+
+using namespace qpid::broker;
+
+FifoAllocator::FifoAllocator(Messages& container)
+    : messages(container) {}
+
+bool FifoAllocator::nextConsumableMessage( Consumer::shared_ptr&, 
QueuedMessage& next )
+{
+    if (!messages.empty()) {
+        next = messages.front();    // by default, consume oldest msg
+        return true;
+    }
+    return false;
+}
+
+bool FifoAllocator::allocate(const std::string&, const QueuedMessage& )
+{
+    // by default, all messages present on the queue may be allocated as they 
have yet to
+    // be acquired.
+    return true;
+}
+
+bool FifoAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
+{
+    if (!messages.empty() && messages.next(c->position, next))
+        return true;
+    return false;
+}
+
+void FifoAllocator::query(qpid::types::Variant::Map&) const
+{
+    // nothing to see here....
+}
+

Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h?rev=1176538&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/FifoAllocator.h Tue Sep 27 
19:20:49 2011
@@ -0,0 +1,58 @@
+#ifndef _broker_FifoAllocator_h
+#define _broker_FifoAllocator_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** Simple MessageAllocator for FIFO Queues - the HEAD message is always the 
next
+ * available message for consumption.
+ */
+
+#include "qpid/broker/MessageAllocator.h"
+
+namespace qpid {
+namespace broker {
+
+class Messages;
+
+class FifoAllocator : public MessageAllocator
+{
+ public:
+    FifoAllocator(Messages& container);
+
+    /** Locking Note: all methods assume the caller is holding the 
Queue::messageLock
+     * during the method call.
+     */
+
+    /** MessageAllocator interface */
+
+    bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& 
next );
+    bool allocate(const std::string& consumer, const QueuedMessage& target);
+    bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& 
next );
+    void query(qpid::types::Variant::Map&) const;
+
+ private:
+    Messages& messages;
+};
+
+}}
+
+#endif

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h?rev=1176538&r1=1176537&r2=1176538&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h 
(original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h Tue Sep 
27 19:20:49 2011
@@ -22,7 +22,9 @@
  *
  */
 
-/* Used by queues to allocate the next "most desirable" message to a consuming 
client */
+/** Abstraction used by Queue to determine the next "most desirable" message 
to provide to
+ * a particular consuming client
+ */
 
 
 #include "qpid/broker/Consumer.h"
@@ -30,48 +32,43 @@
 namespace qpid {
 namespace broker {
 
-class Queue;
 struct QueuedMessage;
 
 class MessageAllocator
 {
- protected:
-    Queue *queue;
  public:
-    MessageAllocator( Queue *q ) : queue(q) {}
     virtual ~MessageAllocator() {};
 
-    // Note: all methods taking a mutex assume the caller is holding the
-    // Queue::messageLock during the method call.
+    /** Locking Note: all methods assume the caller is holding the 
Queue::messageLock
+     * during the method call.
+     */
 
     /** Determine the next message available for consumption by the consumer
-     * @param next set to the next message that the consumer may acquire.
-     * @return true if message is available
+     * @param consumer the consumer that needs a message to consume
+     * @param next set to the next message that the consumer may consume.
+     * @return true if message is available and next is set
      */
     virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
-                                        QueuedMessage& next,
-                                        const sys::Mutex::ScopedLock& lock);
+                                        QueuedMessage& next ) = 0;
+
+    /** Allow the comsumer to take ownership of the given message.
+     * @param consumer the name of the consumer that is attempting to acquire 
the message
+     * @param qm the message to be acquired, previously returned from 
nextConsumableMessage()
+     * @return true if ownership is permitted, false if ownership cannot be 
assigned.
+     */
+    virtual bool allocate( const std::string& consumer,
+                           const QueuedMessage& target) = 0;
 
     /** Determine the next message available for browsing by the consumer
+     * @param consumer the consumer that is browsing the queue
      * @param next set to the next message that the consumer may browse.
-     * @return true if a message is available
+     * @return true if a message is available and next is returned
      */
     virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
-                                       QueuedMessage& next,
-                                       const sys::Mutex::ScopedLock& lock);
-
-    /** check if a message previously returned via next*Message() may be 
acquired.
-     * @param consumer name of consumer that is attempting to acquire the 
message
-     * @param qm the message to be acquired
-     * @param messageLock - ensures caller is holding it!
-     * @return true if acquire is permitted, false if acquire is no longer 
permitted.
-     */
-    virtual bool acquirable( const std::string&,
-                             const QueuedMessage&,
-                             const sys::Mutex::ScopedLock&);
+                                       QueuedMessage& next ) = 0;
 
     /** hook to add any interesting management state to the status map */
-    virtual void query(qpid::types::Variant::Map&, const 
sys::Mutex::ScopedLock&) const;
+    virtual void query(qpid::types::Variant::Map&) const = 0;
 };
 
 }}

Modified: 
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1176538&r1=1176537&r2=1176538&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 
(original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 
Tue Sep 27 19:20:49 2011
@@ -61,7 +61,7 @@ void MessageGroupManager::enqueued( cons
     GroupState &state(messageGroups[group]);
     state.members.push_back(qm.position);
     uint32_t total = state.members.size();
-    QPID_LOG( trace, "group queue " << queue->getName() <<
+    QPID_LOG( trace, "group queue " << qName <<
               ": added message to group id=" << group << " total=" << total );
     if (total == 1) {
         // newly created group, no owner
@@ -81,7 +81,7 @@ void MessageGroupManager::acquired( cons
     assert( gs != messageGroups.end() );
     GroupState& state( gs->second );
     state.acquired += 1;
-    QPID_LOG( trace, "group queue " << queue->getName() <<
+    QPID_LOG( trace, "group queue " << qName <<
               ": acquired message in group id=" << group << " acquired=" << 
state.acquired );
 }
 
@@ -99,11 +99,11 @@ void MessageGroupManager::requeued( cons
     assert( state.acquired != 0 );
     state.acquired -= 1;
     if (state.acquired == 0 && state.owned()) {
-        QPID_LOG( trace, "group queue " << queue->getName() <<
+        QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" 
<< gs->first);
         disown(state);
     }
-    QPID_LOG( trace, "group queue " << queue->getName() <<
+    QPID_LOG( trace, "group queue " << qName <<
               ": requeued message to group id=" << group << " acquired=" << 
state.acquired );
 }
 
@@ -138,16 +138,16 @@ void MessageGroupManager::dequeued( cons
         if (!state.owned()) {  // unlikely, but need to remove from the free 
list before erase
             unFree( state );
         }
-        QPID_LOG( trace, "group queue " << queue->getName() << ": deleting 
group id=" << gs->first);
+        QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << 
gs->first);
         messageGroups.erase( gs );
     } else {
         if (state.acquired == 0 && state.owned()) {
-            QPID_LOG( trace, "group queue " << queue->getName() <<
+            QPID_LOG( trace, "group queue " << qName <<
                       ": consumer name=" << state.owner << " released group 
id=" << gs->first);
             disown(state);
         }
     }
-    QPID_LOG( trace, "group queue " << queue->getName() <<
+    QPID_LOG( trace, "group queue " << qName <<
               ": dequeued message from group id=" << group << " total=" << 
total );
 }
 
@@ -155,7 +155,7 @@ void MessageGroupManager::consumerAdded(
 {
     assert(consumers.find(c.getName()) == consumers.end());
     consumers[c.getName()] = 0;     // no groups owned yet
-    QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, 
name=" << c.getName() );
+    QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << 
c.getName() );
 }
 
 void MessageGroupManager::consumerRemoved( const Consumer& c )
@@ -172,20 +172,17 @@ void MessageGroupManager::consumerRemove
         if (state.owner == name) {
             --count;
             disown(state);
-            QPID_LOG( trace, "group queue " << queue->getName() <<
+            QPID_LOG( trace, "group queue " << qName <<
                       ": consumer name=" << name << " released group id=" << 
gs->first);
         }
     }
     consumers.erase( consumer );
-    QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer 
name=" << name );
+    QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << 
name );
 }
 
 
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next,
-                                                 const 
qpid::sys::Mutex::ScopedLock& )
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
 {
-    Messages& messages(queue->getMessages());
-
     if (messages.empty())
         return false;
 
@@ -220,8 +217,7 @@ bool MessageGroupManager::nextConsumable
 }
 
 
-bool MessageGroupManager::acquirable(const std::string& consumer, const 
QueuedMessage& qm,
-                                     const qpid::sys::Mutex::ScopedLock&)
+bool MessageGroupManager::allocate(const std::string& consumer, const 
QueuedMessage& qm)
 {
     // @todo KAG avoid lookup: retrieve direct reference to group state from 
QueuedMessage
     std::string group( getGroupId(qm) );
@@ -231,16 +227,22 @@ bool MessageGroupManager::acquirable(con
 
     if (!state.owned()) {
         own( state, consumer );
-        QPID_LOG( trace, "group queue " << queue->getName() <<
+        QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << consumer << " has acquired group id=" 
<< gs->first);
         return true;
     }
     return state.owner == consumer;
 }
 
+bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
+{
+    // browse: allow access to any aquired msg, regardless of group ownership 
(?ok?)
+    if (!messages.empty() && messages.next(c->position, next))
+        return true;
+    return false;
+}
 
-void MessageGroupManager::query(qpid::types::Variant::Map& status,
-                                const qpid::sys::Mutex::ScopedLock&) const
+void MessageGroupManager::query(qpid::types::Variant::Map& status) const
 {
     /** Add a description of the current state of the message groups for this 
queue.
         FORMAT:
@@ -276,7 +278,8 @@ void MessageGroupManager::query(qpid::ty
 }
 
 
-boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const 
std::string& qName,
+                                                                    Messages& 
messages,
                                                                     const 
qpid::framing::FieldTable& settings )
 {
     boost::shared_ptr<MessageGroupManager> empty;
@@ -285,16 +288,14 @@ boost::shared_ptr<MessageGroupManager> M
 
         std::string headerKey = settings.getAsString(qpidMessageGroupKey);
         if (headerKey.empty()) {
-            QPID_LOG( error, "A Message Group header key must be configured, 
queue=" << q->getName());
+            QPID_LOG( error, "A Message Group header key must be configured, 
queue=" << qName);
             return empty;
         }
         unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
 
-        boost::shared_ptr<MessageGroupManager> manager( new 
MessageGroupManager( headerKey, q, timestamp ) );
-
-        q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
+        boost::shared_ptr<MessageGroupManager> manager( new 
MessageGroupManager( headerKey, qName, messages, timestamp ) );
 
-        QPID_LOG( debug, "Configured Queue '" << q->getName() <<
+        QPID_LOG( debug, "Configured Queue '" << qName <<
                   "' for message grouping using header key '" << headerKey << 
"'" <<
                   " (timestamp=" << timestamp << ")");
         return manager;
@@ -346,7 +347,7 @@ void MessageGroupManager::getState(qpid:
     }
     state.setArray(GROUP_STATE, groupState);
 
-    QPID_LOG(debug, "Queue \"" << queue->getName() << "\": replicating message 
group state, key=" << groupIdHeader);
+    QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group 
state, key=" << groupIdHeader);
 }
 
 
@@ -363,7 +364,7 @@ void MessageGroupManager::setState(const
     bool ok = state.getArray(GROUP_STATE, groupState);
     if (!ok) {
         QPID_LOG(error, "Unable to find message group state information for 
queue \"" <<
-                 queue->getName() << "\": cluster inconsistency error!");
+                 qName << "\": cluster inconsistency error!");
         return;
     }
 
@@ -373,13 +374,13 @@ void MessageGroupManager::setState(const
         ok = framing::getEncodedValue<FieldTable>(*g, group);
         if (!ok) {
             QPID_LOG(error, "Invalid message group state information for queue 
\"" <<
-                     queue->getName() << "\": table encoding error!");
+                     qName << "\": table encoding error!");
             return;
         }
         MessageGroupManager::GroupState state;
         if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || 
!group.isSet(GROUP_ACQUIRED_CT)) {
             QPID_LOG(error, "Invalid message group state information for queue 
\"" <<
-                     queue->getName() << "\": fields missing error!");
+                     qName << "\": fields missing error!");
             return;
         }
         state.group = group.getAsString(GROUP_NAME);
@@ -389,7 +390,7 @@ void MessageGroupManager::setState(const
         ok = group.getArray(GROUP_POSITIONS, positions);
         if (!ok) {
             QPID_LOG(error, "Invalid message group state information for queue 
\"" <<
-                     queue->getName() << "\": position encoding error!");
+                     qName << "\": position encoding error!");
             return;
         }
 
@@ -404,5 +405,5 @@ void MessageGroupManager::setState(const
         }
     }
 
-    QPID_LOG(debug, "Queue \"" << queue->getName() << "\": message group state 
replicated, key =" << groupIdHeader)
+    QPID_LOG(debug, "Queue \"" << qName << "\": message group state 
replicated, key =" << groupIdHeader)
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1176538&r1=1176537&r2=1176538&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h 
(original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Tue 
Sep 27 19:20:49 2011
@@ -38,6 +38,8 @@ class MessageGroupManager : public State
 {
     const std::string groupIdHeader;    // msg header holding group identifier
     const unsigned int timestamp;       // mark messages with timestamp if set
+    Messages& messages;                 // parent Queue's in memory message 
container
+    const std::string qName;            // name of parent queue (for logs)
 
     struct GroupState {
         typedef std::list<framing::SequenceNumber> PositionFifo;
@@ -93,11 +95,14 @@ class MessageGroupManager : public State
 
  public:
 
-    static boost::shared_ptr<MessageGroupManager> create( Queue *q, const 
qpid::framing::FieldTable& settings );
-
-    MessageGroupManager(const std::string& header, Queue *q, unsigned int 
_timestamp=0 )
-      : StatefulQueueObserver(std::string("MessageGroupManager:") + header), 
MessageAllocator(q),
-        groupIdHeader( header ), timestamp(_timestamp) {}
+    static boost::shared_ptr<MessageGroupManager> create( const std::string& 
qName,
+                                                          Messages& messages,
+                                                          const 
qpid::framing::FieldTable& settings );
+
+    MessageGroupManager(const std::string& header, const std::string& _qName,
+                        Messages& container, unsigned int _timestamp=0 )
+      : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
+      groupIdHeader( header ), timestamp(_timestamp), messages(container), 
qName(_qName) {}
     void enqueued( const QueuedMessage& qm );
     void acquired( const QueuedMessage& qm );
     void requeued( const QueuedMessage& qm );
@@ -107,12 +112,12 @@ class MessageGroupManager : public State
     void getState(qpid::framing::FieldTable& state ) const;
     void setState(const qpid::framing::FieldTable&);
 
-    bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
-                                const sys::Mutex::ScopedLock&);
-    // uses default nextBrowsableMessage()
-    bool acquirable(const std::string& consumer, const QueuedMessage& msg,
-                    const sys::Mutex::ScopedLock&);
-    void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) 
const;
+    // MessageAllocator iface
+    bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+    bool allocate(const std::string& c, const QueuedMessage& qm);
+    bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+    void query(qpid::types::Variant::Map&) const;
+
     bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
 };
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1176538&r1=1176537&r2=1176538&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 27 
19:20:49 2011
@@ -33,7 +33,7 @@
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/MessageAllocator.h"
+#include "qpid/broker/FifoAllocator.h"
 #include "qpid/broker/MessageGroupManager.h"
 
 #include "qpid/StringUtils.h"
@@ -115,7 +115,7 @@ Queue::Queue(const string& _name, bool _
     deleted(false),
     barrier(*this),
     autoDeleteTimeout(0),
-    allocator(new MessageAllocator( this ))
+    allocator(new FifoAllocator( *messages ))
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -249,7 +249,7 @@ bool Queue::acquire(const QueuedMessage&
     assertClusterSafe();
     QPID_LOG(debug, consumer << " attempting to acquire message at " << 
msg.position);
 
-    if (!allocator->acquirable( consumer, msg, locker )) {
+    if (!allocator->allocate( consumer, msg )) {
         QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << 
" from '" << name);
         return false;
     }
@@ -300,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMes
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
 
-        if (!allocator->nextConsumableMessage(c, msg, locker)) { // no next 
available
+        if (!allocator->nextConsumableMessage(c, msg)) { // no next available
             QPID_LOG(debug, "No messages available to dispatch to consumer " <<
                      c->getName() << " on queue '" << name << "'");
             listeners.addListener(c);
@@ -319,7 +319,7 @@ Queue::ConsumeCode Queue::consumeNextMes
 
         if (c->filter(msg.payload)) {
             if (c->accept(msg.payload)) {
-                bool ok = allocator->acquirable( c->getName(), msg, locker );  
// inform allocator
+                bool ok = allocator->allocate( c->getName(), msg );  // inform 
allocator
                 (void) ok; assert(ok);
                 ok = acquire( msg.position, msg, locker);
                 (void) ok; assert(ok);
@@ -346,7 +346,7 @@ bool Queue::browseNextMessage(QueuedMess
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
 
-        if (!allocator->nextBrowsableMessage(c, msg, locker)) { // no next 
available
+        if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
             QPID_LOG(debug, "No browsable messages available for consumer " <<
                      c->getName() << " on queue '" << name << "'");
             listeners.addListener(c);
@@ -990,22 +990,27 @@ void Queue::configureImpl(const FieldTab
     if (lvqKey.size()) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value 
Queue with key " << lvqKey);
         messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+        allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( 
*messages ));
     } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last 
Value Queue with 'no-browse' on");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, 
true, broker);
+        allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( 
*messages ));
     } else if (_settings.get(qpidLastValueQueue)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last 
Value Queue");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, 
false, broker);
+        allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( 
*messages ));
     } else {
         std::auto_ptr<Messages> m = Fairshare::create(_settings);
         if (m.get()) {
             messages = m;
+            allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( 
*messages ));
             QPID_LOG(debug, "Configured queue " <<  getName() << " as priority 
queue.");
         } else { // default (FIFO) queue type
             // override default message allocator if message groups configured.
-            boost::shared_ptr<MessageAllocator> ma = 
boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, 
_settings ));
-            if (ma) {
-                allocator = ma;
+            boost::shared_ptr<MessageGroupManager> 
mgm(MessageGroupManager::create( getName(), *messages, _settings));
+            if (mgm) {
+                allocator = mgm;
+                addObserver(mgm);
             }
         }
     }
@@ -1300,7 +1305,7 @@ void Queue::query(qpid::types::Variant::
 {
     Mutex::ScopedLock locker(messageLock);
     /** @todo add any interesting queue state into results */
-    if (allocator) allocator->query( results, messageLock );
+    if (allocator) allocator->query(results);
 }
 
 void Queue::setPosition(SequenceNumber n) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to