Author: aconway
Date: Mon Apr 23 15:51:46 2012
New Revision: 1329301

URL: http://svn.apache.org/viewvc?rev=1329301&view=rev
Log:
QPID-3960: Fix  performance regression in priority queue implementation.

Revision r1307582 created a serious degredation in priority queue performance.

It replaced a muti-deque implementation with o(1) complexity for consuming with
a map implementation with o(log(n)) performance.

This revision returns to a mutli-deque algorithm but with the addition
of a FIFO index for fast browsing of acquired and unacquired messages.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp Mon Apr 23 15:51:46 2012
@@ -23,7 +23,6 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
-#include <algorithm>
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/assign/list_of.hpp>
@@ -33,7 +32,7 @@ namespace broker {
 
 Fairshare::Fairshare(size_t levels, uint limit) :
     PriorityQueue(levels),
-    limits(levels, limit), counts(levels, 0) {}
+    limits(levels, limit), priority(levels-1), count(0) {}
 
 
 void Fairshare::setLimit(size_t level, uint limit)
@@ -41,63 +40,70 @@ void Fairshare::setLimit(size_t level, u
     limits[level] = limit;
 }
 
+bool Fairshare::limitReached()
+{
+    uint l = limits[priority];
+    return l && ++count > l;
+}
+
+uint Fairshare::currentLevel()
+{
+    if (limitReached()) {
+        return nextLevel();
+    } else {
+        return priority;
+    }
+}
+
+uint Fairshare::nextLevel()
+{
+    count = 1;
+    if (priority) --priority;
+    else priority = levels-1;
+    return priority;
+}
+
 bool Fairshare::isNull()
 {
     for (int i = 0; i < levels; i++) if (limits[i]) return false;
     return true;
 }
 
-bool Fairshare::getState(qpid::framing::FieldTable& state) const
+bool Fairshare::getState(uint& p, uint& c) const
 {
-    for (int i = 0; i < levels; i++) {
-        if (counts[i]) {
-            std::string key = (boost::format("fairshare-count-%1%") % i).str();
-            state.setInt(key, counts[i]);
-        }
-    }
+    p = priority;
+    c = count;
     return true;
 }
 
-bool Fairshare::checkLevel(uint level)
+bool Fairshare::setState(uint p, uint c)
 {
-    if (!limits[level] || counts[level] < limits[level]) {
-        counts[level]++;
-        return true;
-    } else {
-        return false;
-    }
+    priority = p;
+    count = c;
+    return true;
 }
 
-bool Fairshare::consume(QueuedMessage& message)
+bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
 {
-    for (Available::iterator i = available.begin(); i != available.end(); ++i) 
{
-        QueuedMessage* next = *i;
-        if (checkLevel(getPriorityLevel(*next))) {
-            messages[next->position].status = QueuedMessage::ACQUIRED;
-            message = *next;
-            available.erase(i);
-            return true;
-        }
-    }
-    if (!available.empty()) {
-        std::fill(counts.begin(), counts.end(), 0);//reset counts
-        return consume(message);
-    } else {
-        return false;
-    }
+    const uint start = p = currentLevel();
+    do {
+        if (!messages[p].empty()) return true;
+    } while ((p = nextLevel()) != start);
+    return false;
 }
 
 
-bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts)
+
+bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
 {
     const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
-    return fairshare && fairshare->getState(counts);
+    return fairshare && fairshare->getState(priority, count);
 }
 
-bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts)
+bool Fairshare::setState(Messages& m, uint priority, uint count)
 {
     Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
-    return fairshare && fairshare->setState(counts);
+    return fairshare && fairshare->setState(priority, count);
 }
 
 int getIntegerSetting(const qpid::framing::FieldTable& settings, const 
std::vector<std::string>& keys)
@@ -130,14 +136,7 @@ int getIntegerSettingForKey(const qpid::
 {
     return getIntegerSetting(settings, 
boost::assign::list_of<std::string>(key));
 }
-bool Fairshare::setState(const qpid::framing::FieldTable& state)
-{
-    for (int i = 0; i < levels; i++) {
-        std::string key = (boost::format("fairshare-count-%1%") % i).str();
-        counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0;
-    }
-    return true;
-}
+
 int getSetting(const qpid::framing::FieldTable& settings, const 
std::vector<std::string>& keys, int minvalue, int maxvalue)
 {
     return std::max(minvalue,std::min(getIntegerSetting(settings, keys), 
maxvalue));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h Mon Apr 23 15:51:46 2012
@@ -22,7 +22,6 @@
  *
  */
 #include "qpid/broker/PriorityQueue.h"
-#include <vector>
 
 namespace qpid {
 namespace framing {
@@ -39,19 +38,23 @@ class Fairshare : public PriorityQueue
 {
   public:
     Fairshare(size_t levels, uint limit);
-    bool getState(qpid::framing::FieldTable& counts) const;
-    bool setState(const qpid::framing::FieldTable& counts);
+    bool getState(uint& priority, uint& count) const;
+    bool setState(uint priority, uint count);
     void setLimit(size_t level, uint limit);
     bool isNull();
-    bool consume(QueuedMessage&);
     static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& 
settings);
-    static bool getState(const Messages&, qpid::framing::FieldTable& counts);
-    static bool setState(Messages&, const qpid::framing::FieldTable& counts);
+    static bool getState(const Messages&, uint& priority, uint& count);
+    static bool setState(Messages&, uint priority, uint count);
   private:
     std::vector<uint> limits;
-    std::vector<uint> counts;
 
-    bool checkLevel(uint level);
+    uint priority;
+    uint count;
+
+    uint currentLevel();
+    uint nextLevel();
+    bool limitReached();
+    bool findFrontLevel(uint& p, PriorityLevels&);
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp Mon Apr 23 15:51:46 
2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/MessageDeque.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/log/Statement.h"
+#include "assert.h"
 
 namespace qpid {
 namespace broker {
@@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing
 bool MessageDeque::deleted(const QueuedMessage& m)
 {
     size_t i = index(m.position);
-    if (i < messages.size()) {
+    if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
         messages[i].status = QueuedMessage::DELETED;
         clean();
         return true;
@@ -53,7 +54,7 @@ size_t MessageDeque::size()
     return available;
 }
 
-void MessageDeque::release(const QueuedMessage& message)
+QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
 {
     size_t i = index(message.position);
     if (i < messages.size()) {
@@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedM
             if (head > i) head = i;
             m.status = QueuedMessage::AVAILABLE;
             ++available;
+            return &messages[i];
         }
     } else {
+        assert(0);
         QPID_LOG(error, "Failed to release message at " << message.position << 
" " << message.payload->getFrames().getContent() << "; no such message (index=" 
<< i << ", size=" << messages.size() << ")");
     }
+    return 0;
 }
 
+void MessageDeque::release(const QueuedMessage& message) { 
releasePtr(message); }
+
 bool MessageDeque::acquire(const framing::SequenceNumber& position, 
QueuedMessage& message)
 {
     if (position < messages.front().position) return false;
@@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) {
 }
 } // namespace
 
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not 
needed*/)
-{
+QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
     //add padding to prevent gaps in sequence, which break the index
     //calculation (needed for queue replication)
     while (messages.size() && (added.position - messages.back().position) > 1)
@@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMess
     messages.back().status = QueuedMessage::AVAILABLE;
     if (head >= messages.size()) head = messages.size() - 1;
     ++available;
-    return false;//adding a message never causes one to be removed for deque
+    return &messages.back();
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not 
needed*/) {
+    pushPtr(added);
+    return false; // adding a message never causes one to be removed for deque
 }
 
 void MessageDeque::updateAcquired(const QueuedMessage& acquired)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h Mon Apr 23 15:51:46 2012
@@ -48,6 +48,12 @@ class MessageDeque : public Messages
     void foreach(Functor);
     void removeIf(Predicate);
 
+    // For use by other Messages implementations that use MessageDeque as a 
FIFO index
+    // and keep pointers to its elements in their own indexing strctures.
+    void clean();
+    QueuedMessage* releasePtr(const QueuedMessage&);
+    QueuedMessage* pushPtr(const QueuedMessage& added);
+
   private:
     typedef std::deque<QueuedMessage> Deque;
     Deque messages;
@@ -55,7 +61,6 @@ class MessageDeque : public Messages
     size_t head;
 
     size_t index(const framing::SequenceNumber&);
-    void clean();
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Mon Apr 23 15:51:46 
2012
@@ -3,13 +3,13 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownersip.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,133 +22,124 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
 #include <cmath>
 
 namespace qpid {
 namespace broker {
 
-PriorityQueue::PriorityQueue(int l) : 
-    levels(l) {}
-
-bool PriorityQueue::deleted(const QueuedMessage& message)
-{
-    Index::iterator i = messages.find(message.position);
-    if (i != messages.end()) {
-        //remove from available list if necessary
-        if (i->second.status == QueuedMessage::AVAILABLE) {
-            Available::iterator j = std::find(available.begin(), 
available.end(), &i->second);
-            if (j != available.end()) available.erase(j);
-        }
-        //remove from messages map
-        messages.erase(i);
-        return true;
-    } else {
-        return false;
-    }
+PriorityQueue::PriorityQueue(int l) :
+    levels(l),
+    messages(levels, Deque()),
+    frontLevel(0), haveFront(false), cached(false) {}
+
+bool PriorityQueue::deleted(const QueuedMessage& qm) {
+    bool deleted = fifo.deleted(qm);
+    if (deleted) erase(qm);
+    return deleted;
 }
 
 size_t PriorityQueue::size()
 {
-    return available.size();
+    return fifo.size();
+}
+
+namespace {
+bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
 }
 
 void PriorityQueue::release(const QueuedMessage& message)
 {
-    Index::iterator i = messages.find(message.position);
-    if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
-        i->second.status = QueuedMessage::AVAILABLE;
-        //insert message back into the correct place in available queue, based 
on priority:
-        Available::iterator j = upper_bound(available.begin(), 
available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, 
_2));
-        available.insert(j, &i->second);
+    QueuedMessage* qm = fifo.releasePtr(message);
+    if (qm) {
+        uint p = getPriorityLevel(message);
+        messages[p].insert(
+            lower_bound(messages[p].begin(), messages[p].end(), qm, before), 
qm);
+        clearCache();
+    }
+}
+
+
+void PriorityQueue::erase(const QueuedMessage& qm) {
+    size_t i = getPriorityLevel(qm);
+    if (!messages[i].empty()) {
+        long diff = qm.position.getValue() - 
messages[i].front()->position.getValue();
+        if (diff < 0) return;
+        long maxEnd = std::min(size_t(diff), messages[i].size());
+        QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
+        Deque::iterator l =
+            lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, 
&mutableQm, before);
+        if (l != messages[i].end() && (*l)->position == qm.position) {
+            messages[i].erase(l);
+            clearCache();
+            return;
+        }
     }
 }
 
 bool PriorityQueue::acquire(const framing::SequenceNumber& position, 
QueuedMessage& message)
 {
-    Index::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
-        i->second.status = QueuedMessage::ACQUIRED;
-        message = i->second;
-        //remove it from available list (could make this faster by using 
ordering):
-        Available::iterator j = std::find(available.begin(), available.end(), 
&i->second);
-        assert(j != available.end());
-        available.erase(j);
-        return true;
-    } else {
-        return false;
-    }
+    bool acquired = fifo.acquire(position, message);
+    if (acquired) erase(message); // No longer available
+    return acquired;
 }
 
 bool PriorityQueue::find(const framing::SequenceNumber& position, 
QueuedMessage& message)
 {
-    Index::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
-        message = i->second;
-        return true;
-    } else {
-        return false;
-    }
+    return fifo.find(position, message);
 }
 
-bool PriorityQueue::browse(const framing::SequenceNumber& position, 
QueuedMessage& message, bool unacquired)
+bool PriorityQueue::browse(
+    const framing::SequenceNumber& position, QueuedMessage& message, bool 
unacquired)
 {
-    Index::iterator i = messages.lower_bound(position+1);
-    if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE  
|| (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
-        message = i->second;
-        return true;
-    } else {
-        return false;
-    }
+    return fifo.browse(position, message, unacquired);
 }
 
 bool PriorityQueue::consume(QueuedMessage& message)
 {
-    if (!available.empty()) {
-        QueuedMessage* next = available.front();
-        messages[next->position].status = QueuedMessage::ACQUIRED;
-        message = *next;
-        available.pop_front();
+    if (checkFront()) {
+        QueuedMessage* pm = messages[frontLevel].front();
+        messages[frontLevel].pop_front();
+        clearCache();
+        pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
+        message = *pm;
         return true;
     } else {
         return false;
     }
 }
 
-bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) 
const
+bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not 
needed*/)
 {
-    int priorityA = getPriorityLevel(*a);
-    int priorityB = getPriorityLevel(*b);
-    if (priorityA == priorityB) return a->position < b->position;
-    else return priorityA > priorityB;
+    QueuedMessage* qmp = fifo.pushPtr(added);
+    messages[getPriorityLevel(added)].push_back(qmp);
+    clearCache();
+    return false; // Adding a message never causes one to be removed for deque
 }
 
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not 
needed*/)
-{
-    Index::iterator i = messages.insert(Index::value_type(added.position, 
added)).first;
-    i->second.status = QueuedMessage::AVAILABLE;
-    //insert message into the correct place in available queue, based on 
priority:
-    Available::iterator j = upper_bound(available.begin(), available.end(), 
&i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
-    available.insert(j, &i->second);
-    return false;//adding a message never causes one to be removed for deque
+void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
+    fifo.updateAcquired(acquired);
 }
 
 void PriorityQueue::foreach(Functor f)
 {
-    for (Available::iterator i = available.begin(); i != available.end(); ++i) 
{
-        f(**i);
-    }
+    fifo.foreach(f);
 }
 
 void PriorityQueue::removeIf(Predicate p)
 {
-    for (Available::iterator i = available.begin(); i != available.end();) {
-        if (p(**i)) {
-            messages[(*i)->position].status = QueuedMessage::REMOVED;
-            i = available.erase(i);
-        } else {
-            ++i;
+    for (int priority = 0; priority < levels; ++priority) {
+        for (Deque::iterator i = messages[priority].begin(); i != 
messages[priority].end();) {
+            if (p(**i)) {
+                (*i)->status = QueuedMessage::DELETED; // Updates fifo index
+                i = messages[priority].erase(i);
+                clearCache();
+            } else {
+                ++i;
+            }
         }
     }
+    fifo.clean();
 }
 
 uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
@@ -161,6 +152,30 @@ uint PriorityQueue::getPriorityLevel(con
     return std::min(priority - firstLevel, (uint)levels-1);
 }
 
+void PriorityQueue::clearCache()
+{
+    cached = false;
+}
+
+bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+{
+    for (int p = levels-1; p >= 0; --p) {
+        if (!m[p].empty()) {
+            l = p;
+            return true;
+        }
+    }
+    return false;
+}
+
+bool PriorityQueue::checkFront()
+{
+    if (!cached) {
+        haveFront = findFrontLevel(frontLevel, messages);
+        cached = true;
+    }
+    return haveFront;
+}
 
 uint PriorityQueue::getPriority(const QueuedMessage& message)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h Mon Apr 23 15:51:46 2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,10 +21,10 @@
  * under the License.
  *
  */
-#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
 #include "qpid/sys/IntegerTypes.h"
-#include <list>
-#include <map>
+#include <deque>
+#include <vector>
 
 namespace qpid {
 namespace broker {
@@ -32,7 +32,10 @@ namespace broker {
 /**
  * Basic priority queue with a configurable number of recognised
  * priority levels. This is implemented as a separate deque per
- * priority level. Browsing is FIFO not priority order.
+ * priority level.
+ *
+ * Browsing is FIFO not priority order. There is a MessageDeque
+ * for fast browsing.
  */
 class PriorityQueue : public Messages
 {
@@ -46,22 +49,36 @@ class PriorityQueue : public Messages
     bool acquire(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);
     bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    virtual bool consume(QueuedMessage&);
+    bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
-
+    void updateAcquired(const QueuedMessage& acquired);
     void foreach(Functor);
     void removeIf(Predicate);
+
     static uint getPriority(const QueuedMessage&);
+
   protected:
-    typedef std::list<QueuedMessage*> Available;
-    typedef std::map<framing::SequenceNumber, QueuedMessage> Index;
+    typedef std::deque<QueuedMessage*> Deque;
+    typedef std::vector<Deque> PriorityLevels;
+    virtual bool findFrontLevel(uint& p, PriorityLevels&);
 
     const int levels;
-    Index messages;
-    Available available;
 
-    bool compare(const QueuedMessage* a, const QueuedMessage* b) const;
-    uint getPriorityLevel(const QueuedMessage& m) const;
+  private:
+    /** Available messages separated by priority and sorted in priority order.
+     *  Holds pointers to the QueuedMessages in fifo
+     */
+    PriorityLevels messages;
+    /** FIFO index of all messsagse (including acquired messages)  for fast 
browsing and indexing */
+    MessageDeque fifo;
+    uint frontLevel;
+    bool haveFront;
+    bool cached;
+
+    void erase(const QueuedMessage&);
+    uint getPriorityLevel(const QueuedMessage&) const;
+    void clearCache();
+    bool checkFront();
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Apr 23 15:51:46 2012
@@ -585,9 +585,9 @@ void Connection::queuePosition(const str
     findQueue(qname)->setPosition(position);
 }
 
-void Connection::queueFairshareState(const std::string& qname, const 
framing::FieldTable& counts)
+void Connection::queueFairshareState(const std::string& qname, const uint8_t 
priority, const uint8_t count)
 {
-    if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), 
counts)) {
+    if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), 
priority, count)) {
         QPID_LOG(error, "Failed to set fair share state on queue " << qname << 
"; this will result in inconsistencies.");
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Apr 23 15:51:46 2012
@@ -156,7 +156,7 @@ class Connection :
                         uint32_t credit);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
-    void queueFairshareState(const std::string&, const framing::FieldTable& 
count);
+    void queueFairshareState(const std::string&, const uint8_t priority, const 
uint8_t count);
     void queueObserverState(const std::string&, const std::string&, const 
framing::FieldTable&);
 
     void txStart();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Apr 23 15:51:46 
2012
@@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::A
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, 
_1));
     q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, 
q->getName(), _1));
     ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
-    qpid::framing::FieldTable counts;
-    if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) {
-        ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts);
+    uint priority, count;
+    if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
+        ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, 
count);
     }
 
     ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), 
q->getDequeueSincePurge());

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr 23 15:51:46 2012
@@ -468,6 +468,7 @@ class ReplicationTests(BrokerTest):
         s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 
'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
+
         # FIXME aconway 2012-02-22: there is a bug in priority ring
         # queues that allows a low priority message to displace a high
         # one. The following commented-out assert_browse is for the

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1329301&r1=1329300&r2=1329301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Apr 23 15:51:46 2012
@@ -304,7 +304,8 @@
     <!-- Set the fairshare delivery related state of a replicated queue. -->
     <control name="queue-fairshare-state" code="0x38">
       <field name="queue" type="str8"/>
-      <field name="counts" type="map"/>
+      <field name="position" type="uint8"/>
+      <field name="count" type="uint8"/>
     </control>
 
     <!-- Replicate a QueueObserver for a given queue. -->



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to