Thanks. The one remaining issue I know of with the selector patch is
that I don't think consumer
sequence-number wrap-around works.
We need a test there, and may need to change the comparison operators in your patch.
I was looking into
that last week on the selector patch; I'm itching to get the patch in.
Carl.
chenta lee wrote:
Hi Carl,
This patch looks great; I will update the selector patch later.
Chenta
On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <[email protected]
<mailto:[email protected]>> wrote:
I created a patch which seems to work well. It targets querying
the queue, count, and acquire, making
queue access faster for large queues (best case 1 {if no requeue or
acquire}, worst case binary search). In
most cases it is faster than binary search even if requeue or
a selector is used.
It does require that the re-queue order be corrected - which
should be done regardless.
The remaining function that could use some similar dressing would
be Queue::seek().
Any thoughts on the patch? This patch opens the way for
reasonable selector performance.
Carl.
Index: qpid/broker/Queue.cpp
===================================================================
--- qpid/broker/Queue.cpp (revision 833135)
+++ qpid/broker/Queue.cpp (working copy)
@@ -243,18 +243,18 @@
{
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "Attempting to acquire message at " << position);
- for (Messages::iterator i = messages.begin(); i !=
messages.end(); i++) {
- if (i->position == position) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << "
from " << name);
- messages.erase(i);
- return true;
+
+ Messages::iterator i = findAt(position);
+ if (i != messages.end() ) {
+ message = *i;
+ if (lastValueQueue) {
+ clearLVQIndex(*i);
}
- }
+ QPID_LOG(debug,
+ "Acquired message at " << i->position << " from
" << name);
+ messages.erase(i);
+ return true;
+ }
QPID_LOG(debug, "Could not acquire message at " << position <<
" from " << name << "; no message at that position");
return false;
}
@@ -262,21 +262,21 @@
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "attempting to acquire " << msg.position);
- for (Messages::iterator i = messages.begin(); i !=
messages.end(); i++) {
- if ((i->position == msg.position && !lastValueQueue) //
note that in some cases payload not be set
- || (lastValueQueue && (i->position == msg.position) &&
- msg.payload.get() ==
checkLvqReplace(*i).payload.get()) ) {
+ Messages::iterator i = findAt(msg.position);
+ if ((i != messages.end() && !lastValueQueue) // note that in
some cases payload not be set
+ || (lastValueQueue && (i->position == msg.position) &&
+ msg.payload.get() ==
checkLvqReplace(*i).payload.get()) ) {
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- } else {
- QPID_LOG(debug, "No match: " << i->position << " != "
<< msg.position);
- }
+ clearLVQIndex(msg);
+ QPID_LOG(debug,
+ "Match found, acquire succeeded: " <<
+ i->position << " == " << msg.position);
+ messages.erase(i);
+ return true;
+ } else {
+ QPID_LOG(debug, "No match: " << i->position << " != " <<
msg.position);
}
+
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
}
@@ -445,19 +445,35 @@
return false;
}
-namespace {
-struct PositionEquals {
- SequenceNumber pos;
- PositionEquals(SequenceNumber p) : pos(p) {}
- bool operator()(const QueuedMessage& msg) const { return
msg.position == pos; }
-};
-}// namespace
+Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
+ if(!messages.empty()){
+ QueuedMessage compM;
+ compM.position = pos;
+ unsigned long diff = pos.getValue() -
messages.front().position.getValue();
+ long maxEnd = diff < messages.size()? diff : messages.size();
+
+ Messages::iterator i =
lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
+ if (i->position == pos)
+ return i;
+ }
+ return messages.end(); // no match found.
+}
+
+
QueuedMessage Queue::find(SequenceNumber pos) const {
+
Mutex::ScopedLock locker(messageLock);
- Messages::const_iterator i = std::find_if(messages.begin(),
messages.end(), PositionEquals(pos));
- if (i != messages.end())
- return *i;
+ if(!messages.empty()){
+ QueuedMessage compM;
+ compM.position = pos;
+ unsigned long diff = pos.getValue() -
messages.front().position.getValue();
+ long maxEnd = diff < messages.size()? diff : messages.size();
+
+ Messages::const_iterator i =
lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
+ if (i != messages.end())
+ return *i;
+ }
return QueuedMessage();
}
@@ -642,10 +658,9 @@
}
/** function only provided for unit tests, or code not in
critical message path */
-uint32_t Queue::getMessageCount() const
+uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
-
uint32_t count = 0;
for ( Messages::const_iterator i = messages.begin(); i !=
messages.end(); ++i ) {
//NOTE: don't need to use checkLvqReplace() here as it
@@ -657,6 +672,12 @@
return count;
}
+uint32_t Queue::getMessageCount() const
+{
+ Mutex::ScopedLock locker(messageLock);
+ return messages.size();
+}
+
uint32_t Queue::getConsumerCount() const
{
Mutex::ScopedLock locker(consumerLock);
Index: qpid/broker/QueuedMessage.h
===================================================================
--- qpid/broker/QueuedMessage.h (revision 833135)
+++ qpid/broker/QueuedMessage.h (working copy)
@@ -38,7 +38,9 @@
QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg,
framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
QueuedMessage(Queue* q) : queue(q) {}
+
};
+ inline bool operator<(const QueuedMessage& a, const
QueuedMessage& b) { return a.position < b.position; }
}}
Index: qpid/broker/Queue.h
===================================================================
--- qpid/broker/Queue.h (revision 833135)
+++ qpid/broker/Queue.h (working copy)
@@ -148,6 +148,8 @@
}
}
}
+
+ Messages::iterator findAt(framing::SequenceNumber pos);
public:
@@ -221,6 +223,7 @@
uint32_t move(const Queue::shared_ptr destq, uint32_t
qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
+ QPID_BROKER_EXTERN uint32_t
getEnqueueCompleteMessageCount() const;
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
bool isExclusiveOwner(const OwnershipToken* const o)
const;
Index: tests/QueueTest.cpp
===================================================================
--- tests/QueueTest.cpp (revision 833135)
+++ tests/QueueTest.cpp (working copy)
@@ -120,9 +120,10 @@
queue->process(msg1);
sleep(2);
uint32_t compval=0;
- BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
+ BOOST_CHECK_EQUAL(compval,
queue->getEnqueueCompleteMessageCount());
msg1->enqueueComplete();
compval=1;
+ BOOST_CHECK_EQUAL(compval,
queue->getEnqueueCompleteMessageCount());
BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
<mailto:[email protected]>