Author: aconway Date: Fri Sep 30 20:55:40 2011 New Revision: 1177829 URL: http://svn.apache.org/viewvc?rev=1177829&view=rev Log: QPID-2920: Renamed Stoppable to Activity, clearer naming.
Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h (contents, props changed) - copied, changed from r1177676, qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h Removed: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1177829&r1=1177828&r2=1177829&view=diff ============================================================================== --- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Fri Sep 30 20:55:40 2011 @@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMes { while (true) { ClusterAcquireScope acquireScope; // Outside the lock - Stoppable::Scope consumeScope(consuming); + Activity::Scope consumeScope(consuming); Mutex::ScopedLock locker(messageLock); if (!consumeScope) { QPID_LOG(trace, "Queue stopped, can't consume: " << name); @@ -1288,6 +1288,8 @@ void Queue::startConsumers() { notifyListener(); } +bool Queue::isConsumingStopped() { return consuming.isStopped(); } + // Called when all busy threads exited after stopConsumers() void Queue::consumingStopped() { QPID_LOG(trace, "Stopped consumers on " << getName()); Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h?rev=1177829&r1=1177828&r2=1177829&view=diff ============================================================================== --- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h (original) +++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h Fri Sep 30 20:55:40 2011 @@ -31,11 +31,10 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" - #include "qpid/framing/FieldTable.h" +#include "qpid/sys/Activity.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Stoppable.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" @@ -130,7 +129,7 @@ class Queue : public boost::enable_share UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; - sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster + sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); @@ -398,6 +397,9 @@ class Queue : public boost::enable_share /** Start consumers. */ void startConsumers(); + /** Return true if consumers are stopped */ + bool isConsumingStopped(); + /** Context information used in a cluster. 
*/ boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; } void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; } Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1177829&r1=1177828&r2=1177829&view=diff ============================================================================== --- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original) +++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Fri Sep 30 20:55:40 2011 @@ -52,9 +52,9 @@ using namespace broker; namespace { const ProtocolVersion pv; // shorthand -// noReplicate means the current thread is handling a message -// received from the cluster so it should not be replicated. -QPID_TSS bool tssNoReplicate = false; +// True means the current thread is handling a local event that should be replicated. +// False means we're handling a cluster event, so it should not be replicated. 
+QPID_TSS bool tssReplicate = true; } // FIXME aconway 2011-09-26: de-const the broker::Cluster interface, @@ -72,13 +72,13 @@ Multicaster& BrokerContext::mcaster(cons } BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() { - assert(!tssNoReplicate); - tssNoReplicate = true; + assert(tssReplicate); + tssReplicate = false; } BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() { - assert(tssNoReplicate); - tssNoReplicate = false; + assert(!tssReplicate); + tssReplicate = true; } BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q) @@ -88,7 +88,7 @@ BrokerContext::~BrokerContext() {} bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) { - if (tssNoReplicate) return true; + if (!tssReplicate) return true; // FIXME aconway 2010-10-20: replicate message in fragments // (frames), using fixed size buffers. std::string data(msg->encodedSize(),char()); @@ -104,18 +104,18 @@ void BrokerContext::routing(const boost: void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {} void BrokerContext::acquire(const broker::QueuedMessage& qm) { - if (tssNoReplicate) return; - mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); + if (tssReplicate) + mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::dequeue(const broker::QueuedMessage& qm) { - if (!tssNoReplicate) + if (tssReplicate) mcaster(qm).mcast( ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::requeue(const broker::QueuedMessage& qm) { - if (!tssNoReplicate) + if (tssReplicate) mcaster(qm).mcast(ClusterMessageRequeueBody( pv, qm.queue->getName(), @@ -125,7 +125,7 @@ void BrokerContext::requeue(const broker // FIXME aconway 2011-06-08: should we be using shared_ptr to q here? 
void BrokerContext::create(broker::Queue& q) { - if (tssNoReplicate) return; + if (!tssReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName()))); @@ -137,12 +137,12 @@ void BrokerContext::create(broker::Queue } void BrokerContext::destroy(broker::Queue& q) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName())); } void BrokerContext::create(broker::Exchange& ex) { - if (tssNoReplicate) return; + if (!tssReplicate) return; std::string data(ex.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); ex.encode(buf); @@ -150,7 +150,7 @@ void BrokerContext::create(broker::Excha } void BrokerContext::destroy(broker::Exchange& ex) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(ex.getName()).mcast( ClusterWiringDestroyExchangeBody(pv, ex.getName())); } @@ -158,14 +158,14 @@ void BrokerContext::destroy(broker::Exch void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args)); } void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args)); } Copied: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h (from r1177676, qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h) URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h?p2=qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h&p1=qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h&r1=1177676&r2=1177829&rev=1177829&view=diff ============================================================================== --- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h (original) +++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h Fri Sep 30 20:55:40 2011 @@ -1,5 +1,5 @@ -#ifndef QPID_SYS_STOPPABLE_H -#define QPID_SYS_STOPPABLE_H +#ifndef QPID_SYS_ACTIVITY_H +#define QPID_SYS_ACTIVITY_H /* * @@ -28,39 +28,42 @@ namespace qpid { namespace sys { /** - * An activity that may be executed by multiple threads, and can be stopped. - * - * Stopping prevents new threads from entering and calls a callback - * when all busy threads have left. + * An activity that may be executed by multiple threads concurrently. + * An activity has 3 states: + * - active: may have active threads, new threads may enter. + * - stopping: may have active threads but no new threads may enter. + * - stopped: no active threads and no new threads may enter. */ -class Stoppable { +class Activity { public: /** - * Initially not stopped. + * Initially active. *@param stoppedCallback: called when all threads have stopped. */ - Stoppable(boost::function<void()> stoppedCallback) + Activity(boost::function<void()> stoppedCallback) : busy(0), stopped(false), notify(stoppedCallback) {} - /** Mark the scope of a busy thread like this: + /** Mark the scope of an activity thread like this: * <pre> * { - * Stoppable::Scope working(stoppable); - * if (working) { do stuff } + * Activity::Scope working(activity); + * if (working) { do stuff } // Only if activity is active. 
* } * </pre> */ class Scope { - Stoppable& state; + Activity& state; bool entered; public: - Scope(Stoppable& s) : state(s) { entered = state.enter(); } + Scope(Activity& s) : state(s) { entered = state.enter(); } ~Scope() { if (entered) state.exit(); } operator bool() const { return entered; } }; friend class Scope; + // FIXME aconway 2011-09-30: fix pre-conditions with asserts, don't allow + // multiple stops/starts. /** * Set state to "stopped", so no new threads can enter. * Notify function will be called when all busy threads have left. @@ -80,8 +83,13 @@ class Stoppable { stopped = false; } - private: + /** True if Activity is stopped with no busy threads */ + bool isStopped() { + sys::Monitor::ScopedLock l(lock); + return stopped && busy == 0; + } + private: // Busy thread enters scope bool enter() { sys::Monitor::ScopedLock l(lock); @@ -110,4 +118,4 @@ class Stoppable { }} // namespace qpid::sys -#endif /*!QPID_SYS_STOPPABLE_H*/ +#endif /*!QPID_SYS_ACTIVITY_H*/ Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Activity.h ------------------------------------------------------------------------------ svn:keywords = Rev Date --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org