Author: gsim Date: Wed Jul 8 11:48:57 2009 New Revision: 792103 URL: http://svn.apache.org/viewvc?rev=792103&view=rev Log: QPID-1974: Fixes (and tests) for replicating LVQ (last value queue) state to newly joined cluster members.
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Message.h qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=792103&r1=792102&r2=792103&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed Jul 8 11:48:57 2009 @@ -401,4 +401,19 @@ return getProperties<MessageProperties>()->getApplicationHeaders(); } + +void Message::setUpdateDestination(const std::string& d) +{ + updateDestination = d; +} + + +bool Message::isUpdateMessage() +{ + return updateDestination.size() && isA<MessageTransferBody>() + && getMethod<MessageTransferBody>()->getDestination() == updateDestination; +} + +std::string Message::updateDestination; + }} // namespace qpid::broker Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=792103&r1=792102&r2=792103&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Jul 8 11:48:57 2009 @@ -160,6 +160,9 @@ void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); + bool isUpdateMessage(); + static void setUpdateDestination(const std::string&); + private: typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; @@ -186,6 +189,7 @@ mutable boost::intrusive_ptr<Message> empty; MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; + static std::string updateDestination; }; }} Modified: 
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=792103&r1=792102&r2=792103&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul 8 11:48:57 2009 @@ -570,7 +570,7 @@ string key = ft->getAsString(qpidVQMatchProperty); i = lvq.find(key); - if (i == lvq.end()){ + if (i == lvq.end() || msg->isUpdateMessage()){ messages.push_back(qm); listeners.populate(copy); lvq[key] = msg; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=792103&r1=792102&r2=792103&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jul 8 11:48:57 2009 @@ -300,9 +300,15 @@ ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); /** Apply f to each Message on the queue. 
*/ - template <class F> void eachMessage(F f) const { + template <class F> void eachMessage(F f) { sys::Mutex::ScopedLock l(messageLock); - std::for_each(messages.begin(), messages.end(), f); + if (lastValueQueue) { + for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { + f(checkLvqReplace(*i)); + } + } else { + std::for_each(messages.begin(), messages.end(), f); + } } /** Apply f to each QueueBinding on the queue */ Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=792103&r1=792102&r2=792103&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Jul 8 11:48:57 2009 @@ -34,6 +34,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/management/IdAllocator.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionState.h" #include "qpid/client/ConnectionSettings.h" @@ -136,6 +137,7 @@ broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); + broker::Message::setUpdateDestination(UpdateClient::UPDATE); ManagementAgent* mgmt = broker->getManagementAgent(); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=792103&r1=792102&r2=792103&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul 8 11:48:57 2009 @@ -936,10 +936,13 @@ } } -void send(Client& client, const std::string& queue, 
int count, int start=1, const std::string& base="m") +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m", + const std::string& lvqKey="") { for (int i = 0; i < count; i++) { - client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag)); + Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag); + if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey); + client.session.messageTransfer(arg::content=message); } } @@ -998,6 +1001,64 @@ } } +QPID_AUTO_TEST_CASE(testLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 5, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1")); + } +} + + +QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + //if the lvq state has been affected by browsers + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 
1, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")); + send(c1, "q", 4, 2, "a", "a"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3")); + } +} + QPID_AUTO_TEST_CASE(testRelease) { //tests that releasing a messages that was unacked when one node //joined works correctly --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org