Author: kpvdr Date: Wed Jul 1 15:21:54 2009 New Revision: 790215 URL: http://svn.apache.org/viewvc?rev=790215&view=rev Log: Fix for cluster_test problems: a) When not started from within test dir, qpidd is not found, and a process cascade results in which each forked process continues and runs all tests after its own instead of terminating; b) Hard-coded paths and names of libs, which have been moved into Makefile.am; c) Some tests use ScopedSuppressLogging, these tests are modified so the scope of the logging suppression does not include the broker/cluster startup.
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp qpid/trunk/qpid/cpp/src/tests/Makefile.am qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp?rev=790215&r1=790214&r2=790215&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp Wed Jul 1 15:21:54 2009 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -51,7 +51,7 @@ // Test re-connecting with same session name after a failure. QPID_AUTO_TEST_CASE(testReconnectSameSessionName) { ostringstream clusterLib; - clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so"); ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str()); ClusterFixture cluster(2, args, -1); Client c0(cluster[0], "foo"); Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp?rev=790215&r1=790214&r2=790215&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp Wed Jul 1 15:21:54 2009 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,24 +50,24 @@ catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what())); } - if (!dataDir.empty()) + if (!dataDir.empty()) ::system(("rm -rf "+dataDir).c_str()); } void ForkedBroker::kill(int sig) { if (pid == 0) return; - int savePid = pid; + int savePid = pid; pid = 0; // Reset pid here in case of an exception. using qpid::ErrnoException; - if (::kill(savePid, sig) < 0) + if (::kill(savePid, sig) < 0) throw ErrnoException("kill failed"); int status; - if (::waitpid(savePid, &status, 0) < 0 && sig != 9) + if (::waitpid(savePid, &status, 0) < 0 && sig != 9) throw ErrnoException("wait for forked process failed"); - if (WEXITSTATUS(status) != 0 && sig != 9) + if (WEXITSTATUS(status) != 0 && sig != 9) throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status))); } - + namespace std { static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) { copy(a.begin(), a.end(), ostream_iterator<string>(o, " ")); @@ -83,7 +83,7 @@ } } - + void ForkedBroker::init(const Args& userArgs) { using qpid::ErrnoException; port = 0; @@ -105,19 +105,20 @@ ::close(pipeFds[0]); int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent. if (fd < 0) throw ErrnoException("dup2 failed"); - const char* prog = ::getenv("QPID_FORKED_BROKER"); - if (!prog) prog = "../qpidd"; + const char* prog = ::getenv("QPIDD_EXEC"); + if (!prog) prog = "../qpidd"; // This only works from within svn checkout Args args(userArgs); args.push_back("--port=0"); // Keep quiet except for errors. if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE") && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end()) - args.push_back("--log-enable=error+"); + args.push_back("--log-enable=error+"); std::vector<const char*> argv(args.size()); std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); argv.push_back(0); QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args); execv(prog, const_cast<char* const*>(&argv[0])); - throw ErrnoException("execv failed"); + QPID_LOG(critical, "execv failed to start broker: prog=\"" << prog << "\"; args=\"" << args << "\"; errno=" << errno << " (" << std::strerror(errno) << ")"); + ::exit(1); } } Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=790215&r1=790214&r2=790215&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Jul 1 15:21:54 2009 @@ -244,7 +244,9 @@ srcdir=$(srcdir) \ top_builddir=$(top_builddir) \ QPID_DATA_DIR= \ - QPID_LIB_DIR=../.libs \ + ACL_LIB=../.libs/acl.so \ + CLUSTER_LIB=../.libs/cluster.so \ + TEST_STORE_LIB=.libs/test_store.so \ BOOST_TEST_SHOW_PROGRESS=yes \ $(srcdir)/run_test Modified: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=790215&r1=790214&r2=790215&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Wed Jul 1 15:21:54 2009 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,8 +50,8 @@ void updateArgs(ClusterFixture::Args& args, size_t index) { ostringstream clusterLib, testStoreLib, storeName; - clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; - testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so"; + clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so"); + testStoreLib << getLibPath("TEST_STORE_LIB", ".libs/test_store.so"); storeName << "s" << index; args.push_back("--auth"); args.push_back("no"); @@ -88,112 +88,120 @@ // the statements expected to fail (in BOOST_CHECK_THROW) but that // sproadically lets out messages, possibly because they're in // Connection thread. - ScopedSuppressLogging allQuiet; - ClusterFixture cluster(3, updateArgs, -1); + ClusterFixture cluster(3, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); - queueAndSub(c0); - c0.session.messageTransfer(content=Message("x", "c0")); - BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x"); - - // Session error. - BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException); - c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead. - - // Connection error, kill c1 on all members. - queueAndSub(c1); - BOOST_CHECK_THROW( - c1.session.messageTransfer( - content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")), - ConnectionException); - c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead. - - BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size()); - BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay"); - BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay"); + { + ScopedSuppressLogging allQuiet; + queueAndSub(c0); + c0.session.messageTransfer(content=Message("x", "c0")); + BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x"); + + // Session error. + BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException); + c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead. + + // Connection error, kill c1 on all members. + queueAndSub(c1); + BOOST_CHECK_THROW( + c1.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")), + ConnectionException); + c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead. + + BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size()); + BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay"); + BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay"); + } } // Test errors after a new member joins to verify frame-sequence-numbers are ok in update. QPID_AUTO_TEST_CASE(testErrorAfterJoin) { - ScopedSuppressLogging allQuiet; - ClusterFixture cluster(1, updateArgs, -1); Client c0(cluster[0]); - c0.session.queueDeclare("q", durable=true); - c0.session.messageTransfer(content=pMessage("a", "q")); + { + ScopedSuppressLogging allQuiet; - // Kill the new guy - cluster.add(); - Client c1(cluster[1]); - c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q")); - BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); - BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); - - // Kill the old guy - cluster.add(); - Client c2(cluster[2]); - c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q")); - BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); - BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size()); + // Kill the new guy + cluster.add(); + Client c1(cluster[1]); + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q")); + BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); + + // Kill the old guy + cluster.add(); + Client c2(cluster[2]); + c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q")); + BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size()); + } } -// Test that if one member fails and others do not, the failure leaves the cluster. +// Test that if one member fails and others do not, the failure leaves the cluster. QPID_AUTO_TEST_CASE(testSinglePartialFailure) { - ScopedSuppressLogging allQuiet; - ClusterFixture cluster(3, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); - - c0.session.queueDeclare("q", durable=true); - c0.session.messageTransfer(content=pMessage("a", "q")); - // Cause partial failure on c1 - c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q")); - BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); - - c0.session.messageTransfer(content=pMessage("b", "q")); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); - - // Cause partial failure on c2 - c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q")); - BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); - - c0.session.messageTransfer(content=pMessage("c", "q")); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u); - BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); + + { + ScopedSuppressLogging allQuiet; + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + // Cause partial failure on c1 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); + + // Cause partial failure on c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q")); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); + } } -// Test multiple partial falures: 2 fail 2 pass +// Test multiple partial falures: 2 fail 2 pass QPID_AUTO_TEST_CASE(testMultiPartialFailure) { - ScopedSuppressLogging allQuiet; - ClusterFixture cluster(4, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); Client c2(cluster[2], "c2"); Client c3(cluster[3], "c3"); - - c0.session.queueDeclare("q", durable=true); - c0.session.messageTransfer(content=pMessage("a", "q")); - - // Cause partial failure on c1, c2 - c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q")); - BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); - BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); - - c0.session.messageTransfer(content=pMessage("b", "q")); - c3.session.messageTransfer(content=pMessage("c", "q")); - BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u); - // FIXME aconway 2009-06-30: This check fails sporadically with 2 != 3. - // It should pass reliably. - // BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); + + { + ScopedSuppressLogging allQuiet; + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause partial failure on c1, c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + c3.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u); + // FIXME aconway 2009-06-30: This check fails sporadically with 2 != 3. + // It should pass reliably. + // BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); + } } /** FIXME aconway 2009-04-10: @@ -203,25 +211,27 @@ */ #if 0 QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { - ScopedSuppressLogging allQuiet; - ClusterFixture cluster(2, updateArgs, -1); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); - c0.session.queueDeclare("q", durable=true); - c0.session.messageTransfer(content=pMessage("a", "q")); + { + ScopedSuppressLogging allQuiet; - // Cause failure on member 0 and simultaneous crash on member 1. - BOOST_CHECK_THROW( - c0.session.messageTransfer( - content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")), - ConnectionException); - cluster.wait(1); - - Client c00(cluster[0], "c00"); // Old connection is dead. - BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size()); + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause failure on member 0 and simultaneous crash on member 1. + BOOST_CHECK_THROW( + c0.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")), + ConnectionException); + cluster.wait(1); + + Client c00(cluster[0], "c00"); // Old connection is dead. + BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size()); + } } #endif Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=790215&r1=790214&r2=790215&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul 1 15:21:54 2009 @@ -74,7 +74,7 @@ void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { ostringstream clusterLib; - clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; + clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so"); args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); if (durableFlag) args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; @@ -190,7 +190,7 @@ static bool match(const AMQBody& , const AMQBody& ) { return false; } virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; } }; - + QPID_AUTO_TEST_CASE(testBadClientData) { // Ensure that bad data on a client connection closes the // connection but does not stop the broker. @@ -227,7 +227,7 @@ char cwd[1024]; BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); ostringstream aclLib; - aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so"; + aclLib << getLibPath("ACL_LIB", "../.libs/acl.so"); ClusterFixture::Args args; prepareArgs(args, durableFlag); args += "--acl-file", string(cwd) + "/cluster_test.acl", @@ -808,7 +808,7 @@ void run() { - try { + try { mgr.execute(*this); } catch (const std::exception& e) { @@ -854,7 +854,6 @@ } QPID_AUTO_TEST_CASE(testPolicyUpdate) { - ScopedSuppressLogging allQuiet; //tests that the policys internal state is accurate on newly //joined nodes ClusterFixture::Args args; @@ -862,26 +861,28 @@ prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - QueueOptions options; - options.setSizePolicy(REJECT, 0, 2); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); - cluster.add(); - Client c2(cluster[1], "c2"); - c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); - - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); - - Message received; - BOOST_CHECK(c1.subs.get(received, "q")); - BOOST_CHECK_EQUAL(received.getData(), std::string("one")); - BOOST_CHECK(c1.subs.get(received, "q")); - BOOST_CHECK_EQUAL(received.getData(), std::string("two")); - BOOST_CHECK(!c1.subs.get(received, "q")); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(REJECT, 0, 2); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); + cluster.add(); + Client c2(cluster[1], "c2"); + c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); + + BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); + + Message received; + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("one")); + BOOST_CHECK(c1.subs.get(received, "q")); + BOOST_CHECK_EQUAL(received.getData(), std::string("two")); + BOOST_CHECK(!c1.subs.get(received, "q")); + } } QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { - ScopedSuppressLogging allQuiet; //tests that exclusive queues are accurately replicated on newly //joined nodes ClusterFixture::Args args; @@ -889,19 +890,22 @@ prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); - cluster.add(); - Client c2(cluster[1], "c2"); - QueueQueryResult result = c2.session.queueQuery("q"); - BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); - BOOST_CHECK(result.getExclusive()); - BOOST_CHECK(result.getAutoDelete()); - BOOST_CHECK(!result.getDurable()); - BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); - BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); - c1.connection.close(); - c2.session = c2.connection.newSession(); - BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); + cluster.add(); + Client c2(cluster[1], "c2"); + QueueQueryResult result = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); + BOOST_CHECK(result.getExclusive()); + BOOST_CHECK(result.getAutoDelete()); + BOOST_CHECK(!result.getDurable()); + BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); + c1.connection.close(); + c2.session = c2.connection.newSession(); + BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); + } } /** @@ -940,7 +944,6 @@ } QPID_AUTO_TEST_CASE(testRingQueueUpdate) { - ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly //joined nodes ClusterFixture::Args args; @@ -948,24 +951,26 @@ prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - QueueOptions options; - options.setSizePolicy(RING, 0, 5); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - send(c1, "q", 5); - lockMessages(c1, "q", 1); - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - //send one more message - send(c1, "q", 1, 6); - //release locked message - c1.close(); - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //send one more message + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + } } QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { - ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly joined //nodes; just like testRingQueueUpdate, but new node joins after //the sixth message has been sent. @@ -974,24 +979,26 @@ prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - QueueOptions options; - options.setSizePolicy(RING, 0, 5); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - send(c1, "q", 5); - lockMessages(c1, "q", 1); - //send sixth message - send(c1, "q", 1, 6); - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - //release locked message - c1.close(); - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); + } } QPID_AUTO_TEST_CASE(testRelease) { - ScopedSuppressLogging allQuiet; //tests that releasing a messages that was unacked when one node //joined works correctly ClusterFixture::Args args; @@ -999,31 +1006,34 @@ prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c1(cluster[0], "c1"); - c1.session.queueDeclare("q", arg::durable=durableFlag); - for (int i = 0; i < 5; i++) { - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + { + ScopedSuppressLogging allQuiet; + c1.session.queueDeclare("q", arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + Message received; + BOOST_CHECK(lq.get(received)); + BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); + + //add new node + cluster.add(); + + lqSub.release(lqSub.getUnaccepted()); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); } - //receive but don't ack a message - LocalQueue lq; - SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); - lqSettings.autoAck = 0; - Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); - c1.session.messageFlush("q"); - Message received; - BOOST_CHECK(lq.get(received)); - BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); - - //add new node - cluster.add(); - - lqSub.release(lqSub.getUnaccepted()); - - //check state of queue on both nodes - vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); - Client c3(cluster[0], "c3"); - BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); - Client c2(cluster[1], "c2"); - BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); } QPID_AUTO_TEST_SUITE_END() --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org