[ 
https://issues.apache.org/jira/browse/QPID-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Wall updated QPID-7634:
-----------------------------
    Description: 
When AMQP 1.0 client sends the Flow command with drain=true and queue is empty 
the Broker does not respond if prefetch is 0.

The following snippet reproduces the issue
{code}
Connection connection = factory.createConnection(username, password);
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);

TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // 
<-- times out
{code}

The broker logs
{noformat}
2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
RECV[/127.0.0.1:51108|1] : 
Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
 amqp:rejected:list, amqp:released:list, 
amqp:modified:list],capabilities=[queue, global]},target=Target{}}
2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' with 
arguments 
'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
 ], 
consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
 optionSet=[ACQUIRES, SEES_REQUEUES]']
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing 
default: ALLOWED
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 
'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, 
type=Consumer]']
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 
'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, 
type=Consumer]'] performed successfully with result: null
2017-01-24 11:05:56,476 INFO  [VirtualHostNode-default-Config] (q.m.s.create) - 
[con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] 
SUB-1001 : Create
2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with arguments 
'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
 ], 
consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
 optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: 
Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, 
type=Consumer]
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
SEND[/127.0.0.1:51108|1] : 
Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
 amqp:rejected:list, amqp:released:list, 
amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
RECV[/127.0.0.1:51108|1] : 
Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
...
{noformat}


Relevant part of the spec:

http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control

bq. If the sender's drain flag is set and there are no available messages, the 
sender MUST advance its delivery-count until link-credit is zero, and send its 
updated flow state to the receiver.



  was:
When AMQP 1.0 client sends the Flow command with drain=true and queue is empty 
the Broker does not respond if prefetch is 0.

The following snippet reproduces the issue
{code}
Connection connection = factory.createConnection(username, password);
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);

TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // 
<-- times out
{code}

The broker logs
{noformat}
2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
RECV[/127.0.0.1:51108|1] : 
Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
 amqp:rejected:list, amqp:released:list, 
amqp:modified:list],capabilities=[queue, global]},target=Target{}}
2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' with 
arguments 
'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
 ], 
consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
 optionSet=[ACQUIRES, SEES_REQUEUES]']
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing 
default: ALLOWED
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 
'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, 
type=Consumer]']
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 
'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, 
type=Consumer]'] performed successfully with result: null
2017-01-24 11:05:56,476 INFO  [VirtualHostNode-default-Config] (q.m.s.create) - 
[con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] 
SUB-1001 : Create
2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] 
(o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with arguments 
'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
 ], 
consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
 optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: 
Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, 
type=Consumer]
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
SEND[/127.0.0.1:51108|1] : 
Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
 amqp:rejected:list, amqp:released:list, 
amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
RECV[/127.0.0.1:51108|1] : 
Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] 
(o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
...
{noformat}


> [Java Broker,amqp 1.0] Broker does not respond to Flow command with 
> drain=true if queue is empty and prefetch is 0
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: QPID-7634
>                 URL: https://issues.apache.org/jira/browse/QPID-7634
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>    Affects Versions: qpid-java-6.1, qpid-java-6.1.1, qpid-java-7.0
>            Reporter: Alex Rudyy
>             Fix For: qpid-java-7.0
>
>
> When AMQP 1.0 client sends the Flow command with drain=true and queue is 
> empty the Broker does not respond if prefetch is 0.
> The following snippet reproduces the issue
> {code}
> Connection connection = factory.createConnection(username, password);
> connection.start();
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> MessageConsumer messageConsumer = session.createConsumer(queue);
> TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); 
> // <-- times out
> {code}
> The broker logs
> {noformat}
> 2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
> RECV[/127.0.0.1:51108|1] : 
> Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
>  amqp:rejected:list, amqp:released:list, 
> amqp:modified:list],capabilities=[queue, global]},target=Target{}}
> 2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' 
> with arguments 
> 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
>  ], 
> consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  optionSet=[ACQUIRES, SEES_REQUEUES]']
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing 
> default: ALLOWED
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 
> 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  type=Consumer]']
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 
> 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  type=Consumer]'] performed successfully with result: null
> 2017-01-24 11:05:56,476 INFO  [VirtualHostNode-default-Config] (q.m.s.create) 
> - [con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] 
> SUB-1001 : Create
> 2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with 
> arguments 
> 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
>  ], 
> consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: 
> Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  type=Consumer]
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
> SEND[/127.0.0.1:51108|1] : 
> Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
>  amqp:rejected:list, amqp:released:list, 
> amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
> RECV[/127.0.0.1:51108|1] : 
> Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> 2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> ...
> {noformat}
> Relevant part of the spec:
> http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control
> bq. If the sender's drain flag is set and there are no available messages, 
> the sender MUST advance its delivery-count until link-credit is zero, and 
> send its updated flow state to the receiver.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to