Thanks, the one remaining issue I know of with the selector patch is that consumer
sequence number wrap-around I don't think works.

We need a test there and maybe change the comp operators in your patch. I was looking into
that last week on the selector patch, I'm itching to get the patch in.

Carl.


chenta lee wrote:
Hi Carl,
This patch looks great, I will update the selector patch later.

Chenta

On Sat, Nov 7, 2009 at 3:30 AM, Carl Trieloff <[email protected] <mailto:[email protected]>> wrote:


    I created a patch which seems to work well, it targets querying
    the queue, count, acquire making the
    queue access faster for large queues (best 1 {if no requeue or
    acquire}, worst case binary-search) . In
    most cases if it faster then binary search even if requeue or
    selector is used.

    It does require that the re-queue order be corrected - which
    should be done regardless.

    The remaining function that could use some similar dressing would
    be Queue::seek()

    Any thoughts on the patch... This patch opens the way for
    reasonable selector performance.
    Carl.


    Index: qpid/broker/Queue.cpp
    ===================================================================
    --- qpid/broker/Queue.cpp       (revision 833135)
    +++ qpid/broker/Queue.cpp       (working copy)
    @@ -243,18 +243,18 @@
     {
        Mutex::ScopedLock locker(messageLock);
        QPID_LOG(debug, "Attempting to acquire message at " << position);
    -    for (Messages::iterator i = messages.begin(); i !=
    messages.end(); i++) {
    -        if (i->position == position) {
    -            message = *i;
    -            if (lastValueQueue) {
    -                clearLVQIndex(*i);
    -            }
    -            QPID_LOG(debug,
    -                     "Acquired message at " << i->position << "
    from " << name);
    -            messages.erase(i);
    -            return true;
    +
    +    Messages::iterator i = findAt(position);
    +    if (i != messages.end() ) {
    +        message = *i;
    +        if (lastValueQueue) {
    +            clearLVQIndex(*i);
            }
    -    }
    +        QPID_LOG(debug,
    +                 "Acquired message at " << i->position << " from
    " << name);
    +        messages.erase(i);
    +        return true;
    +    }
        QPID_LOG(debug, "Could not acquire message at " << position <<
    " from " << name << "; no message at that position");
        return false;
     }
    @@ -262,21 +262,21 @@
     bool Queue::acquire(const QueuedMessage& msg) {
        Mutex::ScopedLock locker(messageLock);
        QPID_LOG(debug, "attempting to acquire " << msg.position);
    -    for (Messages::iterator i = messages.begin(); i !=
    messages.end(); i++) {
    -        if ((i->position == msg.position && !lastValueQueue) //
    note that in some cases payload not be set
    -            || (lastValueQueue && (i->position == msg.position) &&
    -                msg.payload.get() ==
    checkLvqReplace(*i).payload.get()) )  {
    +    Messages::iterator i = findAt(msg.position);
    +    if ((i != messages.end() && !lastValueQueue) // note that in
    some cases payload not be set
    +        || (lastValueQueue && (i->position == msg.position) &&
    +            msg.payload.get() ==
    checkLvqReplace(*i).payload.get()) )  {

    -            clearLVQIndex(msg);
    -            QPID_LOG(debug,
    -                     "Match found, acquire succeeded: " <<
    -                     i->position << " == " << msg.position);
    -            messages.erase(i);
    -            return true;
    -        } else {
    -            QPID_LOG(debug, "No match: " << i->position << " != "
    << msg.position);
    -        }
    +        clearLVQIndex(msg);
    +        QPID_LOG(debug,
    +                 "Match found, acquire succeeded: " <<
    +                 i->position << " == " << msg.position);
    +        messages.erase(i);
    +        return true;
    +    } else {
    +        QPID_LOG(debug, "No match: " << i->position << " != " <<
    msg.position);
        }
    +
        QPID_LOG(debug, "Acquire failed for " << msg.position);
        return false;
     }
    @@ -445,19 +445,35 @@
        return false;
     }

    -namespace {
    -struct PositionEquals {
    -    SequenceNumber pos;
    -    PositionEquals(SequenceNumber p) : pos(p) {}
    -    bool operator()(const QueuedMessage& msg) const { return
    msg.position == pos; }
    -};
    -}// namespace
    +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {

    +    if(!messages.empty()){
    +        QueuedMessage compM;
    +        compM.position = pos;
    +        unsigned long diff = pos.getValue() -
    messages.front().position.getValue();
    +        long maxEnd = diff < messages.size()? diff : messages.size();
    +
    +        Messages::iterator i =
    lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
    +        if (i->position == pos)
    +            return i;
    +    }
    +    return messages.end(); // no match found.
    +}
    +
    +
     QueuedMessage Queue::find(SequenceNumber pos) const {
    +
        Mutex::ScopedLock locker(messageLock);
    -    Messages::const_iterator i = std::find_if(messages.begin(),
    messages.end(), PositionEquals(pos));
    -    if (i != messages.end())
    -        return *i;
    +    if(!messages.empty()){
    +        QueuedMessage compM;
    +        compM.position = pos;
    +        unsigned long diff = pos.getValue() -
    messages.front().position.getValue();
    +        long maxEnd = diff < messages.size()? diff : messages.size();
    +
    +        Messages::const_iterator i =
    lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
    +        if (i != messages.end())
    +            return *i;
    +    }
        return QueuedMessage();
     }

    @@ -642,10 +658,9 @@
     }

     /** function only provided for unit tests, or code not in
    critical message path */
    -uint32_t Queue::getMessageCount() const
    +uint32_t Queue::getEnqueueCompleteMessageCount() const
     {
        Mutex::ScopedLock locker(messageLock);
    -
        uint32_t count = 0;
        for ( Messages::const_iterator i = messages.begin(); i !=
    messages.end(); ++i ) {
            //NOTE: don't need to use checkLvqReplace() here as it
    @@ -657,6 +672,12 @@
        return count;
     }

    +uint32_t Queue::getMessageCount() const
    +{
    +    Mutex::ScopedLock locker(messageLock);
    +    return messages.size();
    +}
    +
     uint32_t Queue::getConsumerCount() const
     {
        Mutex::ScopedLock locker(consumerLock);
    Index: qpid/broker/QueuedMessage.h
    ===================================================================
    --- qpid/broker/QueuedMessage.h (revision 833135)
    +++ qpid/broker/QueuedMessage.h (working copy)
    @@ -38,7 +38,9 @@
        QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg,
    framing::SequenceNumber sn) :
            payload(msg), position(sn), queue(q) {}
        QueuedMessage(Queue* q) : queue(q) {}
    +
     };
    +    inline bool operator<(const QueuedMessage& a, const
    QueuedMessage& b) { return a.position < b.position; }

     }}

    Index: qpid/broker/Queue.h
    ===================================================================
    --- qpid/broker/Queue.h (revision 833135)
    +++ qpid/broker/Queue.h (working copy)
    @@ -148,6 +148,8 @@
                        }
                    }
                }
    +
    +            Messages::iterator findAt(framing::SequenceNumber pos);

            public:

    @@ -221,6 +223,7 @@
                uint32_t move(const Queue::shared_ptr destq, uint32_t
    qty);

                QPID_BROKER_EXTERN uint32_t getMessageCount() const;
    +            QPID_BROKER_EXTERN uint32_t
    getEnqueueCompleteMessageCount() const;
                QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
                inline const string& getName() const { return name; }
                bool isExclusiveOwner(const OwnershipToken* const o)
    const;
    Index: tests/QueueTest.cpp
    ===================================================================
    --- tests/QueueTest.cpp (revision 833135)
    +++ tests/QueueTest.cpp (working copy)
    @@ -120,9 +120,10 @@
        queue->process(msg1);
        sleep(2);
        uint32_t compval=0;
    -    BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
    +    BOOST_CHECK_EQUAL(compval,
    queue->getEnqueueCompleteMessageCount());
        msg1->enqueueComplete();
        compval=1;
    +    BOOST_CHECK_EQUAL(compval,
    queue->getEnqueueCompleteMessageCount());
        BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
     }



    ---------------------------------------------------------------------
    Apache Qpid - AMQP Messaging Implementation
    Project:      http://qpid.apache.org
    Use/Interact: mailto:[email protected]
    <mailto:[email protected]>



Reply via email to