Author: aconway
Date: Fri Feb 17 14:04:43 2012
New Revision: 1245479

URL: http://svn.apache.org/viewvc?rev=1245479&view=rev
Log:
QPID-3603: Move broker::ReplicatingSubscription to ha namespace and ha plugin.

Added:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h   
(with props)
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp  
 (contents, props changed)
      - copied, changed from r1245478, 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h   
(contents, props changed)
      - copied, changed from r1245478, 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
Removed:
    
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am Fri Feb 17 14:04:43 2012
@@ -542,6 +542,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Consumer.h \
   qpid/broker/Credit.h \
   qpid/broker/Credit.cpp \
+  qpid/broker/ConsumerFactory.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
   qpid/broker/Deliverable.h \
@@ -629,8 +630,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueueFlowLimit.h \
   qpid/broker/QueueFlowLimit.cpp \
-  qpid/broker/ReplicatingSubscription.h \
-  qpid/broker/ReplicatingSubscription.cpp \
   qpid/broker/RateFlowcontrol.h \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk Fri Feb 17 14:04:43 2012
@@ -31,6 +31,8 @@ ha_la_SOURCES =                                       \
   qpid/ha/Settings.h                           \
   qpid/ha/QueueReplicator.h                    \
   qpid/ha/QueueReplicator.cpp                  \
+  qpid/ha/ReplicatingSubscription.h            \
+  qpid/ha/ReplicatingSubscription.cpp          \
   qpid/ha/WiringReplicator.cpp                 \
   qpid/ha/WiringReplicator.h
 

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h Fri Feb 17 
14:04:43 2012
@@ -37,6 +37,7 @@
 #include "qpid/broker/Vhost.h"
 #include "qpid/broker/System.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -198,6 +199,7 @@ public:
     bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
+    ConsumerFactories consumerFactories;
 
   public:
     virtual ~Broker();
@@ -356,6 +358,8 @@ public:
                 const std::string& key,
                 const std::string& userId,
                 const std::string& connectionId);
+
+    ConsumerFactories&  getConsumerFactories() { return consumerFactories; }
 };
 
 }}

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h Fri Feb 17 
14:04:43 2012
@@ -31,6 +31,9 @@ namespace broker {
 class Queue;
 class QueueListeners;
 
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
 class Consumer {
     const bool acquires;
     // inListeners allows QueueListeners to efficiently track if this instance 
is registered

Added: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h?rev=1245479&view=auto
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h (added)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h Fri 
Feb 17 14:04:43 2012
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in 
public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factories. Plugins can register a
+ * ConsumerFactory via Broker::getConsumerFactories(). Each time a
+ * consumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+  public:
+    virtual ~ConsumerFactory() {}
+
+    virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+        SemanticState* parent,
+        const std::string& name, boost::shared_ptr<Queue> queue,
+        bool ack, bool acquire, bool exclusive, const std::string& tag,
+        const std::string& resumeId, uint64_t resumeTtl, const 
framing::FieldTable& arguments) = 0;
+};
+
+/** The collection of ConsumerFactory objects held by the broker, kept in the order they were added.
+ * THREAD UNSAFE: this class does no locking itself; see notes on member functions.
+ */
+class ConsumerFactories {
+  public:
+    typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+    /** Append a factory. Thread safety: May only be called during plug-in initialization. */
+    void add(const boost::shared_ptr<ConsumerFactory>& cf) { 
factories.push_back(cf); }
+
+    /** Read-only access to the added factories. Thread safety: May only be called after plug-in initialization. */
+    const Factories& get() const { return factories; }
+
+  private:
+    Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONSUMERFACTORY_H*/

Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri 
Feb 17 14:04:43 2012
@@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const Que
                                bool _acquired,
                                bool accepted, 
                                bool _windowing,
-                               uint32_t _credit, bool _delayedCompletion) : 
msg(_msg),
+                               uint32_t _credit, bool _isDelayedCompletion) : 
msg(_msg),
                                                   queue(_queue), 
                                                   tag(_tag),
                                                   acquired(_acquired),
@@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const Que
                                                   ended(accepted && acquired),
                                                   windowing(_windowing),
                                                   credit(msg.payload ? 
msg.payload->getRequiredCredit() : _credit),
-                                                  
delayedCompletion(_delayedCompletion)
+                                                  
isDelayedCompletion(_isDelayedCompletion)
 {}
 
 bool DeliveryRecord::setEnded()
@@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionC
     if (!ended) {
         if (acquired) {
             queue->dequeue(ctxt, msg);
-        } else if (delayedCompletion) {
+        } else if (isDelayedCompletion) {
             //TODO: this is a nasty way to do this; change it
             msg.payload->getIngressCompletion().finishCompleter();
             QPID_LOG(debug, "Completed " << msg.payload.get());

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Feb 
17 14:04:43 2012
@@ -63,7 +63,7 @@ class DeliveryRecord
      * after that).
      */
     uint32_t credit;
-    bool delayedCompletion;
+    bool isDelayedCompletion;
 
   public:
     QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,7 +73,7 @@ class DeliveryRecord
                                       bool accepted,
                                       bool windowing,
                                       uint32_t credit=0,       // Only used if 
msg is empty.
-                                      bool delayedCompletion=false
+                                      bool isDelayedCompletion=false
     );
     
     bool coveredBy(const framing::SequenceSet* const range) const { return 
range->contains(id); }

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri 
Feb 17 14:04:43 2012
@@ -25,9 +25,7 @@
 #include "qpid/broker/DtxAck.h"
 #include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/Message.h"
-#include "qpid/ha/WiringReplicator.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/ReplicatingSubscription.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/TxAccept.h"
@@ -108,15 +106,25 @@ bool SemanticState::exists(const string&
 namespace {
     const std::string SEPARATOR("::");
 }
-    
+
 void SemanticState::consume(const string& tag,
                             Queue::shared_ptr queue, bool ackRequired, bool 
acquire,
-                            bool exclusive, const string& resumeId, uint64_t 
resumeTtl, const FieldTable& arguments)
+                            bool exclusive, const string& resumeId, uint64_t 
resumeTtl,
+                            const FieldTable& arguments)
 {
     // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 
Message.subscribe, destination).
     // Create a globally unique name so the broker can identify individual 
consumers
     std::string name = session.getSessionId().str() + SEPARATOR + tag;
-    ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, 
ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+    const ConsumerFactories::Factories& cf(
+        session.getBroker().getConsumerFactories().get());
+    ConsumerImpl::shared_ptr c;
+    for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != 
cf.end() && !c; ++i)
+        c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, 
tag,
+                         resumeId, resumeTtl, arguments);
+    if (!c)                     // No factory produced a consumer: create plain consumer
+        c = ConsumerImpl::shared_ptr(
+            new ConsumerImpl(this, name, queue, ackRequired, acquire, 
exclusive, tag,
+                             resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
 }
@@ -266,26 +274,6 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-SemanticState::ConsumerImpl::shared_ptr 
SemanticState::ConsumerImpl::create(SemanticState* parent,
-                                                                            
const string& name,
-                                                                            
Queue::shared_ptr queue,
-                                                                            
bool ack,
-                                                                            
bool acquire,
-                                                                            
bool exclusive,
-                                                                            
const string& tag,
-                                                                            
const string& resumeId,
-                                                                            
uint64_t resumeTtl,
-                                                                            
const framing::FieldTable& arguments)
-{
-    if (arguments.isSet("qpid.replicating-subscription")) {
-        shared_ptr result(new ReplicatingSubscription(parent, name, queue, 
ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
-        boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
-        return result;
-    } else {
-        return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, 
exclusive, tag, resumeId, resumeTtl, arguments));
-    }
-}
-
 SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
                                           const string& _name,
                                           Queue::shared_ptr _queue,
@@ -297,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           uint64_t _resumeTtl,
                                           const framing::FieldTable& _arguments
 
-
 ) :
     Consumer(_name, _acquire),
     parent(_parent),
@@ -354,7 +341,7 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), 
acquire, !ackExpected, credit.isWindowMode(), 0, dynamic_cast<const 
ReplicatingSubscription*>(this));
+    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), 
acquire, !ackExpected, credit.isWindowMode(), 0, isDelayedCompletion());
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1245479&r1=1245478&r2=1245479&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h Fri Feb 
17 14:04:43 2012
@@ -153,11 +153,10 @@ class SemanticState : private boost::non
         management::ManagementObject* GetManagementObject (void) const;
         management::Manageable::status_t ManagementMethod (uint32_t methodId, 
management::Args& args, std::string& text);
 
-        static shared_ptr create(SemanticState* parent,
-                                 const std::string& name, 
boost::shared_ptr<Queue> queue,
-                                 bool ack, bool acquire, bool exclusive, const 
std::string& tag,
-                                 const std::string& resumeId, uint64_t 
resumeTtl, const framing::FieldTable& arguments);
-
+        /** This consumer wants delayed completion.
+         * Overridden by ConsumerImpl subclasses.
+         */
+        virtual bool isDelayedCompletion() const { return false; }
     };
 
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;

Copied: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(from r1245478, 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp)
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?p2=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp&p1=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp&r1=1245478&r2=1245479&rev=1245479&view=diff
==============================================================================
--- 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Fri Feb 17 14:04:43 2012
@@ -20,18 +20,19 @@
  */
 
 #include "ReplicatingSubscription.h"
-#include "Queue.h"
+#include "qpid/broker/Queue.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
-namespace broker {
+namespace ha {
 
 using namespace framing;
+using namespace broker;
 
 const std::string DOLLAR("$");
-const std::string INTERNAL("_internall");
+const std::string INTERNAL("_internal");
 
 class ReplicationStateInitialiser
 {

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (from 
r1245478, 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h)
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?p2=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h&p1=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h&r1=1245478&r2=1245479&rev=1245479&view=diff
==============================================================================
--- 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
Fri Feb 17 14:04:43 2012
@@ -23,35 +23,48 @@
  */
 
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
 
 namespace qpid {
+
 namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace ha {
 
 /**
  * Subscriber to a remote queue that replicates to a local queue.
  */
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public 
QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+                                public broker::QueueObserver
 {
   public:
-    ReplicatingSubscription(SemanticState* parent,
-                            const std::string& name, boost::shared_ptr<Queue> 
queue,
+    ReplicatingSubscription(broker::SemanticState* parent,
+                            const std::string& name, 
boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const 
std::string& tag,
-                            const std::string& resumeId, uint64_t resumeTtl, 
const framing::FieldTable& arguments);
+                            const std::string& resumeId, uint64_t resumeTtl,
+                            const framing::FieldTable& arguments);
     ~ReplicatingSubscription();
 
     void init();
     void cancel();
-    bool deliver(QueuedMessage& msg);
-    void enqueued(const QueuedMessage&);
-    void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {}
-    void requeued(const QueuedMessage&) {}
+    bool deliver(broker::QueuedMessage& msg);
+    void enqueued(const broker::QueuedMessage&);
+    void dequeued(const broker::QueuedMessage&);
+    void acquired(const broker::QueuedMessage&) {}
+    void requeued(const broker::QueuedMessage&) {}
+
+    bool isDelayedCompletion() const { return true; }
 
   protected:
     bool doDispatch();
   private:
-    boost::shared_ptr<Queue> events;
-    boost::shared_ptr<Consumer> consumer;
+    boost::shared_ptr<broker::Queue> events;
+    boost::shared_ptr<broker::Consumer> consumer;
     qpid::framing::SequenceSet range;
 
     void generateDequeueEvent();
@@ -60,7 +73,7 @@ class ReplicatingSubscription : public S
       public:
         DelegatingConsumer(ReplicatingSubscription&);
         ~DelegatingConsumer();
-        bool deliver(QueuedMessage& msg);
+        bool deliver(broker::QueuedMessage& msg);
         void notify();
         bool filter(boost::intrusive_ptr<Message>);
         bool accept(boost::intrusive_ptr<Message>);

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to