Author: mgoulish Date: Fri Nov 13 05:37:54 2009 New Revision: 835746 URL: http://svn.apache.org/viewvc?rev=835746&view=rev Log: Make failover_soak and its children adjustable as to the number of brokers in the cluster, and the number of queues talking through the cluster during the test.
resuming_receiver.cpp and replaying_sender.cpp now take command line args to control the queue name. If more than 1 queue is desired, failover_soak.cpp will start up N queue, each with its own sender and receiver. Queue names are now made unique with the failover_soak PID as part of their name. Modified: qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp qpid/trunk/qpid/cpp/src/tests/run_failover_soak Modified: qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp?rev=835746&r1=835745&r2=835746&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/declare_queues.cpp Fri Nov 13 05:37:54 2009 @@ -25,46 +25,73 @@ #include <cstdlib> #include <iostream> +#include <sstream> using namespace qpid::client; using namespace std; -int main(int argc, char ** argv) +int +main(int argc, char ** argv) { ConnectionSettings settings; - if ( argc != 4 ) + if ( argc != 6 ) { - cerr << "Usage: declare_queues host port durability\n"; + cerr << "Usage: declare_queues host port durability queue_name_prefix n_queues\n"; return 1; } settings.host = argv[1]; settings.port = atoi(argv[2]); int durability = atoi(argv[3]); + char const * queue_name_prefix = argv[4]; + int n_queues = atoi(argv[5]); FailoverManager connection(settings); - try { - bool complete = false; - while (!complete) { - Session session = connection.connect().newSession(); - try { - if ( durability ) - session.queueDeclare(arg::queue="message_queue", arg::durable=true); - else - session.queueDeclare(arg::queue="message_queue"); - complete = true; - } catch (const qpid::TransportFailure&) {} - } - connection.close(); - return 0; - } catch (const exception& error) { - cerr << "declare_queues failed:" << error.what() << endl; - cerr << " host: " << settings.host - << " port: " << settings.port << endl; - return 1; + + int max_fail = 13; + for ( int i = 0; i < n_queues; ++ i ) { + stringstream queue_name; + queue_name << queue_name_prefix << '_' << i; + + bool queue_created = false; + int failure_count; + + // Any non-transport failure is Bad. + try + { + while ( ! queue_created ) { + Session session = connection.connect().newSession(); + // TransportFailures aren't too bad -- they might happen because + // we are doing a cluster failover test. But if we get too many, + // we will still bug out. + failure_count = 0; + try { + if ( durability ) + session.queueDeclare(arg::queue=queue_name.str(), arg::durable=true); + else + session.queueDeclare(arg::queue=queue_name.str()); + queue_created = true; + cout << "declare_queues: Created queue " << queue_name.str() << endl; + } + catch ( const qpid::TransportFailure& ) { + if ( ++ failure_count >= max_fail ) { + cerr << "declare_queues failed: too many transport errors.\n"; + cerr << " host: " << settings.host + << " port: " << settings.port << endl; + return 1; + } + sleep ( 1 ); + } + } + } + catch ( const exception & error) { + cerr << "declare_queues failed:" << error.what() << endl; + cerr << " host: " << settings.host + << " port: " << settings.port << endl; + return 1; + } } - } Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=835746&r1=835745&r2=835746&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Fri Nov 13 05:37:54 2009 @@ -433,7 +433,9 @@ char const * host, char const * path, int verbosity, - int durable + int durable, + char const * queue_prefix, + int n_queues ) { string name("declareQueues"); @@ -456,6 +458,13 @@ argv.push_back ( "1" ); else argv.push_back ( "0" ); + + argv.push_back ( queue_prefix ); + + char n_queues_str[20]; + sprintf ( n_queues_str, "%d", n_queues ); + argv.push_back ( n_queues_str ); + argv.push_back ( 0 ); pid_t pid = fork(); @@ -475,10 +484,11 @@ pid_t startReceivingClient ( brokerVector brokers, - char const * host, - char const * receiverPath, - char const * reportFrequency, - int verbosity + char const * host, + char const * receiverPath, + char const * reportFrequency, + int verbosity, + char const * queue_name ) { string name("receiver"); @@ -502,6 +512,7 @@ argv.push_back ( portStr ); argv.push_back ( reportFrequency ); argv.push_back ( verbosityStr ); + argv.push_back ( queue_name ); argv.push_back ( 0 ); pid_t pid = fork(); @@ -522,12 +533,13 @@ pid_t startSendingClient ( brokerVector brokers, - char const * host, - char const * senderPath, - char const * nMessages, - char const * reportFrequency, + char const * host, + char const * senderPath, + char const * nMessages, + char const * reportFrequency, int verbosity, - int durability + int durability, + char const * queue_name ) { string name("sender"); @@ -555,6 +567,7 @@ argv.push_back ( "1" ); else argv.push_back ( "0" ); + argv.push_back ( queue_name ); argv.push_back ( 0 ); pid_t pid = fork(); @@ -589,10 +602,10 @@ int main ( int argc, char const ** argv ) { - if ( argc != 9 ) { + if ( argc != 11 ) { cerr << "Usage: " << argv[0] - << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable" + << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable n_queues n_brokers" << endl; cerr << "\tverbosity is an integer, durable is 0 or 1\n"; return BAD_ARGS; @@ -608,6 +621,8 @@ char const * reportFrequency = argv[i++]; int verbosity = atoi(argv[i++]); int durable = atoi(argv[i++]); + int n_queues = atoi(argv[i++]); + int n_brokers = atoi(argv[i++]); char const * host = "127.0.0.1"; int maxBrokers = 50; @@ -625,8 +640,7 @@ if ( verbosity > 1 ) cout << "Starting initial cluster...\n"; - int nBrokers = 3; - for ( int i = 0; i < nBrokers; ++ i ) { + for ( int i = 0; i < n_brokers; ++ i ) { startNewBroker ( brokers, moduleOrDir, clusterName, @@ -638,10 +652,22 @@ if ( verbosity > 0 ) printBrokers ( brokers ); + // Get prefix for each queue name. + stringstream queue_prefix; + queue_prefix << "failover_soak_" << getpid(); + + // Run the declareQueues child. int childStatus; pid_t dqClientPid = - runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable ); + runDeclareQueuesClient ( brokers, + host, + declareQueuesPath, + verbosity, + durable, + queue_prefix.str().c_str(), + n_queues + ); if ( -1 == dqClientPid ) { cerr << "END_OF_TEST ERROR_START_DECLARE_1\n"; return CANT_FORK_DQ; @@ -656,32 +682,42 @@ allMyChildren.exited ( dqClientPid, childStatus ); - - // Start the receiving client. - pid_t receivingClientPid = - startReceivingClient ( brokers, - host, - receiverPath, - reportFrequency, - verbosity ); - if ( -1 == receivingClientPid ) { - cerr << "END_OF_TEST ERROR_START_RECEIVER\n"; - return CANT_FORK_RECEIVER; - } + /* + Start one receiving and one sending client for each queue. + */ + for ( int i = 0; i < n_queues; ++ i ) { + + stringstream queue_name; + queue_name << queue_prefix.str() << '_' << i; + + // Receiving client --------------------------- + pid_t receivingClientPid = + startReceivingClient ( brokers, + host, + receiverPath, + reportFrequency, + verbosity, + queue_name.str().c_str() ); + if ( -1 == receivingClientPid ) { + cerr << "END_OF_TEST ERROR_START_RECEIVER\n"; + return CANT_FORK_RECEIVER; + } - // Start the sending client. - pid_t sendingClientPid = - startSendingClient ( brokers, - host, - senderPath, - nMessages, - reportFrequency, - verbosity, - durable ); - if ( -1 == sendingClientPid ) { - cerr << "END_OF_TEST ERROR_START_SENDER\n"; - return CANT_FORK_SENDER; + // Sending client --------------------------- + pid_t sendingClientPid = + startSendingClient ( brokers, + host, + senderPath, + nMessages, + reportFrequency, + verbosity, + durable, + queue_name.str().c_str() ); + if ( -1 == sendingClientPid ) { + cerr << "END_OF_TEST ERROR_START_SENDER\n"; + return CANT_FORK_SENDER; + } } @@ -689,7 +725,7 @@ maxSleep = 4; - for ( int totalBrokers = 3; + for ( int totalBrokers = n_brokers; totalBrokers < maxBrokers; ++ totalBrokers ) Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=835746&r1=835745&r2=835746&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Fri Nov 13 05:37:54 2009 @@ -54,14 +54,21 @@ uint sent; const uint reportFrequency; Message message; - int verbosity; int persistence; + string queueName; }; -Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq), verbosity(0), persistence(0) +Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) + : sender(10), + count(count_), + sent(0), + reportFrequency(reportFreq), + verbosity(0), + persistence(0), + queueName ( queue ) { - message.getDeliveryProperties().setRoutingKey(queue); + message.getDeliveryProperties().setRoutingKey(queueName.c_str()); } void Sender::execute(AsyncSession& session, bool isRetry) @@ -81,7 +88,13 @@ sender.send(message); if (count > reportFrequency && !(sent % reportFrequency)) { if ( verbosity > 0 ) - std::cout << "Sender sent " << sent << " of " << count << std::endl; + std::cout << "Sender sent " + << sent + << " of " + << count + << " on queue " + << queueName + << std::endl; } } message.setData("That's all, folks!"); @@ -104,9 +117,9 @@ { ConnectionSettings settings; - if ( argc != 7 ) + if ( argc != 8 ) { - std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence\n"; + std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence queue_name\n"; return 1; } @@ -116,9 +129,10 @@ int reportFrequency = atoi(argv[4]); int verbosity = atoi(argv[5]); int persistence = atoi(argv[6]); + char * queue_name = argv[7]; FailoverManager connection(settings); - Sender sender("message_queue", n_messages, reportFrequency ); + Sender sender(queue_name, n_messages, reportFrequency ); sender.setVerbosity ( verbosity ); sender.setPersistence ( persistence ); try { @@ -127,7 +141,8 @@ { std::cout << "Sender finished. Sent " << sender.getSent() - << " messages." + << " messages on queue " + << queue_name << endl; } connection.close(); Modified: qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp?rev=835746&r1=835745&r2=835746&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/resuming_receiver.cpp Fri Nov 13 05:37:54 2009 @@ -27,7 +27,6 @@ #include <iostream> #include <fstream> - using namespace qpid; using namespace qpid::client; using namespace qpid::framing; @@ -35,6 +34,7 @@ using namespace std; + namespace qpid { namespace tests { @@ -43,31 +43,34 @@ public FailoverManager::ReconnectionStrategy { public: - Listener ( int report_frequency = 1000, int verbosity = 0 ); + Listener ( int report_frequency = 1000, + int verbosity = 0, + char const * queue_name = "message_queue" ); void received(Message& message); void execute(AsyncSession& session, bool isRetry); void check(); - void editUrlList(std::vector<Url>& urls); + void editUrlList(vector<Url>& urls); private: Subscription subscription; uint count; - uint received_twice; + vector<int> received_twice; uint lastSn; bool gaps; uint reportFrequency; int verbosity; bool done; + string queueName; }; -Listener::Listener(int freq, int verbosity) +Listener::Listener ( int freq, int verbosity, char const * name ) : count(0), - received_twice(0), lastSn(0), gaps(false), reportFrequency(freq), verbosity(verbosity), - done(false) + done(false), + queueName ( name ) {} @@ -78,36 +81,51 @@ done = true; if(verbosity > 0 ) { - std::cout << "Shutting down listener for " - << message.getDestination() << std::endl; + cout << "Shutting down listener for " + << message.getDestination() << endl; - std::cout << "Listener received " + cout << "Listener received " << count << " messages (" - << received_twice + << received_twice.size() << " received_twice)" << endl; + } subscription.cancel(); if ( verbosity > 0 ) - std::cout << "LISTENER COMPLETED\n"; + cout << "LISTENER COMPLETED\n"; + + if ( ! gaps ) { + cout << "no gaps were detected\n"; + cout << received_twice.size() << " messages were received twice.\n"; + } + else { + cout << "gaps detected\n"; + for ( unsigned int i = 0; i < received_twice.size(); ++ i ) + cout << "received_twice " + << received_twice[i] + << endl; + } } else { uint sn = message.getHeaders().getAsInt("sn"); if (lastSn < sn) { if (sn - lastSn > 1) { - std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl; + cerr << "Error: gap in sequence between " << lastSn << " and " << sn << endl; gaps = true; } lastSn = sn; ++count; if ( ! ( count % reportFrequency ) ) { if ( verbosity > 0 ) - std::cout << "Listener has received " + cout << "Listener has received " << count - << " messages.\n"; + << " messages on queue " + << queueName + << endl; } } else { - ++received_twice; + received_twice.push_back ( sn ); } } } @@ -119,21 +137,21 @@ void Listener::execute(AsyncSession& session, bool isRetry) { if (verbosity > 0) - std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl; + cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl; if (!done) { SubscriptionManager subs(session); - subscription = subs.subscribe(*this, "message_queue"); + subscription = subs.subscribe(*this, queueName); subs.run(); } } -void Listener::editUrlList(std::vector<Url>& urls) +void Listener::editUrlList(vector<Url>& urls) { /** * A more realistic algorithm would be to search through the list * for prefered hosts and ensure they come first in the list. */ - if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end()); + if (urls.size() > 1) rotate(urls.begin(), urls.begin() + 1, urls.end()); } }} // namespace qpid::tests @@ -144,9 +162,9 @@ { ConnectionSettings settings; - if ( argc != 5 ) + if ( argc != 6 ) { - std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n"; + cerr << "Usage: resuming_receiver host port report_frequency verbosity queue_name\n"; return 1; } @@ -154,8 +172,9 @@ settings.port = atoi(argv[2]); int reportFrequency = atoi(argv[3]); int verbosity = atoi(argv[4]); + char * queue_name = argv[5]; - Listener listener(reportFrequency, verbosity); + Listener listener ( reportFrequency, verbosity, queue_name ); FailoverManager connection(settings, &listener); try { @@ -163,8 +182,8 @@ connection.close(); listener.check(); return 0; - } catch(const std::exception& error) { - std::cerr << "Receiver failed: " << error.what() << std::endl; + } catch(const exception& error) { + cerr << "Receiver failed: " << error.what() << endl; } return 1; } Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=835746&r1=835745&r2=835746&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original) +++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Fri Nov 13 05:37:54 2009 @@ -24,11 +24,13 @@ host=127.0.0.1 MODULES=${MODULES:-../.libs} -MESSAGES=${MESSAGES:-300000} -REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`} -VERBOSITY=${VERBOSITY:-1} -DURABILITY=${DURABILITY:-1} +MESSAGES=${MESSAGES:-1000000} +REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000} +VERBOSITY=${VERBOSITY:-10} +DURABILITY=${DURABILITY:-0} +N_QUEUES=${N_QUEUES:-1} +N_BROKERS=${N_BROKERS:-3} rm -f soak-*.log -exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY +exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org