Author: aconway
Date: Fri Feb 17 14:02:26 2012
New Revision: 1245464

URL: http://svn.apache.org/viewvc?rev=1245464&view=rev
Log:
QPID-3603: Move class ReplicatingSubscription into its own files.

Added:
    
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp  
 (with props)
    
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h   
(with props)
Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am?rev=1245464&r1=1245463&r2=1245464&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am Fri Feb 17 14:02:26 2012
@@ -633,6 +633,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueueFlowLimit.cpp \
   qpid/broker/QueueReplicator.h \
   qpid/broker/QueueReplicator.cpp \
+  qpid/broker/ReplicatingSubscription.h \
+  qpid/broker/ReplicatingSubscription.cpp \
   qpid/broker/RateFlowcontrol.h \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \

Added: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp?rev=1245464&view=auto
==============================================================================
--- 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp 
(added)
+++ 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp 
Fri Feb 17 14:02:26 2012
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "Queue.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+const std::string DOLLAR("$");  // Prefix that mask() puts in front of a subscription name.
+const std::string INTERNAL("_internal");  // Suffix that mask() appends; must stay "_internal", the literal used by the mask() this commit removes from SemanticState.cpp.
+
+class ReplicationStateInitialiser  // Functor the ReplicatingSubscription constructor hands to Queue::eachMessage() to work out which positions in a range are no longer queued.
+{
+  public:
+    ReplicationStateInitialiser(
+        qpid::framing::SequenceSet& r,  // set that receives the result; stored by reference, so it must outlive this object
+        const qpid::framing::SequenceNumber& s,  // start of the range; copied into the member 'start'
+        const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)  // e: end of the range; copied into the member 'end'
+    {
+        results.add(start, end);  // begin with the whole start..end range; operator() then removes the positions it sees
+    }
+
+    void operator()(const QueuedMessage& message) {  // presumably invoked by eachMessage() once per queued message -- confirm against Queue
+        if (message.position < start) {
+            //replica does not have a message that should still be on the queue
+            QPID_LOG(warning, "Replica appears to be missing message at " << 
message.position);
+        } else if (message.position >= start && message.position <= end) {
+            //i.e. message is within the initial range and has not been 
dequeued, so remove it from the results
+            results.remove(message.position);
+        } //else message has not been seen by replica yet so can be ignored 
here
+    }
+
+  private:
+    qpid::framing::SequenceSet& results;  // caller-owned output set
+    const qpid::framing::SequenceNumber start;  // lower bound tested by operator()
+    const qpid::framing::SequenceNumber end;  // upper bound tested by operator() (compared with <=)
+};
+
+std::string mask(const std::string& in)  // Builds the name used for the subscription's private 'events' queue (see events(new Queue(mask(_name))) in the constructor).
+{
+    return DOLLAR + in + INTERNAL;  // DOLLAR prefix, then the caller's name, then the INTERNAL suffix
+}
+
+ReplicatingSubscription::ReplicatingSubscription(  // Forwards every argument to ConsumerImpl, then seeds 'range' and 'position' from the sequence-number arguments when they are present.
+    SemanticState* _parent,
+    const std::string& _name,
+    Queue::shared_ptr _queue,
+    bool ack,
+    bool _acquire,
+    bool _exclusive,
+    const std::string& _tag,
+    const std::string& _resumeId,
+    uint64_t _resumeTtl,
+    const framing::FieldTable& _arguments
+) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, 
_resumeId, _resumeTtl, _arguments),
+    events(new Queue(mask(_name))),  // private queue that generateDequeueEvent() delivers event messages to
+    consumer(new DelegatingConsumer(*this))  // consumer passed to events->dispatch() in doDispatch(); forwards back to this object
+{
+
+    if (_arguments.isSet("qpid.high_sequence_number")) {  // without this argument nothing below runs and range/position are left as constructed
+        qpid::framing::SequenceNumber hwm = 
_arguments.getAsInt("qpid.high_sequence_number");
+        qpid::framing::SequenceNumber lwm;
+        if (_arguments.isSet("qpid.low_sequence_number")) {
+            lwm = _arguments.getAsInt("qpid.low_sequence_number");
+        } else {
+            lwm = hwm;  // no low mark supplied: fall back to the high mark
+        }
+        qpid::framing::SequenceNumber oldest;
+        if (_queue->getOldest(oldest)) {  // the else branch below treats a false return as "queue is empty"
+            if (oldest >= hwm) {
+                range.add(lwm, --oldest);  // records lwm up to the position just before the oldest message still queued
+            } else if (oldest >= lwm) {
+                ReplicationStateInitialiser initialiser(range, lwm, hwm);  // its constructor adds lwm..hwm to range
+                _queue->eachMessage(initialiser);  // the functor removes positions of messages still on the queue
+            } else { //i.e. have older message on master than is reported to 
exist on replica
+                QPID_LOG(warning, "Replica appears to be missing message on 
master");
+            }
+        } else {
+            //local queue (i.e. master) is empty
+            range.add(lwm, _queue->getPosition());
+        }
+        QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << 
" are " << range
+                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << 
_queue->getPosition() << ")");
+        //set position of 'cursor'
+        position = hwm;  // 'position' is not declared in this class; presumably inherited from the consumer base -- confirm
+    }
+}
+
+bool ReplicatingSubscription::deliver(QueuedMessage& m)  // Also reached through DelegatingConsumer::deliver() for messages dispatched from the 'events' queue.
+{
+    return ConsumerImpl::deliver(m);  // no extra handling here: the base-class result is returned unchanged
+}
+
+void ReplicatingSubscription::init()  // Registers this subscription as an observer of the queue returned by getQueue().
+{
+    
getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));  // cast needed because shared_from_this() is not typed as QueueObserver; cancel() removes the same observer
+}
+
+void ReplicatingSubscription::cancel()  // Counterpart of init(): unregisters this subscription from the queue's observers.
+{
+    
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));  // only the observer is removed here; ConsumerImpl::cancel() is not called from this body
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}  // Empty body: no explicit cleanup is written here.
+
+//called before we get notified of the message being available and
+//under the message lock in the queue
+void ReplicatingSubscription::enqueued(const QueuedMessage& m)  // QueueObserver callback; m is the message just enqueued
+{
+    QPID_LOG(debug, "Enqueued message at " << m.position);
+    //delay completion
+    m.payload->getIngressCompletion().startCompleter();  // the only matching finishCompleter() call in this file is in dequeued()
+    QPID_LOG(debug, "Delayed " << m.payload.get());  // logs the payload pointer value, not the message content
+}
+
+void ReplicatingSubscription::generateDequeueEvent()  // Encodes 'range' into a message and delivers it to the 'events' queue; doDispatch() calls this while holding 'lock'.
+{
+    std::string buf(range.encodedSize(),'\0');  // backing storage for the encoded set, sized from encodedSize()
+    framing::Buffer buffer(&buf[0], buf.size());  // buffer wraps buf's storage; buf must stay alive while buffer is used
+    range.encode(buffer);
+    range.clear();  // the pending dequeues now live only in buf
+    buffer.reset();  // presumably rewinds the buffer so decode() below reads from the start -- confirm against framing::Buffer
+
+    //generate event message
+    boost::intrusive_ptr<Message> event = new Message();
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 
0)));
+    AMQFrame header((AMQHeaderBody()));
+    AMQFrame content((AMQContentBody()));
+    content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());  // loads the encoded set into the content body
+    header.setBof(false);  // bof/eof/bos/eos flags: the content frame is the one with eof set
+    header.setEof(false);
+    header.setBos(true);
+    header.setEos(true);
+    content.setBof(false);
+    content.setEof(true);
+    content.setBos(true);
+    content.setEos(true);
+    event->getFrames().append(method);  // frames are appended in the order method, header, content
+    event->getFrames().append(header);
+    event->getFrames().append(content);
+
+    DeliveryProperties* props = 
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+    props->setRoutingKey("dequeue-event");  // routing key that marks this message as a dequeue event
+
+    events->deliver(event);  // queued on 'events'; doDispatch() later dispatches it via 'consumer'
+}
+
+//called after the message has been removed from the deque and under
+//the message lock in the queue
+void ReplicatingSubscription::dequeued(const QueuedMessage& m)  // QueueObserver callback; records m.position so a dequeue event can be generated later
+{
+    {
+        sys::Mutex::ScopedLock l(lock);  // same 'lock' that doDispatch() takes before reading 'range'
+        range.add(m.position);
+        QPID_LOG(debug, "Updated dequeue event to include message at " << 
m.position << "; subscription is at " << position);
+    }
+    notify();  // called after the scoped lock above has been released
+    if (m.position > position) {  // NOTE(review): 'position' is read here outside the lock scope above -- confirm that is intended
+        m.payload->getIngressCompletion().finishCompleter();  // pairs with the startCompleter() call in enqueued()
+        QPID_LOG(debug, "Completed " << m.payload.get() << " early due to 
dequeue");
+    }
+}
+
+bool ReplicatingSubscription::doDispatch()  // Turns any pending dequeues into an event, then dispatches from the 'events' queue and from the base consumer.
+{
+    {
+        sys::Mutex::ScopedLock l(lock);  // held for the emptiness check and for generateDequeueEvent()
+        if (!range.empty()) {
+            generateDequeueEvent();  // encodes and clears 'range'
+        }
+    }
+    bool r1 = events->dispatch(consumer);  // 'consumer' is the DelegatingConsumer created in the constructor
+    bool r2 = ConsumerImpl::doDispatch();  // evaluated into its own variable so it always runs, whatever r1 is
+    return r1 || r2;  // true if either dispatch call returned true
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription&
 c) : Consumer(c.getName(), true), delegate(c) {}  // takes the owning subscription's name; keeps a reference to it, not a copy
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}  // nothing owned: 'delegate' is only a reference
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
+{
+    return delegate.deliver(m);  // forwarded to ReplicatingSubscription::deliver()
+}
+void ReplicatingSubscription::DelegatingConsumer::notify() { 
delegate.notify(); }
+bool 
ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message>
 msg) { return delegate.filter(msg); }
+bool 
ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message>
 msg) { return delegate.accept(msg); }
+void ReplicatingSubscription::DelegatingConsumer::cancel() {}  // the one member here that does not forward to 'delegate': cancelling does nothing
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { 
return delegate.getSession(); }
+
+
+}} // namespace qpid::broker

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h?rev=1245464&view=auto
==============================================================================
--- 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h 
(added)
+++ 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h 
Fri Feb 17 14:02:26 2012
@@ -0,0 +1,77 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Subscriber to a remote queue that replicates to a local queue.
+ */
+class ReplicatingSubscription : public SemanticState::ConsumerImpl, public 
QueueObserver
+{
+  public:
+    ReplicatingSubscription(SemanticState* parent,
+                            const std::string& name, boost::shared_ptr<Queue> 
queue,
+                            bool ack, bool acquire, bool exclusive, const 
std::string& tag,
+                            const std::string& resumeId, uint64_t resumeTtl, 
const framing::FieldTable& arguments);  // the .cpp reads "qpid.high_sequence_number" and "qpid.low_sequence_number" from arguments
+    ~ReplicatingSubscription();
+
+    void init();  // adds this object as an observer of its queue
+    void cancel();  // removes this object from its queue's observers
+    bool deliver(QueuedMessage& msg);  // forwards to ConsumerImpl::deliver()
+    void enqueued(const QueuedMessage&);  // starts the message's ingress completer
+    void dequeued(const QueuedMessage&);  // adds the message position to 'range' and calls notify()
+    void acquired(const QueuedMessage&) {}  // observer callback deliberately left empty
+    void requeued(const QueuedMessage&) {}  // observer callback deliberately left empty
+
+  protected:
+    bool doDispatch();  // generates a pending dequeue event, then dispatches 'events' and the base consumer
+  private:
+    boost::shared_ptr<Queue> events;  // private queue that receives the generated dequeue-event messages
+    boost::shared_ptr<Consumer> consumer;  // DelegatingConsumer used when dispatching from 'events'
+    qpid::framing::SequenceSet range;  // positions dequeued but not yet sent as an event; the .cpp accesses it under 'lock' in dequeued() and doDispatch()
+
+    void generateDequeueEvent();  // encodes and clears 'range', then delivers the event message to 'events'
+    class DelegatingConsumer : public Consumer  // Consumer whose members forward to the owning ReplicatingSubscription, except cancel()
+    {
+      public:
+        DelegatingConsumer(ReplicatingSubscription&);
+        ~DelegatingConsumer();
+        bool deliver(QueuedMessage& msg);
+        void notify();
+        bool filter(boost::intrusive_ptr<Message>);
+        bool accept(boost::intrusive_ptr<Message>);
+        void cancel();  // defined with an empty body in the .cpp
+        OwnershipToken* getSession();
+      private:
+        ReplicatingSubscription& delegate;  // non-owning reference to the enclosing subscription
+    };
+};
+
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1245464&r1=1245463&r2=1245464&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri 
Feb 17 14:02:26 2012
@@ -28,6 +28,7 @@
 #include "qpid/broker/NodeClone.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueReplicator.h"
+#include "qpid/broker/ReplicatingSubscription.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/TxAccept.h"
@@ -266,47 +267,6 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public 
QueueObserver
-{
-  public:
-    ReplicatingSubscription(SemanticState* parent,
-                            const std::string& name, boost::shared_ptr<Queue> 
queue,
-                            bool ack, bool acquire, bool exclusive, const 
std::string& tag,
-                            const std::string& resumeId, uint64_t resumeTtl, 
const framing::FieldTable& arguments);
-    ~ReplicatingSubscription();
-
-    void init();
-    void cancel();
-    bool deliver(QueuedMessage& msg);
-    void enqueued(const QueuedMessage&);
-    void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {}
-    void requeued(const QueuedMessage&) {}
-
-  protected:
-    bool doDispatch();
-  private:
-    boost::shared_ptr<Queue> events;
-    boost::shared_ptr<Consumer> consumer;
-    qpid::framing::SequenceSet range;
-
-    void generateDequeueEvent();
-    class DelegatingConsumer : public Consumer
-    {
-      public:
-        DelegatingConsumer(ReplicatingSubscription&);
-        ~DelegatingConsumer();
-        bool deliver(QueuedMessage& msg);
-        void notify();
-        bool filter(boost::intrusive_ptr<Message>);
-        bool accept(boost::intrusive_ptr<Message>);
-        void cancel() {}
-        OwnershipToken* getSession();
-      private:
-        ReplicatingSubscription& delegate;
-    };
-};
-
 SemanticState::ConsumerImpl::shared_ptr 
SemanticState::ConsumerImpl::create(SemanticState* parent,
                                                                             
const string& name,
                                                                             
Queue::shared_ptr queue,
@@ -327,163 +287,6 @@ SemanticState::ConsumerImpl::shared_ptr 
     }
 }
 
-std::string mask(const std::string& in)
-{
-    return std::string("$") + in + std::string("_internal");
-}
-
-class ReplicationStateInitialiser
-{
-  public:
-    ReplicationStateInitialiser(qpid::framing::SequenceSet& results,
-                                const qpid::framing::SequenceNumber& start,
-                                const qpid::framing::SequenceNumber& end);
-    void operator()(const QueuedMessage& m) { process(m); }
-  private:
-    qpid::framing::SequenceSet& results;
-    const qpid::framing::SequenceNumber start;
-    const qpid::framing::SequenceNumber end;
-    void process(const QueuedMessage&);
-};
-
-ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent,
-                                                                const string& 
_name,
-                                                                
Queue::shared_ptr _queue,
-                                                                bool ack,
-                                                                bool _acquire,
-                                                                bool 
_exclusive,
-                                                                const string& 
_tag,
-                                                                const string& 
_resumeId,
-                                                                uint64_t 
_resumeTtl,
-                                                                const 
framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, 
_resumeId, _resumeTtl, _arguments),
-    events(new Queue(mask(_name))),
-    consumer(new DelegatingConsumer(*this))
-{
-
-    if (_arguments.isSet("qpid.high_sequence_number")) {
-        qpid::framing::SequenceNumber hwm = 
_arguments.getAsInt("qpid.high_sequence_number");
-        qpid::framing::SequenceNumber lwm;
-        if (_arguments.isSet("qpid.low_sequence_number")) {
-            lwm = _arguments.getAsInt("qpid.low_sequence_number");
-        } else {
-            lwm = hwm;
-        }
-        qpid::framing::SequenceNumber oldest;
-        if (_queue->getOldest(oldest)) {
-            if (oldest >= hwm) {
-                range.add(lwm, --oldest);
-            } else if (oldest >= lwm) {
-                ReplicationStateInitialiser initialiser(range, lwm, hwm);
-                _queue->eachMessage(initialiser);
-            } else { //i.e. have older message on master than is reported to 
exist on replica
-                QPID_LOG(warning, "Replica appears to be missing message on 
master");
-            }
-        } else {
-            //local queue (i.e. master) is empty
-            range.add(lwm, _queue->getPosition());
-        }
-        QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << 
" are " << range
-                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << 
_queue->getPosition() << ")");
-        //set position of 'cursor'
-        position = hwm;
-    }
-}
-
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
-    return ConsumerImpl::deliver(m);
-}
-
-void ReplicatingSubscription::init()
-{
-    
getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-void ReplicatingSubscription::cancel()
-{
-    
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
-    QPID_LOG(debug, "Enqueued message at " << m.position);
-    //delay completion
-    m.payload->getIngressCompletion().startCompleter();
-    QPID_LOG(debug, "Delayed " << m.payload.get());
-}
-
-class Buffer : public qpid::framing::Buffer
-{
-  public:
-    Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {}
-    ~Buffer() { delete[] getPointer(); }
-};
-
-void ReplicatingSubscription::generateDequeueEvent()
-{
-    Buffer buffer(range.encodedSize());
-    range.encode(buffer);
-    range.clear();
-    buffer.reset();
-
-    //generate event message
-    boost::intrusive_ptr<Message> event = new Message();
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 
0)));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content((AMQContentBody()));
-    content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
-    header.setBof(false);
-    header.setEof(false);
-    header.setBos(true);
-    header.setEos(true);
-    content.setBof(false);
-    content.setEof(true);
-    content.setBos(true);
-    content.setEos(true);
-    event->getFrames().append(method);
-    event->getFrames().append(header);
-    event->getFrames().append(content);
-
-    DeliveryProperties* props = 
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
-    props->setRoutingKey("dequeue-event");
-
-    events->deliver(event);
-}
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
-    {
-        Mutex::ScopedLock l(lock);
-        range.add(m.position);
-        QPID_LOG(debug, "Updated dequeue event to include message at " << 
m.position << "; subscription is at " << position);
-    }
-    notify();
-    if (m.position > position) {
-        m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(debug, "Completed " << m.payload.get() << " early due to 
dequeue");
-    }
-}
-
-bool ReplicatingSubscription::doDispatch()
-{
-    {
-        Mutex::ScopedLock l(lock);
-        if (!range.empty()) {
-            generateDequeueEvent();
-        }
-    }
-    bool r1 = events->dispatch(consumer);
-    bool r2 = ConsumerImpl::doDispatch();
-    return r1 || r2;
-}
-
 SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
                                           const string& _name,
                                           Queue::shared_ptr _queue,
@@ -1048,35 +851,4 @@ void SemanticState::detached()
     }
 }
 
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription&
 c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
-    return delegate.deliver(m);
-}
-void ReplicatingSubscription::DelegatingConsumer::notify() { 
delegate.notify(); }
-bool 
ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message>
 msg) { return delegate.filter(msg); }
-bool 
ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message>
 msg) { return delegate.accept(msg); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { 
return delegate.getSession(); }
-
-ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet&
 r,
-                                                         const 
qpid::framing::SequenceNumber& s,
-                                                         const 
qpid::framing::SequenceNumber& e)
-    : results(r), start(s), end(e)
-{
-    results.add(start, end);
-}
-
-void ReplicationStateInitialiser::process(const QueuedMessage& message)
-{
-    if (message.position < start) {
-        //replica does not have a message that should still be on the queue
-        QPID_LOG(warning, "Replica appears to be missing message at " << 
message.position);
-    } else if (message.position >= start && message.position <= end) {
-        //i.e. message is within the intial range and has not been dequeued, 
so remove it from the results
-        results.remove(message.position);
-    } //else message has not been seen by replica yet so can be ignored here
-
-}
-
 }} // namespace qpid::broker



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to