Hi Andy/Gordon et al.
I really could do with some help from performance gurus.....
OK So I think I've reproduced some of the symptoms I described in my
earlier email. I used the attached demo producer/consumer
qpid::messaging clients.
So if I run ./ItemConsumer that creates my perftest queue and waits for
messages, then if I run ./ItemProducer I get messages whizzing through
at a decent enough rate.
My box at home is a Dell laptop with Intel(R) Core(TM)2 Duo CPU P7450
@ 2.13GHz, cpu MHz 800.000, cache size 3072 KB and running qpidd plus an
instance of ItemProducer and ItemConsumer I'm getting ~17,000 900 octet
messages per second and top shows qpidd only goes as far as ~60%
Now the weirder symptom is that if I then kill ItemConsumer, run
ItemProducer for 200,000 messages or so to fill the queue then run
ItemConsumer it shows that it has acknowledged the first batch of 20000
or so messages then it hangs and top shows qpidd maxing out at 100%.
At that point ItemProducer still seems to be producing, but ItemConsumer
only periodically acks. If I then kill ItemProducer I get ItemConsumer
ramping up again.
I've set the receiver capacity to 500; I've tried several other values,
but it doesn't seem to make much difference.
This is really freaky: why does the consumer performance drop off
dramatically when the ring queue is full? Is it a flow control thing?
How can I disable it in qpid::messaging????
Another freaky thing is that I've got tcp-nodelay enabled in my clients,
but I found that when I also set --tcp-nodelay on the broker (testing with 0.10)
the performance actually GOES DOWN!!!! So setting --tcp-nodelay on the
broker helps if I don't set it in the clients, but not as much as if I do
set it in the clients, and if I set it on both the performance is less
than if I just set it on the clients ---- weird!!!!!
So c'mon folks rise to the challenge and suggest some settings to make
my clients smoke (and stop being weird when the queue fills).
And how do I get qpidd on a multi-core box to show more than 100% CPU - do
I have to run multiple qpidd instances? I can't believe that's the case????
MTIA
Frase
Andy Goldstein wrote:
On Sep 23, 2011, at 9:11 AM, Fraser Adams wrote:
I'll mention that to the guys when I get back to the office, though it seems a bit
counterintuitive to me: I'd have thought that having a lower number of worker threads
wouldn't utilise the available cores. By "logic" running two (or even eight)
worker threads on your 48 core server seems low - any idea what's going on to explain
your results??
I can't say for sure, but I would guess there's maybe more lock contention
going on in the broker when you have more threads.
So have you reproduced the MRG paper results? That paper, which is over three
years old now, has figures of 380,000 256 octet messages in plus out on a 2 x 4
core Xeon box. We've not come *close* to that figure and my developers are far
from dummies. The paper describes the methodology quite well, but doesn't quite
spell out as a tutorial exactly what the setup was.
What numbers are you getting, and how are you testing?
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <sys/time.h> // For gettimeofday()
#include <iostream>
using namespace std;
using namespace qpid::messaging;
/**
 * Returns the current wall-clock time from gettimeofday() in milliseconds.
 *
 * The seconds and microseconds fields are converted to milliseconds
 * separately instead of first building a full microsecond count. On
 * platforms where unsigned long is 32 bits the microsecond count wraps
 * roughly every 71 minutes, and dividing the wrapped value by 1000 makes
 * the (later - earlier) subtractions done by callers come out wrong. With
 * this form any wrap-around happens in the millisecond value itself, so
 * unsigned differences between two readings stay correct.
 *
 * @return milliseconds derived from tv_sec and tv_usec (sub-millisecond
 *         part truncated).
 */
unsigned long currentTimeMillis() {
    struct timeval curTime;
    gettimeofday(&curTime, NULL);
    return static_cast<unsigned long>(curTime.tv_sec) * 1000ul
         + static_cast<unsigned long>(curTime.tv_usec) / 1000ul;
}
int main(int argc, char** argv) {
string broker = "localhost:5672";
//string address = "amq.match";
string address = "perftest";
//string connectionOptions = "{reconnect: true}";
string connectionOptions = "{reconnect: true, tcp-nodelay: true}";
Connection connection(broker, connectionOptions);
try {
connection.open();
Session session = connection.createSession();
Sender sender = session.createSender(address);
cout << "default capacity = " << sender.getCapacity() << endl;
//sender.setCapacity(500);
int NUMBER_OF_ITERATIONS = 1000000;
unsigned long startTime = currentTimeMillis();
unsigned long ackTime = startTime;
int ITEM_SIZE = 900;
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
char* buffer = new char[ITEM_SIZE];
Message message(buffer, ITEM_SIZE);
message.setProperty("item-owner", "fadams");
message.setProperty("data-service", "amqp-delivery");
sender.send(message);
delete buffer;
unsigned long curTime = currentTimeMillis();
if ((curTime - ackTime) > 1000) {
cout << "auto synced message #" << i << endl;
session.sync();
ackTime = curTime;
}
}
session.sync();
unsigned long finishTime = currentTimeMillis();
cout << "Elapsed time = " << (finishTime - startTime) << ", messages/second = " << NUMBER_OF_ITERATIONS*1000.0f/(finishTime - startTime) << endl;
connection.close();
return 0;
} catch(const exception& error) {
cerr << "ItemProducer Exception: " << error.what() << endl;
connection.close();
return 1;
}
}
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <sys/time.h> // For gettimeofday()
#include <iostream>
using namespace std;
using namespace qpid::messaging;
/**
 * Returns the current wall-clock time from gettimeofday() in milliseconds.
 *
 * Seconds and microseconds are scaled to milliseconds independently rather
 * than being summed into a microsecond total first. Where unsigned long is
 * 32 bits, that microsecond total overflows about every 71 minutes and the
 * following division by 1000 corrupts elapsed-time subtractions in callers.
 * Scaling each field separately means any wrap-around occurs in the
 * millisecond value, which keeps unsigned differences between readings valid.
 *
 * @return milliseconds derived from tv_sec and tv_usec (sub-millisecond
 *         part truncated).
 */
unsigned long currentTimeMillis() {
    struct timeval now;
    gettimeofday(&now, NULL);
    const unsigned long wholeSecondsMs = static_cast<unsigned long>(now.tv_sec) * 1000ul;
    const unsigned long fractionMs = static_cast<unsigned long>(now.tv_usec) / 1000ul;
    return wholeSecondsMs + fractionMs;
}
/**
 * Non-blocking check for readable input on standard input.
 *
 * Uses select() with a zero timeout on file descriptor 0, so the call
 * returns immediately.
 *
 * @return 1 if select() reports file descriptor 0 as readable,
 *         0 if it does not or if select() fails.
 */
int kbhit(void)
{
    /* Watch only file descriptor 0 (standard input). */
    fd_set readable;
    FD_ZERO(&readable);
    FD_SET(0, &readable);

    /* A zero timeout makes select() poll instead of waiting. */
    struct timeval noWait;
    noWait.tv_sec = 0;
    noWait.tv_usec = 0;

    /* First argument is the highest descriptor to check plus one. */
    const int status = select(1, &readable, NULL, NULL, &noWait);
    if (status == -1) {
        return 0; /* select() reported an error */
    }

    /* After select(), the set holds only the descriptors that are readable. */
    return FD_ISSET(0, &readable) ? 1 : 0;
}
int main(int argc, char** argv) {
string broker = "localhost:5672";
/*string address = "perftest; {create: receiver, node: {x-declare: {arguments: {'qpid.policy_type': ring, 'qpid.max_size': 500000000}}, x-bindings: [{exchange: 'amq.match', queue: 'perftest', key: 'data1', arguments: {x-match: all, data-service: amqp-delivery, item-owner: fadams}}]}}";*/
string address = "perftest; {create: receiver, node: {x-declare: {arguments: {'qpid.policy_type': ring, 'qpid.max_size': 100000000}}}}";
//string connectionOptions = "{reconnect: true}";
string connectionOptions = "{reconnect: true, tcp-nodelay: true}";
int count = 0;
Connection connection(broker, connectionOptions);
try {
connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver(address);
cout << "default capacity = " << receiver.getCapacity() << endl;
receiver.setCapacity(500); // Enable receiver prefetch
unsigned long startTime = currentTimeMillis();
unsigned long ackTime = startTime;
while (true) {
Message message;
if (receiver.fetch(message, Duration::SECOND * 1)) {
//const char* buffer = message.getContentPtr();
//cout << "message count = " << count << ", length = " << message.getContentSize() << endl;
unsigned long curTime = currentTimeMillis();
if ((curTime - ackTime) > 1000) {
cout << "auto acknowledged message #" << count << endl;
session.acknowledge();
ackTime = curTime;
}
count++;
} else {
unsigned long curTime = currentTimeMillis();
if ((curTime - ackTime) > 1000) {
cout << "timeout auto acknowledged message #" << count << endl;
session.acknowledge();
ackTime = curTime;
}
unsigned long finishTime = currentTimeMillis();
cout << "Elapsed time = " << (finishTime - startTime) << ", messages/second = " << count*1000.0f/(finishTime - startTime) << endl;
startTime = finishTime;
count = 0;
}
if (kbhit()) break;
}
session.acknowledge();
connection.close();
return 0;
} catch(const exception& error) {
cerr << "ItemConsumer Exception: " << error.what() << endl;
connection.close();
return 1;
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]