D'oh, idiot!! I forgot to call queue.refresh() after the purge method, so I was looking at the previous flowStopped value. Oops :-)

So calling purge does indeed reset flowStopped to false, which means I can purge a few messages at a time. My other comments below still hold, though, and this approach is a rather "chatty" way of getting the number of enqueued messages below the lower flow control limit to avoid the producer exception.
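
For what it's worth, the purge-a-few-then-check loop might look roughly like the sketch below. It is only an illustration: FuseQueue is a hypothetical interface standing in for the QMF2 queue object (its purge, refresh and isFlowStopped methods stand for invokeMethod("purge", ...), refresh() and reading the flowStopped property), and FakeQueue is a made-up in-memory stand-in so the sketch runs without a broker. The batch size and resume depth are arbitrary numbers.

```java
public class StepwisePurge {
    /** Hypothetical stand-in for a QMF2 queue object; not part of the QMF2 API. */
    public interface FuseQueue {
        void purge(int request);   // would wrap invokeMethod("purge", {"request": n})
        void refresh();            // would wrap queue.refresh()
        boolean isFlowStopped();   // would read the "flowStopped" property
    }

    /** In-memory fake so the sketch runs without a broker. */
    public static class FakeQueue implements FuseQueue {
        private long depth;                 // live message count on the "broker"
        private final long resumeDepth;     // made-up flow-resume threshold
        private boolean cachedFlowStopped;  // value the client sees until refresh()

        public FakeQueue(long depth, long resumeDepth) {
            this.depth = depth;
            this.resumeDepth = resumeDepth;
            this.cachedFlowStopped = depth > resumeDepth;
        }

        public void purge(int request) {
            // request == 0 means "purge everything", as in the thread
            depth = (request == 0) ? 0 : Math.max(0, depth - request);
        }

        // Simplification: the fake treats the queue as flow-stopped whenever
        // depth is above resumeDepth; a real broker also has a stop threshold.
        public void refresh() { cachedFlowStopped = depth > resumeDepth; }

        public boolean isFlowStopped() { return cachedFlowStopped; }
    }

    /** Purges batchSize messages per call until flowStopped clears; returns the number of purge calls. */
    public static int purgeUntilResumed(FuseQueue queue, int batchSize, int maxCalls) {
        int calls = 0;
        queue.refresh();
        while (queue.isFlowStopped() && calls < maxCalls) {
            queue.purge(batchSize);
            queue.refresh(); // without this the stale flowStopped value is read
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        FakeQueue queue = new FakeQueue(1000, 700);
        int calls = purgeUntilResumed(queue, 100, 20);
        System.out.println("purge calls: " + calls + ", flowStopped: " + queue.isFlowStopped());
    }
}
```

Each pass costs a purge call plus a refresh, which is exactly the chattiness complained about above; a larger batch size trades fewer round trips for more discarded messages.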

Any thoughts?

Fraser Adams wrote:
As an update to this my code now goes:

QmfData arguments = new QmfData();
arguments.setValue("type", "binding");
arguments.setValue("name", bindingIdentifier);
_broker.invokeMethod("delete", arguments);

arguments = new QmfData();
arguments.setValue("request", 0);
queue.invokeMethod("purge", arguments);

So purging the queue after deleting the binding prevents the exception, which fits in nicely with Ted's comments on the flow control stuff. However there is still some weird unaccounted for stuff.

I've tried getting the flowStopped property and as it happens it appears set to true before and after I purge the queue. I guess that I'd have expected it to be set back to false after purging.

It's also a bit unfortunate to purge all of the messages, but I can't think of a systematic way to remove "just enough" messages such that the flow-resume threshold is crossed downward.

I can get the byteDepth and msgDepth but there's no way to find the number of messages to remove. It's impossible to accurately know the maximum queue size. One can guess - if there's no size set in the queue arguments it's "probably" the default queue limit of 104857600, but there's no way of actually checking that value on the broker, so if someone has changed the default then it's impossible to tell :-(
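
If one is prepared to guess the limit, the arithmetic for "just enough" could be sketched as below. Everything here is an assumption rather than something read from the broker: assumedMaxBytes is the guessed queue limit (the 104857600 default mentioned above in the example), assumedResumePercent is a guessed flow-resume ratio (70 is purely illustrative), and the average message size is derived from byteDepth / msgDepth, which is only a rough guide when message sizes vary.

```java
public class PurgeEstimate {
    /**
     * Estimates how many messages to purge so that byteDepth falls to the
     * assumed flow-resume level. assumedMaxBytes and assumedResumePercent are
     * guesses supplied by the caller, not values obtained from the broker.
     */
    public static long messagesToPurge(long byteDepth, long msgDepth,
                                       long assumedMaxBytes, int assumedResumePercent) {
        if (byteDepth <= 0 || msgDepth <= 0) {
            return 0; // empty queue, nothing to purge
        }
        long resumeBytes = assumedMaxBytes * assumedResumePercent / 100;
        if (byteDepth <= resumeBytes) {
            return 0; // already at or below the assumed resume level
        }
        double averageSize = (double) byteDepth / msgDepth;
        long estimate = (long) Math.ceil((byteDepth - resumeBytes) / averageSize);
        return Math.min(estimate, msgDepth); // never ask for more than is enqueued
    }

    public static void main(String[] args) {
        // 90,000 messages of 1000 bytes each against the guessed default limit
        long toPurge = messagesToPurge(90_000_000L, 90_000L, 104_857_600L, 70);
        System.out.println("messages to purge: " + toPurge);
    }
}
```

With those example numbers the assumed resume level is 73400320 bytes, so the sketch asks for 16600 messages to be purged. If the guess at the limit is wrong the answer is wrong too, which is the problem described above.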

Also, as flowStopped doesn't seem to get reset by purging, I don't think that I can even purge a guessed number of messages, check flowStopped, then try again.

Any thoughts?

It's possibly slightly moot as I've just unbound so messages are spilling on the floor now, but it's a bit of a shame to purge the messages that have actually been delivered.


Also for queues bound to the default direct exchange I can't think of any protection for the producer other than deleting the queue because I can't unbind from that. Any good ideas?

Perhaps I'm just slightly twisted :-) it's fun for sure finding out exactly what the art of the possible is with QMF.
Frase


Fraser Adams wrote:
Thanks for your response Ted,
I'll have a play and see if I can get further. TBH I was planning on invoking the queue purge method as an experiment. I think what your response is telling me is that there's a slightly more scientific approach.

This is really useful info. I'm afraid that I don't really know the flow control stuff that well, so this is a useful learning experience. I'll Javadoc my Fuse with any interesting findings.

As well as being a fairly useful little tool part of the motivation for writing QpidFuse is to act as a useful demo of the QMF2 stuff, so ironically it's actually quite useful to find issues as we can then illustrate solutions. I've often been accused of being a cup half full sort of person :-)

Thanks again,
Frase


Ted Ross wrote:
On second thought, this is perhaps not a bug. I can't think of a way that the broker could be modified to "fix" this.

To solve your problem, you might try setting the flow-stop threshold significantly higher than the event threshold so that your fuse program can delete the binding before the producer gets flow-stopped.
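
As a rough illustration of that ordering, the sketch below builds a Qpid address string that declares a queue with the event (alert) threshold below the flow-stop threshold. Treat the details as assumptions to check against your broker version: the argument names qpid.alert_size, qpid.flow_stop_size and qpid.flow_resume_size are the C++ broker queue arguments as I understand them, and the ThresholdAddress class, its method and the byte values are made up for the example.

```java
public class ThresholdAddress {
    /**
     * Builds an address string that declares a queue whose alert threshold is
     * below its flow-stop threshold. The argument names are assumed to match
     * the broker's queue arguments; verify them before relying on this.
     */
    public static String queueAddress(String queueName, long alertSize,
                                      long flowStopSize, long flowResumeSize) {
        if (alertSize >= flowStopSize) {
            throw new IllegalArgumentException("alert threshold must be below flow-stop");
        }
        if (flowResumeSize >= flowStopSize) {
            throw new IllegalArgumentException("flow-resume must be below flow-stop");
        }
        return String.format(
            "%s; {create: always, node: {x-declare: {arguments: {"
            + "'qpid.alert_size': %d, "
            + "'qpid.flow_stop_size': %d, "
            + "'qpid.flow_resume_size': %d}}}}",
            queueName, alertSize, flowStopSize, flowResumeSize);
    }

    public static void main(String[] args) {
        // Illustrative sizes in bytes: alert at 50 MB, stop at 90 MB, resume at 70 MB
        System.out.println(queueAddress("fuse-test", 50_000_000L, 90_000_000L, 70_000_000L));
    }
}
```

The wider the gap between the alert size and the flow-stop size, the more time the fuse has to react to the event before the producer is held up.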

Also, the queue has a boolean statistic called "flowStopped" that indicates whether it is applying back-pressure to producers. You can check this value after deleting the binding and if True, either delete the queue or remove enough messages such that the flow-resume threshold is crossed downward.

-Ted


On 12/07/2011 03:04 PM, Ted Ross wrote:
Frase,

I think you've uncovered a bug in the broker.

If a congested queue causes a flow-control stop (i.e. the broker withholds acks to the producer until the queue reduces in size), unbinding the queue from the exchange will not cause a flow-resume like deleting the queue will. This is why your producer is hanging. The messages it produced into the queue above the flow-stop threshold have not been acknowledged.

-Ted

On 12/04/2011 03:25 PM, Fraser Adams wrote:
Hi all,
I'm writing a little application called QueueFuse which is a QMF2 based application.

The idea is that it listens for the queueThresholdExceeded Event and if one occurs it recovers the name of the queue that caused the Event and "blows a fuse" to that queue.

I've got a couple of options when this occurs. My first option is to use the queue name to trigger a QMF2 queue delete that goes something like this:

QmfData arguments = new QmfData();
arguments.setValue("type", "queue");
arguments.setValue("name", queueName);

try
{
   _broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
   System.out.println(e.getMessage());
}


This works really well and is kind of a QMF2 version of https://issues.apache.org/jira/browse/QPID-3247, though it's obviously quite brutal.


I've also tried a slightly more subtle approach of removing bindings to the offending queue, by recovering the binding that references the queue that caused the event and dereferencing the exchange from the binding to get the exchange name.
The unbind call is as follows:


String bindingIdentifier = exchangeName + "/" + queueName + "/" + bindingKey;

QmfData arguments = new QmfData();
arguments.setValue("type", "binding");
arguments.setValue("name", bindingIdentifier);

try
{
  _broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
  System.out.println(e.getMessage());
}


This *appears* to work initially and the producer carries on producing for much longer than had I not triggered the binding delete, however eventually the producer hangs with:

ItemProducer: exception: Exception when sending message
javax.jms.JMSException: Exception when sending message
   at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240)
   at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501)
   at org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456)
   at org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283)
   at ItemProducer.<init>(ItemProducer.java:58)
   at ItemProducer.main(ItemProducer.java:130)
Caused by: org.apache.qpid.transport.SessionException: timed out waiting for completion
   at org.apache.qpid.transport.Session.invoke(Session.java:688)
   at org.apache.qpid.transport.Session.invoke(Session.java:559)
   at org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96)
   at org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226)
   ... 5 more


As I say if I delete the queue the producer carries on ad infinitum (though clearly the messages are falling on the floor), but I can't see why I should get the exception above by dynamically deleting the binding.

Another thing I've noticed is that if the consumer isn't *too* much slower than the producer, but slow enough to trigger the Event the unbind works OK, but if the consumer is really slow (or non-existent) I get the exception above.

I've tried using a sleep to deliberately slow down a consumer and I did reach a point where the producer would hang for a while then eventually carry on, but if I then slowed the consumer down further then I get the exception.


Does anyone have any idea what causes this exception and why it occurs when I unbind a slow consumer from a fast producer?

Any neat thoughts for resolving this?

As a slight aside I can't unbind things bound to the default direct exchange (I think that's illegal in AMQP) so is the only way to protect producers from slow consumers bound to that exchange to delete the queue? (obviously if the queue was a ring queue that would work, but that's not what I'm trying to figure out :-) ).

Cheers,
Frase








---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]


