Author: aconway
Date: Fri Sep 16 20:17:25 2011
New Revision: 1171757

URL: http://svn.apache.org/viewvc?rev=1171757&view=rev
Log:
QPID-2920: Fixing QueueContext state transtions for timed ownership.
 
- Renamed release to requeue for Cluster interface.

Added:
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h   
(with props)
Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/README.txt
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/overview.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/types.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk Fri Sep 16 20:17:25 
2011
@@ -115,6 +115,7 @@ cluster2_la_SOURCES =                               \
        qpid/cluster/exp/BrokerContext.h        \
        qpid/cluster/exp/BufferFactory.h        \
        qpid/cluster/exp/Cluster2Plugin.cpp     \
+       qpid/cluster/exp/CountdownTimer.h       \
        qpid/cluster/exp/Core.cpp               \
        qpid/cluster/exp/Core.h                 \
        qpid/cluster/exp/EventHandler.cpp       \

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h Fri Sep 
16 20:17:25 2011
@@ -57,7 +57,7 @@ class Cluster
 
     /** A message is delivered to a queue.
      * Called before actually pushing the message to the queue.
-     *@return If true the message should be enqueued now, false for delayed 
enqueue.
+     *@return If true the message should be enqueued now, false if it will be 
enqueued later.
      */
     virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 
0;
 
@@ -68,10 +68,11 @@ class Cluster
     virtual void acquire(const QueuedMessage&) = 0;
 
     /** A locally-acquired message is released by the consumer and re-queued. 
*/
-    virtual void release(const QueuedMessage&) = 0;
+    virtual void requeue(const QueuedMessage&) = 0;
 
     /** A message is removed from the queue.
-     *@return true if the message should be dequeued, false for delayed 
dequeue.
+     *@return true if the message should be dequeued now, false if it
+     * will be dequeued later.
      */
     virtual bool dequeue(const QueuedMessage&) = 0;
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h Fri 
Sep 16 20:17:25 2011
@@ -41,7 +41,7 @@ class NullCluster : public Cluster
     virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { 
return true; }
     virtual void routed(const boost::intrusive_ptr<Message>&) {}
     virtual void acquire(const QueuedMessage&) {}
-    virtual void release(const QueuedMessage&) {}
+    virtual void requeue(const QueuedMessage&) {}
     virtual bool dequeue(const QueuedMessage&) { return false; }
 
     // Consumers

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Fri Sep 
16 20:17:25 2011
@@ -227,7 +227,7 @@ void Queue::requeue(const QueuedMessage&
             }
         }
     }
-    if (broker) broker->getCluster().release(msg); // FIXME aconway 
2011-09-12: review. rename requeue?
+    if (broker) broker->getCluster().requeue(msg); // FIXME aconway 
2011-09-12: review. rename requeue?
     copy.notify();
 }
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp 
Fri Sep 16 20:17:25 2011
@@ -28,7 +28,7 @@
 #include "qpid/framing/ClusterMessageEnqueueBody.h"
 #include "qpid/framing/ClusterMessageAcquireBody.h"
 #include "qpid/framing/ClusterMessageDequeueBody.h"
-#include "qpid/framing/ClusterMessageReleaseBody.h"
+#include "qpid/framing/ClusterMessageRequeueBody.h"
 #include "qpid/framing/ClusterWiringCreateQueueBody.h"
 #include "qpid/framing/ClusterWiringCreateExchangeBody.h"
 #include "qpid/framing/ClusterWiringDestroyQueueBody.h"
@@ -93,9 +93,7 @@ bool BrokerContext::enqueue(Queue& queue
         core.getRoutingMap().put(tssRoutingId, msg);
     }
     core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, 
queue.getName()));
-    // TODO aconway 2010-10-21: review delivery options: strict (wait
-    // for CPG delivery vs loose (local deliver immediately).
-    return false; // Strict delivery, cluster will call Queue deliver.
+    return false; // Strict order, wait for CPG self-delivery to enqueue.
 }
 
 void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
@@ -113,22 +111,27 @@ void BrokerContext::acquire(const broker
 }
 
 bool BrokerContext::dequeue(const broker::QueuedMessage& qm) {
+    // FIXME aconway 2011-09-15: should dequeue locally immediately
+    // instead of waiting for redeliver. No need for CPG order on
+    // dequeues.
     if (!tssNoReplicate)
         core.mcast(ClusterMessageDequeueBody(
                        ProtocolVersion(), qm.queue->getName(), qm.position));
     return false;               // FIXME aconway 2011-09-14: needed?
 }
 
-// FIXME aconway 2011-09-14: rename requeue?
-void BrokerContext::release(const broker::QueuedMessage& qm) {
+void BrokerContext::requeue(const broker::QueuedMessage& qm) {
     if (!tssNoReplicate)
-        core.mcast(ClusterMessageReleaseBody(
-                       ProtocolVersion(), qm.queue->getName(), qm.position, 
qm.payload->getRedelivered()));
+        core.mcast(ClusterMessageRequeueBody(
+                       ProtocolVersion(),
+                       qm.queue->getName(),
+                       qm.position,
+                       qm.payload->getRedelivered()));
 }
 
 // FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
 void BrokerContext::create(broker::Queue& q) {
-    q.stopConsumers();          // FIXME aconway 2011-09-14: Stop queue 
initially.
+    q.stopConsumers();          // Stop queue initially.
     if (tssNoReplicate) return;
     assert(!QueueContext::get(q));
     boost::intrusive_ptr<QueueContext> context(
@@ -192,8 +195,8 @@ void BrokerContext::empty(broker::Queue&
 
 void BrokerContext::stopped(broker::Queue& q) {
     boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
-    // Don't forward the stopped call if the queue does not yet have a cluster 
context
-    // this when the queue is first created locally.
+    // Don't forward the stopped call if the queue does not yet have a
+    // cluster context this when the queue is first created locally.
     if (qc) qc->stopped();
 }
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h 
Fri Sep 16 20:17:25 2011
@@ -58,7 +58,7 @@ class BrokerContext : public broker::Clu
     void routed(const boost::intrusive_ptr<broker::Message>&);
     void acquire(const broker::QueuedMessage&);
     bool dequeue(const broker::QueuedMessage&);
-    void release(const broker::QueuedMessage&);
+    void requeue(const broker::QueuedMessage&);
 
     // Consumers
 

Added: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h?rev=1171757&view=auto
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h 
(added)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h 
Fri Sep 16 20:17:25 2011
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_EXP_COUNTDOWNTIMER_H
+#define QPID_CLUSTER_EXP_COUNTDOWNTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright countdown.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Timer.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+/** Manage the CountdownTimeout */
+class CountdownTimer {
+  public:
+    /**
+     * Resettable count-down timer for a fixed interval.
+     *@param cb callback when countdown expires.
+     *@param t Timer to  use for countdown.
+     *@param d duration of countdown.
+     */
+    CountdownTimer(boost::function<void()> cb, sys::Timer& t, sys::Duration d)
+        : task(new Task(*this, d)), timerRunning(false), callback(cb), 
timer(t) {}
+
+    ~CountdownTimer() { stop(); }
+
+    /** Start the countdown if not already started. */
+    void start() {
+        sys::Mutex::ScopedLock l(lock);
+        if (!timerRunning) {
+            timerRunning = true;
+            task->restart();
+            timer.add(task);
+        }
+    }
+
+    /** Stop the countdown if not already stopped. */
+    void stop() {
+        sys::Mutex::ScopedLock l(lock);
+        if (timerRunning) {
+            timerRunning = false;
+            task->cancel();
+        }
+    }
+
+  private:
+
+    class Task : public sys::TimerTask {
+        CountdownTimer& parent;
+      public:
+        Task(CountdownTimer& ct, const sys::Duration& d) :
+            TimerTask(d, "CountdownTimer::Task"), parent(ct) {}
+        void fire() { parent.fire(); }
+    };
+
+    // Called when countdown expires.
+    void fire() {
+        bool doCallback = false;
+        {
+            sys::Mutex::ScopedLock l(lock);
+            doCallback = timerRunning;
+            timerRunning = false;
+        }
+        if (doCallback) callback();
+    }
+
+    sys::Mutex lock;
+    boost::intrusive_ptr<Task> task;
+    bool timerRunning;
+    boost::function<void()> callback;
+    sys::Timer& timer;
+};
+
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_COUNTDOWNTIMER_H*/

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp 
Fri Sep 16 20:17:25 2011
@@ -95,14 +95,15 @@ void MessageHandler::acquire(const std::
         boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire 
failed");
         QueuedMessage qm;
         BrokerContext::ScopedSuppressReplication ssr;
-        bool ok = queue->acquireMessageAt(position, qm);
-        (void)ok;               // Avoid unused variable warnings.
-        assert(ok);             // FIXME aconway 2011-09-14: error handling
+        if (!queue->acquireMessageAt(position, qm))
+            throw Exception(QPID_MSG("Cluster acquire: message not found: "
+                                     << q << "[" << position << "]"));
         assert(qm.position.getValue() == position);
         assert(qm.payload);
-        // Save for possible requeue.
+        // Save on context for possible requeue if released/rejected.
         QueueContext::get(*queue)->acquire(qm);
     }
+    // FIXME aconway 2011-09-15: systematic logging across cluster module.
     QPID_LOG(trace, "cluster message " << q << "[" << position
              << "] acquired by " << PrettyId(sender(), self()));
  }
@@ -124,11 +125,9 @@ void MessageHandler::dequeue(const std::
     }
 }
 
-// FIXME aconway 2011-09-14: rename as requeue?
-void MessageHandler::release(const std::string& q, uint32_t position, bool 
redelivered) {
-    // FIXME aconway 2011-09-15: review release/requeue logic.
+void MessageHandler::requeue(const std::string& q, uint32_t position, bool 
redelivered) {
     if (sender() != self()) {
-        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster release 
failed");
+        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster requeue 
failed");
         QueueContext::get(*queue)->requeue(position, redelivered);
     }
 }

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h 
Fri Sep 16 20:17:25 2011
@@ -60,7 +60,7 @@ class MessageHandler : public framing::A
     void routed(uint32_t routingId);
     void acquire(const std::string& queue, uint32_t position);
     void dequeue(const std::string& queue, uint32_t position);
-    void release(const std::string& queue, uint32_t position, bool 
redelivered);
+    void requeue(const std::string& queue, uint32_t position, bool 
redelivered);
 
   private:
     struct Member {

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp 
Fri Sep 16 20:17:25 2011
@@ -31,63 +31,54 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/Timer.h"
 
 namespace qpid {
 namespace cluster {
 
-
-class OwnershipTimeout : public sys::TimerTask {
-    QueueContext& queueContext;
-
-  public:
-    OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
-        TimerTask(interval, "QueueContext::OwnershipTimeout"), 
queueContext(qc) {}
-
-    void fire() { queueContext.timeout(); }
-};
-
+// FIXME aconway 2011-09-16: configurable timeout.
 QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
-    : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
+    : ownership(UNSUBSCRIBED),
+      timer(boost::bind(&QueueContext::timeout, this),
+            q.getBroker()->getTimer(),
+            100*sys::TIME_MSEC),
+      queue(q), mcast(m), consumers(0)
 {
     q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
 }
 
-QueueContext::~QueueContext() {
-    if (timerTask) timerTask->cancel();
-}
+QueueContext::~QueueContext() {}
 
-void QueueContext::cancelTimer(const sys::Mutex::ScopedLock&) {
-    if (timerTask) {        // no need for timeout, sole owner.
-        timerTask->cancel();
-        timerTask = 0;
-    }
+// Invariant for ownership:
+// UNSUBSCRIBED, SUBSCRIBED => timer stopped, queue stopped
+// SOLE_OWNER => timer stopped, queue started
+// SHARED_OWNER => timer started, queue started
+
+namespace {
+bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
 }
 
 // Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(QueueOwnership state) {
+void QueueContext::replicaState(QueueOwnership newOwnership) {
     sys::Mutex::ScopedLock l(lock);
-    switch (state) {
-      case UNSUBSCRIBED:
-      case SUBSCRIBED:
-        cancelTimer(l);
-        queue.stopConsumers();
-        break;
-      case SOLE_OWNER:
-        cancelTimer(l);         // Sole owner, no need for timer.
-        queue.startConsumers();
-        break;
-      case SHARED_OWNER:
-        cancelTimer(l);
+    QueueOwnership before = ownership;
+    QueueOwnership after = newOwnership;
+    ownership = after;
+    if (!isOwner(before) && !isOwner(after))
+        ;   // Nothing to do, now ownership change on this transition.
+    else if (isOwner(before) && !isOwner(after)) // Lost ownership
+        ; // Nothing to do, queue and timer were stopped before
+          // sending unsubscribe/resubscribe.
+    else if (!isOwner(before) && isOwner(after)) { // Took ownership
         queue.startConsumers();
-        // FIXME aconway 2011-07-28: configurable interval.
-        timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
-        timer.add(timerTask);
-        break;
+        if (after == SHARED_OWNER) timer.start();
+    }
+    else if (isOwner(before) && isOwner(after) && before != after) {
+        if (after == SOLE_OWNER) timer.stop();
+        else timer.start();
     }
 }
 
-// FIXME aconway 2011-07-27: Dont spin token on an empty queue.
+// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
 
 // Called in connection threads when a consumer is added
 void QueueContext::consume(size_t n) {
@@ -102,14 +93,16 @@ void QueueContext::cancel(size_t n) {
     sys::Mutex::ScopedLock l(lock);
     consumers = n;
     // When consuming threads are stopped, this->stopped will be called.
-    if (n == 0) queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside 
lock?
+    if (n == 0) {
+        timer.stop();
+        queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
+    }
 }
 
 // Called in timer thread.
 void QueueContext::timeout() {
-    // FIXME aconway 2011-09-14: need to deal with stray timeouts.
-    queue.stopConsumers();
     // When all threads have stopped, queue will call stopped()
+    queue.stopConsumers();
 }
 
 // Callback set up by queue.stopConsumers() called in connection thread.
@@ -117,18 +110,16 @@ void QueueContext::timeout() {
 void QueueContext::stopped() {
     sys::Mutex::ScopedLock l(lock);
     // FIXME aconway 2011-07-28: review thread safety of state.
-    // Deffered call to stopped doesn't sit well.
-    // queueActive is invalid while stop is in progress?
     if (consumers == 0)
         mcast.mcast(framing::ClusterQueueUnsubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
-    else                        // FIXME aconway 2011-09-13: check if we're 
owner?
+    else            // FIXME aconway 2011-09-13: check if we're owner?
         mcast.mcast(framing::ClusterQueueResubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
 }
 
 void QueueContext::requeue(uint32_t position, bool redelivered) {
-    // FIXME aconway 2011-09-15: no lock, unacked has its own lock.
+    // No lock, unacked has its own lock.
     broker::QueuedMessage qm;
     if (unacked.get(position, qm)) {
         unacked.erase(position);

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h 
Fri Sep 16 20:17:25 2011
@@ -23,9 +23,10 @@
  */
 
 #include "LockedMap.h"
-#include <qpid/RefCounted.h>
+#include "CountdownTimer.h"
+#include "qpid/RefCounted.h"
 #include "qpid/sys/Time.h"
-#include <qpid/sys/Mutex.h>
+#include "qpid/sys/Mutex.h"
 #include "qpid/cluster/types.h"
 #include <boost/intrusive_ptr.hpp>
 
@@ -37,10 +38,6 @@ namespace broker {
 class Queue;
 class QueuedMessage;
 }
-namespace sys {
-class Timer;
-class TimerTask;
-}
 
 namespace cluster {
 
@@ -91,18 +88,15 @@ class QueueContext : public RefCounted {
     void dequeue(uint32_t position);
 
   private:
-    sys::Timer& timer;
-
     sys::Mutex lock;
+    QueueOwnership ownership;
+    CountdownTimer timer;
     broker::Queue& queue;       // FIXME aconway 2011-06-08: should be 
shared/weak ptr?
     Multicaster& mcast;
-    boost::intrusive_ptr<sys::TimerTask> timerTask;
     size_t consumers;
 
     typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME 
aconway 2011-09-15: don't need read/write map? Rename
     UnackedMap unacked;
-
-    void cancelTimer(const sys::Mutex::ScopedLock& l);
 };
 
 }} // namespace qpid::cluster

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp 
Fri Sep 16 20:17:25 2011
@@ -52,7 +52,6 @@ void QueueHandler::resubscribe(const std
 
 void QueueHandler::left(const MemberId& member) {
     // Unsubscribe for members that leave.
-    // FIXME aconway 2011-06-28: also need to re-queue acquired messages.
     for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
         i->second->unsubscribe(member);
 }
@@ -66,6 +65,7 @@ void QueueHandler::add(boost::shared_ptr
     // Local queues already have a context, remote queues need one.
     if (!QueueContext::get(*q))
         new QueueContext(*q, multicaster); // Context attaches itself to the 
Queue
+    // FIXME aconway 2011-09-15: thread safety: called from wiring handler..
     queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
         new QueueReplica(q, self()));
 }

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp 
Fri Sep 16 20:17:25 2011
@@ -83,8 +83,7 @@ void QueueReplica::resubscribe(const Mem
 void QueueReplica::update(QueueOwnership before) {
     QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before 
<< ")");
     QueueOwnership after = getState();
-    if (before == after) return;
-    context->replicaState(after);
+    if (before != after) context->replicaState(after);
 }
 
 QueueOwnership QueueReplica::getState() const {

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h 
Fri Sep 16 20:17:25 2011
@@ -56,7 +56,7 @@ class QueueReplica : public RefCounted
     void resubscribe(const MemberId&);
 
     MemberId getSelf() const { return self; }
-    
+
   private:
     typedef std::deque<MemberId> MemberQueue;
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/README.txt
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/README.txt?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/README.txt 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/README.txt Fri 
Sep 16 20:17:25 2011
@@ -1,2 +1,4 @@
 
 Experimental code to test ideas about a new cluster design.
+
+See overview.h

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp 
Fri Sep 16 20:17:25 2011
@@ -56,22 +56,23 @@ bool WiringHandler::invoke(const framing
 void WiringHandler::createQueue(const std::string& data) {
     // FIXME aconway 2011-05-25: Needs async completion.
     std::string name;
-    if (sender() != self()) {   // Created by another member, need to create 
locally.
+    if (sender() != self()) { // Created by another member, need to create 
locally.
         BrokerContext::ScopedSuppressReplication ssr;
         framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
         // TODO aconway 2011-02-21: asymetric - RecoveryManager vs 
Broker::create*()
         RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf);
         name = rq->getName();
     }
-    else {                      // Created locally, Queue and QueueContext 
already exist.
+    else {   // Created locally, Queue and QueueContext already exist.
         framing::Buffer buffer(const_cast<char*>(&data[0]), data.size());
         // FIXME aconway 2011-05-10: implicit knowledge of queue encoding.
         buffer.getShortString(name);
     }
     boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
     assert(q);                  // FIXME aconway 2011-05-10: error handling.
-    // TODO aconway 2011-05-10: if we implement multi-group for queues then
-    // this call is a problem: comes from wiring delivery thread, not queues.
+    // TODO aconway 2011-05-10: if we implement multi-group for queues
+    // then this call is a potential problem: comes from wiring
+    // delivery thread, not queues.
     queueHandler->add(q);
     QPID_LOG(debug, "cluster: create queue " << q->getName());
 }

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/overview.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/overview.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/overview.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/overview.h Fri 
Sep 16 20:17:25 2011
@@ -3,10 +3,14 @@
 
 <h1>New cluster implementation overview</h>
 
-There are 3 areas indicated by a suffix on class names:
+The code is broken down into 3 areas indicated by a suffix on class names:
 
-- Replica: State that is replicated to the entire cluster. Only called by 
Handlers in the deliver thread.
-- Context: State that is private to this member. Called by both Replia and 
broker objects in deliver and connection threads.
+- Replica: State that is replicated to the entire cluster.
+  Only called by Handlers in the deliver thread. May call on Contexts.
+
+- Context: State private to this member and associated with a local entity
+  such as the Broker or a Queue. Called in deliver and connection threads.
+  
 - Handler: Dispatch CPG messages by calling Replica objects in the deliver 
thread.
 
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/types.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/types.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/types.h Fri Sep 16 
20:17:25 2011
@@ -82,8 +82,6 @@ std::ostream& operator<<(std::ostream&, 
 /** Number to identify a message being routed. */
 typedef uint32_t RoutingId;
 
-// FIXME aconway 2011-07-28: can we put these 2 back  in the
-// QueueReplica & QueueContext?
 /** State of a queue with respect to a cluster member. */
 enum QueueOwnership {
     UNSUBSCRIBED,

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h Fri Sep 16 
20:17:25 2011
@@ -27,8 +27,6 @@
 namespace qpid {
 namespace sys {
 
-// FIXME aconway 2011-05-25: needs better name
-
 /**
  * An activity that may be executed by multiple threads, and can be stopped.
  *
@@ -72,7 +70,7 @@ class Stoppable {
         sys::Monitor::ScopedLock l(lock);
         if (stopped) return;
         stopped = true;
-        check();
+        check(l);
     }
 
     /** Set the state to "started", allow threads to enter.
@@ -97,10 +95,10 @@ class Stoppable {
         sys::Monitor::ScopedLock l(lock);
         assert(busy > 0);
         --busy;
-        check();
+        check(l);
     }
 
-    void check() {
+    void check(const sys::Monitor::ScopedLock&) {
         // Called with lock held.
         if (stopped && busy == 0 && notify) notify();
     }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp Fri Sep 
16 20:17:25 2011
@@ -198,7 +198,6 @@ int main(int argc, char ** argv)
             std::map<std::string,Sender> replyTo;
 
             while (!done && receiver.fetch(msg, timeout)) {
-                cerr << "FIXME " << msg.getProperties()[SN] << endl;
                 if (!started) {
                     // Start the time on receipt of the first message to avoid 
counting
                     // idle time at process startup.
@@ -208,7 +207,6 @@ int main(int argc, char ** argv)
                 reporter.message(msg);
                 if (!opts.ignoreDuplicates || 
!sequenceTracker.isDuplicate(msg)) {
                     if (msg.getContent() == EOS) {
-                        cerr << "FIXME eos" << endl;
                         done = true;
                     } else {
                         ++count;
@@ -227,7 +225,6 @@ int main(int argc, char ** argv)
                         if (opts.printContent)
                             std::cout << msg.getContent() << std::endl;//TODO: 
handle map or list messages
                         if (opts.messages && count >= opts.messages) {
-                            cerr << "FIXME "<< count << " >= " << 
opts.messages << endl;
                             done = true;
                         }
                     }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml?rev=1171757&r1=1171756&r2=1171757&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml Fri Sep 16 20:17:25 
2011
@@ -358,7 +358,7 @@
       <field name="position" type="uint32"/>
     </control>
 
-    <control name="release" code="0x6">
+    <control name="requeue" code="0x6">
       <field name="queue" type="queue.name"/>
       <field name="position" type="uint32"/>
       <field name="redelivered" type="bit"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to