Author: gsim
Date: Wed Jul  8 11:48:57 2009
New Revision: 792103

URL: http://svn.apache.org/viewvc?rev=792103&view=rev
Log:
QPID-1974: Fixes (and tests) for updating lvq state to new cluster members.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=792103&r1=792102&r2=792103&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed Jul  8 11:48:57 2009
@@ -401,4 +401,19 @@
     return getProperties<MessageProperties>()->getApplicationHeaders();
 }
 
+
+void Message::setUpdateDestination(const std::string& d)
+{
+    updateDestination = d;
+}
+
+
+bool Message::isUpdateMessage()
+{
+    return updateDestination.size() && isA<MessageTransferBody>() 
+        && getMethod<MessageTransferBody>()->getDestination() == 
updateDestination;
+}
+
+std::string Message::updateDestination;
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=792103&r1=792102&r2=792103&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Jul  8 11:48:57 2009
@@ -160,6 +160,9 @@
     void setDequeueCompleteCallback(MessageCallback& cb);
     void resetDequeueCompleteCallback();
 
+    bool isUpdateMessage();
+    static void setUpdateDestination(const std::string&);
+
   private:
     typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
 
@@ -186,6 +189,7 @@
     mutable boost::intrusive_ptr<Message> empty;
     MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
+    static std::string updateDestination;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=792103&r1=792102&r2=792103&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul  8 11:48:57 2009
@@ -570,7 +570,7 @@
             string key = ft->getAsString(qpidVQMatchProperty);
 
             i = lvq.find(key);
-            if (i == lvq.end()){
+            if (i == lvq.end() || msg->isUpdateMessage()){
                 messages.push_back(qm);
                 listeners.populate(copy);
                 lvq[key] = msg; 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=792103&r1=792102&r2=792103&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jul  8 11:48:57 2009
@@ -300,9 +300,15 @@
                 ManagementMethod (uint32_t methodId, management::Args& args, 
std::string& text);
 
             /** Apply f to each Message on the queue. */
-            template <class F> void eachMessage(F f) const {
+            template <class F> void eachMessage(F f) {
                 sys::Mutex::ScopedLock l(messageLock);
-                std::for_each(messages.begin(), messages.end(), f);
+                if (lastValueQueue) {
+                    for (Messages::iterator i = messages.begin(); i != 
messages.end(); ++i) {
+                        f(checkLvqReplace(*i));
+                    }
+                } else {
+                    std::for_each(messages.begin(), messages.end(), f);
+                }
             }
 
             /** Apply f to each QueueBinding on the queue */

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=792103&r1=792102&r2=792103&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Jul  8 11:48:57 
2009
@@ -34,6 +34,7 @@
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/management/IdAllocator.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/client/ConnectionSettings.h"
@@ -136,6 +137,7 @@
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), 
*cluster)));
+        broker::Message::setUpdateDestination(UpdateClient::UPDATE);
         ManagementAgent* mgmt = broker->getManagementAgent();
         if (mgmt) {
             std::auto_ptr<IdAllocator> allocator(new 
UpdateClientIdAllocator());

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=792103&r1=792102&r2=792103&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul  8 11:48:57 2009
@@ -936,10 +936,13 @@
     }
 }
 
-void send(Client& client, const std::string& queue, int count, int start=1, 
const std::string& base="m")
+void send(Client& client, const std::string& queue, int count, int start=1, 
const std::string& base="m",
+          const std::string& lvqKey="")
 {
     for (int i = 0; i < count; i++) {
-        
client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%")
 % base % (i+start)).str(), queue, durableFlag));
+        Message message = makeMessage((boost::format("%1%_%2%") % base % 
(i+start)).str(), queue, durableFlag);
+        if (!lvqKey.empty()) 
message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey);
+        client.session.messageTransfer(arg::content=message);
     }
 }
 
@@ -998,6 +1001,64 @@
     }
 }
 
+QPID_AUTO_TEST_CASE(testLvqUpdate) {
+    //tests that lvqs are accurately replicated on newly joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setOrdering(LVQ);
+        c1.session.queueDeclare("q", arg::arguments=options, 
arg::durable=durableFlag);
+
+        send(c1, "q", 5, 1, "a", "a");
+        send(c1, "q", 2, 1, "b", "b");
+        send(c1, "q", 1, 1, "c", "c");
+        send(c1, "q", 1, 3, "b", "b");
+
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 
2).size());//wait till joined
+
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1"));
+    }
+}
+
+
+QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) {
+    //tests that lvqs are accurately replicated on newly joined nodes
+    //if the lvq state has been affected by browsers
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setOrdering(LVQ);
+        c1.session.queueDeclare("q", arg::arguments=options, 
arg::durable=durableFlag);
+
+        send(c1, "q", 1, 1, "a", "a");
+        send(c1, "q", 2, 1, "b", "b");
+        send(c1, "q", 1, 1, "c", "c");
+        checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1"));
+        send(c1, "q", 4, 2, "a", "a");
+        send(c1, "q", 1, 3, "b", "b");
+
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 
2).size());//wait till joined
+
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", 
list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3"));
+    }
+}
+
 QPID_AUTO_TEST_CASE(testRelease) {
     //tests that releasing a messages that was unacked when one node
     //joined works correctly



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to