Hi again Ted,
Actually there's one other thing that you may be able to help with (since you clearly know about flow-control).

There's a little side project I'm working on when I get bored with the QMF stuff, where I'm looking at differences between the C++ qpid::client and qpid::messaging APIs.

So it's clear that the Qpid team are keen to deprecate the qpid::client API in favour of qpid::messaging, which at face value seems really sensible given that qpid::messaging is much simpler, abstracts away the AMQP version and allows Address strings to be shared (easy to test something in Java and run it live on C++).

The issue that I have with this strategy relates to performance. So for example I can use qpid-perftest to give the broker "a good hard kicking".

Now I've somewhat pulled qpid-perftest apart to distil the essence, with the aim of doing some direct comparisons. My problem is that I'm finding it really hard to optimise qpid::messaging to the same extent that qpid::client has been optimised for qpid-perftest. My view is that it's not great to deprecate ::client until it can be proved that ::messaging has the same (or better) performance - does that seem a fair observation?


I think one key point is that "flat out" qpid-perftest uses unreliable messaging, which I can match in qpid::messaging with link: {reliability: unreliable} in the Address.
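For reference, the full Address string I'm using looks roughly like this (the queue name is made up, and the create policy is just there to make the example stand on its own):

```
my-test-queue; {create: always, link: {reliability: unreliable}}
```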

qpid-perftest also seems to set flow control to the number of iterations, so I've experimented with producer/consumer setCapacity().

I've been able to tweak the capacity such that I get roughly the same performance as the qpid::client version, but I did it through experimentation, which is a bit unsatisfactory.

My producer capacity was 4000 and receiver capacity 6000, with 1000000 iterations. (Setting the receiver capacity to 1000000 didn't seem as good, though at face value that should have been equivalent to settings.flowControl = FlowControl::messageCredit(iterations) in ::client, unless I've misunderstood things.)
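For comparison, this is roughly the qpid::client flow-control setup I'm referring to - a sketch from memory rather than a tested fragment, so treat the exact header paths and member names as assumptions:

```cpp
#include <qpid/client/FlowControl.h>
#include <qpid/client/SubscriptionSettings.h>

using namespace qpid::client;

// Sketch: give the subscription a fixed message credit equal to the
// number of test iterations, as qpid-perftest appears to do. The
// resulting settings object would then be passed to
// SubscriptionManager::subscribe(...).
SubscriptionSettings makeSettings(uint32_t iterations) {
    SubscriptionSettings settings;
    settings.flowControl = FlowControl::messageCredit(iterations);
    return settings;
}
```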


It seems to me that the differences between messaging/client performance are almost certainly down to interesting subtleties in capacity/flow control, but this is likely to require a proper detailed understanding of the interactions (from someone way better than me!!).

One other interesting observation: for roughly the same message rate, with my broker maxing out at 95% CPU, my ::client based perftest used ~50% CPU (I'm using a 2 core box) but my ::messaging based perftest used ~77% CPU.

As you seem to know something about flow control voodoo I wonder if you've got any thoughts. My messaging producer does very little on its critical path:

           connection.open();
           Session session = connection.createSession();
           Sender sender = session.createSender(address);
           sender.setCapacity(4000);

           int ITEM_SIZE = 900;
           char buffer[ITEM_SIZE];
           Message message(buffer, ITEM_SIZE);

           AbsTime start = now();
           for (size_t i = 0; i < iterations; i++) {
               sender.send(message);
           }
           session.sync();
           AbsTime end = now();
           double time = secs(start, end);

           cout << "pub tps = " << iterations/time << endl;
           connection.close();

Similarly, the consumer doesn't do much either:
           connection.open();
           Session session = connection.createSession();
           Receiver receiver = session.createReceiver(address);
           receiver.setCapacity(6000); // Enable receiver prefetch

           unsigned int count = 0;
           unsigned int unackedCount = 0;
           Message message;

           AbsTime start = now();
           while (true) {
               if (receiver.get(message, qpid::messaging::Duration::SECOND)) {
                   count++;
               } else {
                   cout << "timeout auto acknowledged message #" << count << endl;
                   session.acknowledge();

                   AbsTime end = now();
                   double time = secs(start, end);
                   cout << "sub tps = " << count/time << endl;
                   count = 0;
                   unackedCount = 0;
                   start = now();
               }
           }
           session.acknowledge();
           connection.close();








Ted Ross wrote:
Frase,

I think you've uncovered a bug in the broker.

If a congested queue causes a flow-control stop (i.e. the broker withholds acks to the producer until the queue reduces in size), unbinding the queue from the exchange will not cause a flow-resume like deleting the queue will. This is why your producer is hanging. The messages it produced into the queue above the flow-stop threshold have not been acknowledged.

-Ted

On 12/04/2011 03:25 PM, Fraser Adams wrote:
Hi all,
I'm writing a little QMF2 based application called QueueFuse.

The idea is that it listens for the queueThresholdExceeded Event and if one occurs it recovers the name of the queue that caused the Event and "blows a fuse" to that queue.

I've got a couple of options when this occurs. My first option is to use the queue name to trigger a QMF2 queue delete that goes something like this:

QmfData arguments = new QmfData();
arguments.setValue("type", "queue");
arguments.setValue("name", queueName);

try
{
   _broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
   System.out.println(e.getMessage());
}


This works really well and is kind of a QMF2 version of https://issues.apache.org/jira/browse/QPID-3247, though it's obviously quite brutal.


I've also tried a slightly more subtle approach of removing bindings to the offending queue: I recover the binding that references the queue which caused the Event, and dereference the exchange name from that binding.
The unbind call is as follows:


String bindingIdentifier = exchangeName + "/" + queueName + "/" + bindingKey;

QmfData arguments = new QmfData();
arguments.setValue("type", "binding");
arguments.setValue("name", bindingIdentifier);

try
{
  _broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
  System.out.println(e.getMessage());
}


This *appears* to work initially, and the producer carries on producing for much longer than it would have had I not triggered the binding delete; however, eventually the producer hangs with:

ItemProducer: exception: Exception when sending message
javax.jms.JMSException: Exception when sending message
   at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240)
   at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501)
   at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456)
   at org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283)
   at ItemProducer.<init>(ItemProducer.java:58)
   at ItemProducer.main(ItemProducer.java:130)
Caused by: org.apache.qpid.transport.SessionException: timed out waiting for completion
   at org.apache.qpid.transport.Session.invoke(Session.java:688)
   at org.apache.qpid.transport.Session.invoke(Session.java:559)
   at org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96)
   at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226)
   ... 5 more


As I say, if I delete the queue the producer carries on ad infinitum (though clearly the messages are falling on the floor), but I can't see why I should get the exception above when dynamically deleting the binding.

Another thing I've noticed is that if the consumer isn't *too* much slower than the producer, but slow enough to trigger the Event, the unbind works OK; but if the consumer is really slow (or non-existent) I get the exception above.

I've tried using a sleep to deliberately slow down a consumer, and I did reach a point where the producer would hang for a while then eventually carry on; but if I then slowed the consumer down further I got the exception.


Does anyone have any idea what could cause this exception, and why it should occur when I unbind a slow consumer from a fast producer?

Any neat thoughts for resolving this?

As a slight aside, I can't unbind things bound to the default direct exchange (I think that's illegal in AMQP), so is the only way to protect producers from slow consumers bound to that exchange to delete the queue? (Obviously if the queue was a ring queue that would work, but that's not what I'm trying to figure out :-) ).

Cheers,
Frase








---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]


