Author: kgiusti
Date: Wed Nov 30 15:04:21 2011
New Revision: 1208463

URL: http://svn.apache.org/viewvc?rev=1208463&view=rev
Log:
QPID-3603: checkpoint prototyped changes

Modified:
    qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
    qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.h
    
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageDistributor.h
    
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Consumer.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Consumer.h?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Consumer.h 
(original)
+++ qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Consumer.h Wed Nov 
30 15:04:21 2011
@@ -33,6 +33,8 @@ class QueueListeners;
 
 class Consumer {
     const bool acquires;
+    const bool browseAcquired;
+    const bool rewindable;
     // inListeners allows QueueListeners to efficiently track if this instance 
is registered
     // for notifications without having to search its containers
     bool inListeners;
@@ -44,18 +46,29 @@ class Consumer {
 
     framing::SequenceNumber position;
 
-    Consumer(const std::string& _name, bool preAcquires = true)
-      : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+    Consumer(const std::string& _name, bool preAcquires = true, bool ba = 
false)
+      : acquires(preAcquires), browseAcquired(ba), rewindable(false), 
inListeners(false),
+        name(_name), position(0) {}
     bool preAcquires() const { return acquires; }
     const std::string& getName() const { return name; }
 
     virtual bool deliver(QueuedMessage& msg) = 0;
     virtual void notify() = 0;
-    virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
-    virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+    // virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
+    // virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
     virtual OwnershipToken* getSession() = 0;
     virtual ~Consumer(){}
     friend class QueueListeners;
+
+    /** true if Consumer is a browsing subscription */
+    bool isBrowsing() const { return !acquires; }
+    /** if true, pass acquired messages to consumer, as well as un-acquired */
+    virtual bool allowAcquired() const { return isBrowsing() && 
browseAcquired; }
+    /** if true, reset consumer's position to queue HEAD if messages released. 
*/
+    virtual bool rewindOnRelease() const { return rewindable; }
+    /** called by Queue to allow consumer to filter the current message */
+    enum Action {ACCEPT, SKIP, RETRY};
+    virtual Action accept(const QueuedMessage& msg) = 0;
 };
 
 }}

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.cpp?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.cpp 
(original)
+++ 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.cpp 
Wed Nov 30 15:04:21 2011
@@ -28,23 +28,17 @@ using namespace qpid::broker;
 FifoDistributor::FifoDistributor(Messages& container)
     : messages(container) {}
 
-bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, 
QueuedMessage& next )
+bool FifoDistributor::nextMessage( Consumer::shared_ptr& c, QueuedMessage& 
next )
 {
-    return messages.consume(next);
+    return messages.browse(c->position, next, !c->allowAcquired());
 }
 
 bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
 {
-    // by default, all messages present on the queue may be allocated as they 
have yet to
-    // be acquired.
+    // The Fifo distributor does not enforce or record message allocation
     return true;
 }
 
-bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
-{
-    return messages.browse(c->position, next, false);
-}
-
 void FifoDistributor::query(qpid::types::Variant::Map&) const
 {
     // nothing to see here....

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.h?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.h 
(original)
+++ qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/FifoDistributor.h 
Wed Nov 30 15:04:21 2011
@@ -44,9 +44,8 @@ class FifoDistributor : public MessageDi
 
     /** MessageDistributor interface */
 
-    bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& 
next );
+    bool nextMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
     bool allocate(const std::string& consumer, const QueuedMessage& target);
-    bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& 
next );
     void query(qpid::types::Variant::Map&) const;
 
  private:

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageDistributor.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageDistributor.h?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageDistributor.h 
(original)
+++ 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageDistributor.h 
Wed Nov 30 15:04:21 2011
@@ -48,8 +48,7 @@ class MessageDistributor
      * @param next set to the next message that the consumer may consume.
      * @return true if message is available and next is set
      */
-    virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
-                                        QueuedMessage& next ) = 0;
+    virtual bool nextMessage( Consumer::shared_ptr& consumer, QueuedMessage& 
next ) = 0;
 
     /** Allow the comsumer to take ownership of the given message.
      * @param consumer the name of the consumer that is attempting to acquire 
the message
@@ -59,14 +58,6 @@ class MessageDistributor
     virtual bool allocate( const std::string& consumer,
                            const QueuedMessage& target) = 0;
 
-    /** Determine the next message available for browsing by the consumer
-     * @param consumer the consumer that is browsing the queue
-     * @param next set to the next message that the consumer may browse.
-     * @return true if a message is available and next is returned
-     */
-    virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
-                                       QueuedMessage& next ) = 0;
-
     /** hook to add any interesting management state to the status map */
     virtual void query(qpid::types::Variant::Map&) const = 0;
 };

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
 (original)
+++ 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
 Wed Nov 30 15:04:21 2011
@@ -202,11 +202,16 @@ MessageGroupManager::~MessageGroupManage
 {
     QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << 
hits << " misses=" << misses );
 }
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
+bool MessageGroupManager::nextMessage( Consumer::shared_ptr& c, QueuedMessage& 
next )
 {
     if (!messages.size())
         return false;
 
+    // Message groups are ignored for browsers
+    if (c->isBrowsing()) {
+        return messages.browse(c->position, next, !c->allowAcquired());
+    }
+
     next.position = c->position;
     if (!freeGroups.empty()) {
         const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
@@ -219,13 +224,12 @@ bool MessageGroupManager::nextConsumable
     while (messages.browse( next.position, next, true )) {
         GroupState& group = findGroup(next);
         if (!group.owned()) {
-            //TODO: make acquire more efficient when we already have the 
message in question
-            if (group.members.front() == next.position && 
messages.acquire(next.position, next)) {    // only take from head!
+            if (group.members.front() == next.position) {    // only take from 
head!
                 return true;
             }
             QPID_LOG(debug, "Skipping " << next.position << " since group " << 
group.group
                      << "'s head message still pending. pos=" << 
group.members.front());
-        } else if (group.owner == c->getName() && 
messages.acquire(next.position, next)) {
+        } else if (group.owner == c->getName()) {
             return true;
         }
     }
@@ -247,12 +251,6 @@ bool MessageGroupManager::allocate(const
     return state.owner == consumer;
 }
 
-bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
-{
-    // browse: allow access to any available msg, regardless of group 
ownership (?ok?)
-    return messages.browse(c->position, next, false);
-}
-
 void MessageGroupManager::query(qpid::types::Variant::Map& status) const
 {
     /** Add a description of the current state of the message groups for this 
queue.

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.h 
(original)
+++ 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/MessageGroupManager.h 
Wed Nov 30 15:04:21 2011
@@ -103,9 +103,8 @@ class MessageGroupManager : public State
     void setState(const qpid::framing::FieldTable&);
 
     // MessageDistributor iface
-    bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+    bool nextMessage(Consumer::shared_ptr& c, QueuedMessage& next);
     bool allocate(const std::string& c, const QueuedMessage& qm);
-    bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
     void query(qpid::types::Variant::Map&) const;
 
     bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;

Modified: qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Queue.cpp 
(original)
+++ qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/Queue.cpp Wed Nov 
30 15:04:21 2011
@@ -276,118 +276,62 @@ void Queue::notifyListener()
     set.notify();
 }
 
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+void Queue::removeListener(Consumer::shared_ptr c)
 {
-    checkNotDeleted();
-    if (c->preAcquires()) {
-        switch (consumeNextMessage(m, c)) {
-          case CONSUMED:
-            return true;
-          case CANT_CONSUME:
-            notifyListener();//let someone else try
-          case NO_MESSAGES:
-          default:
-            return false;
+    QueueListeners::NotificationSet set;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        listeners.removeListener(c);
+        if (messages->size()) {
+            listeners.populate(set);
         }
-    } else {
-        return browseNextMessage(m, c);
     }
+    set.notify();
 }
 
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, 
Consumer::shared_ptr& c)
+// give the consumer a message from the queue
+bool Queue::dispatch(Consumer::shared_ptr c)
 {
-    QueuedMessage msg;
+    QueuedMessage msg(this);
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        if (allocator->nextConsumableMessage(c, msg)) {
+        if (allocator->nextMessage(c, msg)) {
+
             if (msg.payload->hasExpired()) {
                 QPID_LOG(debug, "Message expired from queue '" << name << "'");
-                c->position = msg.position;
+                acquire(msg.position, msg, locker);
                 dequeue(0, msg);
                 continue;
             }
 
-            if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {
-                    bool ok = allocator->allocate( c->getName(), msg );  // 
inform allocator
+            switch (c->accept(msg)) {
+            case Consumer::ACCEPT:
+                if (!c->isBrowsing()) {
+                    // consume this message
+                    bool ok = acquire(msg.position, msg, locker);
+                    (void) ok; assert(ok);
+                    ok = allocator->allocate( c->getName(), msg );
                     (void) ok; assert(ok);
-                    observeAcquire(msg, locker);
-                    m = msg;
-                    return CONSUMED;
-                } else {
-                    //message(s) are available but consumer hasn't got enough 
credit
-                    QPID_LOG(debug, "Consumer can't currently accept message 
from '" << name << "'");
-                    messages->release(msg);
-                    return CANT_CONSUME;
                 }
-            } else {
-                //consumer will never want this message
+                c->position = msg.position;
+                c->deliver(msg);
+                return true;
+            case Consumer::RETRY:
+                // consumer wants this message, but cannot accept it at this 
time
+                QPID_LOG(debug, "Consumer can't currently accept message from 
'" << name << "'");
+                notifyListener();//let someone else try
+                return false;
+            case Consumer::SKIP:
+                // consumer will never want this message, continue looking...
                 QPID_LOG(debug, "Consumer doesn't want message from '" << name 
<< "'");
-                messages->release(msg);
-                return CANT_CONSUME;
+                c->position = msg.position;
+                continue;
             }
         } else {
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << 
"'");
             listeners.addListener(c);
-            return NO_MESSAGES;
-        }
-    }
-}
-
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
-    while (true) {
-        Mutex::ScopedLock locker(messageLock);
-        QueuedMessage msg;
-
-        if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
-            QPID_LOG(debug, "No browsable messages available for consumer " <<
-                     c->getName() << " on queue '" << name << "'");
-            listeners.addListener(c);
             return false;
         }
-
-        if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
-            if (c->accept(msg.payload)) {
-                //consumer wants the message
-                c->position = msg.position;
-                m = msg;
-                return true;
-            } else {
-                //browser hasn't got enough credit for the message
-                QPID_LOG(debug, "Browser can't currently accept message from 
'" << name << "'");
-                return false;
-            }
-        } else {
-            //consumer will never want this message, continue seeking
-            QPID_LOG(debug, "Browser skipping message from '" << name << "'");
-            c->position = msg.position;
-        }
-    }
-    return false;
-}
-
-void Queue::removeListener(Consumer::shared_ptr c)
-{
-    QueueListeners::NotificationSet set;
-    {
-        Mutex::ScopedLock locker(messageLock);
-        listeners.removeListener(c);
-        if (messages->size()) {
-            listeners.populate(set);
-        }
-    }
-    set.notify();
-}
-
-bool Queue::dispatch(Consumer::shared_ptr c)
-{
-    QueuedMessage msg(this);
-    if (getNextMessage(msg, c)) {
-        c->deliver(msg);
-        return true;
-    } else {
-        return false;
     }
 }
 

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.cpp 
(original)
+++ qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.cpp 
Wed Nov 30 15:04:21 2011
@@ -300,8 +300,9 @@ class ReplicatingSubscription : public S
         ~DelegatingConsumer();
         bool deliver(QueuedMessage& msg);
         void notify();
-        bool filter(boost::intrusive_ptr<Message>);
-        bool accept(boost::intrusive_ptr<Message>);
+        //bool filter(boost::intrusive_ptr<Message>);
+        //bool accept(boost::intrusive_ptr<Message>);
+        Consumer::Action accept(const QueuedMessage&);
         OwnershipToken* getSession();
       private:
         ReplicatingSubscription& delegate;
@@ -498,7 +499,7 @@ SemanticState::ConsumerImpl::ConsumerImp
 
 
 ) :
-    Consumer(_name, _acquire),
+    Consumer(_name, _acquire, !_acquire),   /** @todo KAG - allow 
configuration of 'browse acquired' */
     parent(_parent),
     queue(_queue),
     ackExpected(ack),
@@ -572,6 +573,7 @@ bool SemanticState::ConsumerImpl::delive
     return true;
 }
 
+#if 0
 bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
 {
     return true;
@@ -588,6 +590,25 @@ bool SemanticState::ConsumerImpl::accept
     blocked = !(filter(msg) && checkCredit(msg));
     return !blocked;
 }
+#else
+Consumer::Action SemanticState::ConsumerImpl::accept(const QueuedMessage& msg)
+{
+    /** @todo KAG if present, run selector against msg.payload */
+    bool selected = true;
+
+    /** @todo KAG - traditional consumers/browsers (non-selectors) will never 
skip, always ACCEPT (or RETRY if !credit) */
+    if (selected) {
+        if (!checkCredit(msg.payload))
+            return Consumer::RETRY;
+        return Consumer::ACCEPT;
+    } else {
+        return Consumer::SKIP;
+    }
+}
+#endif
+
+
+
 
 namespace {
 struct ConsumerName {
@@ -618,7 +639,7 @@ void SemanticState::ConsumerImpl::alloca
 
 }
 
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const intrusive_ptr<Message>& 
msg)
 {
     bool enoughCredit = msgCredit > 0 &&
         (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
@@ -1085,8 +1106,9 @@ bool ReplicatingSubscription::Delegating
     return delegate.deliver(m);
 }
 void ReplicatingSubscription::DelegatingConsumer::notify() { 
delegate.notify(); }
-bool 
ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message>
 msg) { return delegate.filter(msg); }
-bool 
ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message>
 msg) { return delegate.accept(msg); }
+//bool 
ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message>
 msg) { return delegate.filter(msg); }
+//bool 
ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message>
 msg) { return delegate.accept(msg); }
+Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const 
QueuedMessage& msg) { return delegate.accept(msg); }
 OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { 
return delegate.getSession(); }
 
 
ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet&
 r,

Modified: 
qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1208463&r1=1208462&r2=1208463&view=diff
==============================================================================
--- qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.h 
(original)
+++ qpid/branches/qpid-3603-kgiusti/qpid/cpp/src/qpid/broker/SemanticState.h 
Wed Nov 30 15:04:21 2011
@@ -96,7 +96,7 @@ class SemanticState : private boost::non
         int deliveryCount;
         qmf::org::apache::qpid::broker::Subscription* mgmtObject;
 
-        bool checkCredit(boost::intrusive_ptr<Message>& msg);
+        bool checkCredit(const boost::intrusive_ptr<Message>& msg);
         void allocateCredit(boost::intrusive_ptr<Message>& msg);
         bool haveCredit();
 
@@ -114,8 +114,10 @@ class SemanticState : private boost::non
         virtual ~ConsumerImpl();
         OwnershipToken* getSession();
         virtual bool deliver(QueuedMessage& msg);
-        bool filter(boost::intrusive_ptr<Message> msg);
-        bool accept(boost::intrusive_ptr<Message> msg);
+        //bool filter(boost::intrusive_ptr<Message> msg);
+        //bool accept(boost::intrusive_ptr<Message> msg);
+        virtual Action accept(const QueuedMessage& msg);
+
 
         void disableNotify();
         void enableNotify();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to