Hi Andy/Gordon et al.
I really could do with some help from performance gurus.....

OK, so I think I've reproduced some of the symptoms I described in my earlier email, using the attached demo producer/consumer qpid::messaging clients.

If I run ./ItemConsumer, which creates my perftest queue and waits for messages, and then run ./ItemProducer, messages whizz through at a decent enough rate.

My box at home is a Dell laptop with an Intel(R) Core(TM)2 Duo CPU P7450 @ 2.13GHz (cpu MHz 800.000, cache size 3072 KB). Running qpidd plus one instance each of ItemProducer and ItemConsumer, I'm getting ~17,000 900-octet messages per second, and top shows qpidd only going as high as ~60%.
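A rough per-message broker cost follows from these numbers. The sketch below assumes that top's ~60% means 60% of one core (top's usual per-process convention on Linux) and that all of qpidd's CPU time goes on this single message flow; the function name is hypothetical.

```cpp
#include <cassert>

// Hypothetical helper: microseconds of broker CPU spent per message.
// Assumes cpuPercentOfOneCore is a percentage of ONE core, as top reports it.
double brokerCpuMicrosPerMessage(double cpuPercentOfOneCore, double messagesPerSecond) {
    assert(messagesPerSecond > 0.0);
    // e.g. 60% -> 0.6 seconds of CPU consumed in each wall-clock second
    double cpuSecondsPerSecond = cpuPercentOfOneCore / 100.0;
    // Spread that CPU time over the messages handled in the same second,
    // then convert seconds to microseconds.
    return cpuSecondsPerSecond / messagesPerSecond * 1e6;
}
```

With 60% and 17,000 messages/second this gives roughly 35 microseconds of broker CPU per message.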

Now the weirder symptom: if I kill ItemConsumer, run ItemProducer for 200,000 messages or so to fill the queue, and then start ItemConsumer again, it shows that it has acknowledged the first batch of 20,000 or so messages, then it hangs and top shows qpidd maxing out at 100%.

At that point ItemProducer still seems to be producing, but ItemConsumer only acknowledges periodically. If I then kill ItemProducer, ItemConsumer ramps up again.

I've set the receiver capacity to 500. I've tried several other values, but it doesn't seem to make much difference.

This is really freaky: why does consumer performance drop off so dramatically when the ring queue is full? Is it a flow control thing? If so, how can I disable it in qpid::messaging????
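To make "full" concrete: ItemConsumer declares perftest with 'qpid.policy_type': ring and 'qpid.max_size': 100000000. Assuming that limit is in bytes and is checked against roughly the 900-octet payload (real messages also carry headers and the two properties, so the true capacity is somewhat lower), the queue holds about 111,111 messages, so a 200,000-message backlog makes the ring wrap. The hypothetical ToyRingQueue below models only the "drop the oldest when full" rule; it is not Qpid's implementation and ignores how the broker treats messages that have been delivered but not yet acknowledged.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical toy model of a byte-bounded ring queue: when an enqueue would
// push the total size over maxSize, the OLDEST messages are discarded first.
// Not Qpid's implementation; per-message header overhead is ignored.
class ToyRingQueue {
public:
    explicit ToyRingQueue(std::size_t maxSizeBytes) : maxSize_(maxSizeBytes) {}

    // Enqueue message `seq` of `sizeBytes`; returns how many old messages
    // were evicted to make room for it.
    std::size_t push(unsigned long seq, std::size_t sizeBytes) {
        assert(sizeBytes <= maxSize_);  // a single message must fit on its own
        std::size_t evicted = 0;
        while (bytes_ + sizeBytes > maxSize_) {
            bytes_ -= queue_.front().size;
            queue_.pop_front();
            ++evicted;
        }
        queue_.push_back(Entry{seq, sizeBytes});
        bytes_ += sizeBytes;
        return evicted;
    }

    std::size_t depth() const { return queue_.size(); }
    std::size_t bytes() const { return bytes_; }
    unsigned long oldestSeq() const {
        assert(!queue_.empty());
        return queue_.front().seq;
    }

private:
    struct Entry {
        unsigned long seq;
        std::size_t size;
    };
    std::size_t maxSize_;
    std::size_t bytes_ = 0;
    std::deque<Entry> queue_;
};
```

Pushing 200,000 messages of 900 octets into a 100,000,000-byte ring leaves 111,111 queued and evicts the oldest 88,889.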

Another freaky thing: I've got tcp-nodelay enabled in my clients, but I found that also setting --tcp-nodelay on the broker (testing with 0.10) makes performance GO DOWN!!!! Setting --tcp-nodelay on the broker helps if I don't set it in the clients, but not as much as setting it in the clients does, and if I set it on both, performance is lower than if I set it on the clients alone. Weird!!!!!
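For reference, tcp-nodelay presumably maps onto the standard TCP_NODELAY socket option, which disables Nagle's algorithm (the kernel's coalescing of small writes into fewer segments). With it on, each small write can go out as its own segment, which lowers latency but can raise per-packet overhead; that is one plausible, unverified reason throughput might drop. The helpers below are a hypothetical sketch of setting and reading the option on a POSIX socket, not qpid code.

```cpp
#include <cassert>
#include <netinet/in.h>   // IPPROTO_TCP
#include <netinet/tcp.h>  // TCP_NODELAY
#include <sys/socket.h>   // setsockopt, getsockopt, socklen_t

// Hypothetical helper: enable or disable Nagle's algorithm on a TCP socket.
// Returns true on success.
bool setTcpNoDelay(int fd, bool enable) {
    int flag = enable ? 1 : 0;  // 1 = send small segments immediately
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) == 0;
}

// Hypothetical helper: read back TCP_NODELAY; returns 1 (on), 0 (off) or -1 (error).
int getTcpNoDelay(int fd) {
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0) return -1;
    return flag != 0 ? 1 : 0;
}
```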

So c'mon folks, rise to the challenge and suggest some settings to make my clients smoke (and stop being weird when the queue fills).

And how do I get qpidd on a multi-core box to show more than 100% CPU? Do I have to run multiple qpidd instances? I can't believe that's the case????

MTIA
Frase

Andy Goldstein wrote:
On Sep 23, 2011, at 9:11 AM, Fraser Adams wrote:

I'll mention that to the guys when I get back to the office, though it seems a bit
counterintuitive to me: I'd have thought that a lower number of worker threads
wouldn't utilise the available cores. By "logic", running two (or even eight)
worker threads on your 48-core server seems low. Any idea what's going on to
explain your results??

I can't say for sure, but I would guess there's maybe more lock contention 
going on in the broker when you have more threads.
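As a toy illustration of the lock-contention effect (not a model of qpidd's internals), the hypothetical function below has several threads increment one counter behind a single std::mutex. Because every increment serialises on the same lock, adding threads adds no useful parallelism and often makes each increment slower; exact timings vary by machine.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical toy: `threads` workers each do `incrementsPerThread` increments
// of one shared counter guarded by one mutex. Returns the final count and
// writes the elapsed wall-clock time (milliseconds) to elapsedMs.
long contendedCount(int threads, long incrementsPerThread, double& elapsedMs) {
    assert(threads > 0);
    std::mutex m;
    long counter = 0;
    auto start = std::chrono::steady_clock::now();

    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (long i = 0; i < incrementsPerThread; ++i) {
                std::lock_guard<std::mutex> lock(m);  // all threads queue up here
                ++counter;
            }
        });
    }
    for (auto& w : workers) w.join();

    auto end = std::chrono::steady_clock::now();
    elapsedMs = std::chrono::duration<double, std::milli>(end - start).count();
    return counter;
}
```

Comparing elapsedMs divided by the returned count for 1 thread and for 8 threads on a multi-core box gives a quick feel for the per-operation cost of contention.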

So have you reproduced the MRG paper results? That paper, which is over three
years old now, has figures of 380,000 256-octet messages in plus out on a
2 x 4-core Xeon box. We've not come *close* to that figure, and my developers
are far from dummies. The paper describes the methodology quite well, but
doesn't quite spell out, tutorial-style, exactly what the setup was.

What numbers are you getting, and how are you testing?
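One way to put the two sets of figures on the same scale is payload bytes per second. This sketch assumes "in plus out" counts each message once entering and once leaving the broker, and that the laptop test's producer and consumer both run at ~17,000 messages/second (34,000 transfers per second); headers and properties are ignored and the function name is hypothetical.

```cpp
#include <cassert>

// Hypothetical helper: payload bytes per second for a given rate of message
// transfers, where a message entering and then leaving the broker counts twice.
double payloadBytesPerSecond(double transfersPerSecond, double messageSizeOctets) {
    assert(transfersPerSecond >= 0.0 && messageSizeOctets >= 0.0);
    return transfersPerSecond * messageSizeOctets;
}
```

That gives 97,280,000 bytes/s for the paper's 380,000 x 256 octets versus 30,600,000 bytes/s for 34,000 x 900 octets: about 11x apart in message count but only about 3.2x apart in payload bytes, so message size matters a lot when comparing.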


// ItemProducer: sends NUMBER_OF_ITERATIONS messages of ITEM_SIZE octets to the
// "perftest" queue, syncing with the broker roughly once per second.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <sys/time.h> // For gettimeofday()
#include <exception>
#include <iostream>
#include <string>

using namespace std;
using namespace qpid::messaging;

// Wall-clock time in milliseconds since the epoch.
unsigned long currentTimeMillis() {
    struct timeval curTime;
    gettimeofday(&curTime, NULL);
    return (curTime.tv_usec + curTime.tv_sec * 1000000ul) / 1000;
}

int main(int argc, char** argv) {
    string broker = "localhost:5672";
    //string address = "amq.match";
    string address = "perftest";
    //string connectionOptions = "{reconnect: true}";
    string connectionOptions = "{reconnect: true, tcp-nodelay: true}";

    Connection connection(broker, connectionOptions);
    try {
        connection.open();
        Session session = connection.createSession();
        Sender sender = session.createSender(address);
        cout << "default capacity = " << sender.getCapacity() << endl;
        //sender.setCapacity(500);

        const int NUMBER_OF_ITERATIONS = 1000000;
        const size_t ITEM_SIZE = 900;

        // Build the payload once. The Message constructor copies the bytes, so
        // there is no need to allocate a buffer per message (the original
        // new[]/delete pair was mismatched and sent uninitialised bytes).
        const string payload(ITEM_SIZE, 'x');

        unsigned long startTime = currentTimeMillis();
        unsigned long ackTime = startTime;

        for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
            Message message(payload);
            message.setProperty("item-owner", "fadams");
            message.setProperty("data-service", "amqp-delivery");

            sender.send(message);

            // Roughly once per second, block until the broker has confirmed
            // everything sent so far.
            unsigned long curTime = currentTimeMillis();
            if ((curTime - ackTime) > 1000) {
                cout << "auto synced message #" << i << endl;
                session.sync();
                ackTime = curTime;
            }
        }

        session.sync();
        unsigned long finishTime = currentTimeMillis();

        cout << "Elapsed time = " << (finishTime - startTime)
             << ", messages/second = "
             << NUMBER_OF_ITERATIONS * 1000.0 / (finishTime - startTime) << endl;

        connection.close();
        return 0;
    } catch (const exception& error) {
        cerr << "ItemProducer Exception: " << error.what() << endl;
        connection.close();
        return 1;
    }
}
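Both programs time themselves with gettimeofday(), which reads the wall clock and can jump if the system time is adjusted (for example by NTP) mid-run. A minimal alternative, assuming a C++11 compiler, is std::chrono::steady_clock; the helper names below are hypothetical.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: milliseconds from a monotonic clock. steady_clock never
// goes backwards, so the difference between two readings is a safe elapsed time.
long long monotonicMillis() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}

// Hypothetical helper: messages per second over an interval, returning 0
// for an empty interval instead of dividing by zero.
double messagesPerSecond(long long messages, long long elapsedMs) {
    assert(messages >= 0 && elapsedMs >= 0);
    return elapsedMs == 0 ? 0.0 : messages * 1000.0 / elapsedMs;
}
```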
// ItemConsumer: creates (if needed) the "perftest" ring queue, receives from it
// with a prefetch capacity of 500, acknowledges roughly once per second and
// prints the receive rate whenever fetch() times out. Press Enter to quit.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Duration.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <sys/select.h> // For select()
#include <sys/time.h>   // For gettimeofday()
#include <exception>
#include <iostream>
#include <string>

using namespace std;
using namespace qpid::messaging;

// Wall-clock time in milliseconds since the epoch.
unsigned long currentTimeMillis() {
    struct timeval curTime;
    gettimeofday(&curTime, NULL);
    return (curTime.tv_usec + curTime.tv_sec * 1000000ul) / 1000;
}

// Non-blocking check for pending input on stdin (file descriptor 0).
// Returns 1 if input is ready (on a line-buffered terminal, after Enter), else 0.
int kbhit(void) {
    struct timeval tv;
    fd_set read_fd;

    // Zero timeout: poll without waiting at all, not even a microsecond.
    tv.tv_sec = 0;
    tv.tv_usec = 0;

    // Watch only stdin.
    FD_ZERO(&read_fd);
    FD_SET(0, &read_fd);

    // The first parameter is the largest file descriptor to check, plus one.
    if (select(1, &read_fd, NULL /* no writes */, NULL /* no exceptions */, &tv) == -1)
        return 0; // An error occurred

    // select() leaves the bit for stdin set only if stdin is readable.
    return FD_ISSET(0, &read_fd) ? 1 : 0;
}

int main(int argc, char** argv) {
    string broker = "localhost:5672";
    /*string address = "perftest; {create: receiver, node: {x-declare: {arguments: {'qpid.policy_type': ring, 'qpid.max_size': 500000000}}, x-bindings: [{exchange: 'amq.match', queue: 'perftest', key: 'data1', arguments: {x-match: all, data-service: amqp-delivery, item-owner: fadams}}]}}";*/

    string address = "perftest; {create: receiver, node: {x-declare: {arguments: {'qpid.policy_type': ring, 'qpid.max_size': 100000000}}}}";
    //string connectionOptions = "{reconnect: true}";
    string connectionOptions = "{reconnect: true, tcp-nodelay: true}";

    int count = 0;

    Connection connection(broker, connectionOptions);
    try {
        connection.open();
        Session session = connection.createSession();
        Receiver receiver = session.createReceiver(address);
        cout << "default capacity = " << receiver.getCapacity() << endl;
        receiver.setCapacity(500); // Enable receiver prefetch

        unsigned long startTime = currentTimeMillis();
        unsigned long ackTime = startTime;

        while (true) {
            Message message;
            if (receiver.fetch(message, Duration::SECOND * 1)) {
                //const char* buffer = message.getContentPtr();
                //cout << "message count = " << count << ", length = " << message.getContentSize() << endl;

                // Acknowledge everything received so far roughly once per second.
                unsigned long curTime = currentTimeMillis();
                if ((curTime - ackTime) > 1000) {
                    cout << "auto acknowledged message #" << count << endl;
                    session.acknowledge();
                    ackTime = curTime;
                }
                count++;
            } else {
                // No message for a second: acknowledge any stragglers, then report
                // the rate since the last report and start a new interval.
                unsigned long curTime = currentTimeMillis();
                if ((curTime - ackTime) > 1000) {
                    cout << "timeout auto acknowledged message #" << count << endl;
                    session.acknowledge();
                    ackTime = curTime;
                }

                unsigned long finishTime = currentTimeMillis();
                cout << "Elapsed time = " << (finishTime - startTime)
                     << ", messages/second = "
                     << count * 1000.0 / (finishTime - startTime) << endl;
                startTime = finishTime;
                count = 0;
            }

            // Note: this adds one select() system call per received message.
            if (kbhit()) break;
        }

        session.acknowledge();
        connection.close();
        return 0;
    } catch (const exception& error) {
        cerr << "ItemConsumer Exception: " << error.what() << endl;
        connection.close();
        return 1;
    }
}
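ItemConsumer acknowledges on a one-second timer, so at ~17,000 messages/second the broker may be holding on the order of 17,000 delivered-but-unacknowledged messages at any moment. Whether that contributes to the full-queue slowdown is not established here, but bounding it is easy to try. The hypothetical AckBatcher below (not part of qpid::messaging) signals an acknowledgement when either a message-count or a time limit is reached.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: decide when to call session.acknowledge() so that the
// number of received-but-unacknowledged messages stays bounded.
class AckBatcher {
public:
    AckBatcher(std::size_t maxUnacked, long long maxIntervalMs, long long startMs)
        : maxUnacked_(maxUnacked), maxIntervalMs_(maxIntervalMs), lastAckMs_(startMs) {
        assert(maxUnacked > 0);
    }

    // Record one received message at time nowMs. Returns true if the caller
    // should acknowledge now (batch full or interval elapsed).
    bool onMessage(long long nowMs) {
        ++unacked_;
        if (unacked_ >= maxUnacked_ || nowMs - lastAckMs_ >= maxIntervalMs_) {
            unacked_ = 0;
            lastAckMs_ = nowMs;
            return true;
        }
        return false;
    }

    std::size_t unacked() const { return unacked_; }

private:
    std::size_t maxUnacked_;
    long long maxIntervalMs_;
    long long lastAckMs_;
    std::size_t unacked_ = 0;
};
```

In ItemConsumer's loop this would replace the timer check, e.g. `if (batcher.onMessage(currentTimeMillis())) session.acknowledge();`, with the batch size chosen by experiment rather than taken from any Qpid recommendation.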

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org