[
https://issues.apache.org/jira/browse/QPID-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Keith Wall updated QPID-7634:
-----------------------------
Description:
When an AMQP 1.0 client sends a Flow command with drain=true and the queue is
empty, the Broker does not respond if prefetch is 0.
The following snippet reproduces the issue:
{code}
Connection connection = factory.createConnection(username, password);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // <-- times out
{code}
The broker logs:
{noformat}
2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - RECV[/127.0.0.1:51108|1] : Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],capabilities=[queue, global]},target=Target{}}
2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' with arguments 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1] ], consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, optionSet=[ACQUIRES, SEES_REQUEUES]']
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing default: ALLOWED
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]']
2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]'] performed successfully with result: null
2017-01-24 11:05:56,476 INFO [VirtualHostNode-default-Config] (q.m.s.create) - [con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] SUB-1001 : Create
2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with arguments 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1] ], consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - SEND[/127.0.0.1:51108|1] : Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - RECV[/127.0.0.1:51108|1] : Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
...
{noformat}
Relevant part of the spec:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control
bq. If the sender's drain flag is set and there are no available messages, the
sender MUST advance its delivery-count until link-credit is zero, and send its
updated flow state to the receiver.
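For illustration only, the behaviour the quoted rule asks for can be sketched as a small stand-alone model. This is not the Broker's code: DrainSketch, SenderLink, FlowState and onFlow are hypothetical names, messages are plain strings, and the wraparound of delivery-count as a 32-bit sequence number is ignored. The sender recomputes its credit from the receiver's Flow, delivers what it can, and when drain is set uses up the remaining credit by advancing delivery-count before replying.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class DrainSketch {

    /** Flow state the sender echoes back to the receiver (hypothetical type). */
    static final class FlowState {
        final long deliveryCount;
        final long linkCredit;
        final boolean drain;

        FlowState(long deliveryCount, long linkCredit, boolean drain) {
            this.deliveryCount = deliveryCount;
            this.linkCredit = linkCredit;
            this.drain = drain;
        }
    }

    /** Hypothetical sender-side link end-point; a model, not a Broker class. */
    static final class SenderLink {
        private final Queue<String> available = new ArrayDeque<>();
        private long deliveryCount;
        private long linkCredit;

        void enqueue(String message) {
            available.add(message);
        }

        /** Handles a Flow from the receiver and returns the state to send back. */
        FlowState onFlow(long rcvDeliveryCount, long rcvLinkCredit, boolean drain) {
            // Sender's view of credit:
            // link-credit(snd) = delivery-count(rcv) + link-credit(rcv) - delivery-count(snd)
            linkCredit = rcvDeliveryCount + rcvLinkCredit - deliveryCount;

            // Deliver as many available messages as the credit allows.
            while (linkCredit > 0 && !available.isEmpty()) {
                available.poll();
                deliveryCount++;
                linkCredit--;
            }

            // Drain: nothing left to send, so consume the remaining credit
            // by advancing delivery-count until link-credit is zero.
            if (drain && linkCredit > 0) {
                deliveryCount += linkCredit;
                linkCredit = 0;
            }
            return new FlowState(deliveryCount, linkCredit, drain);
        }
    }

    public static void main(String[] args) {
        // Same values as the Flow in the log: deliveryCount=0, linkCredit=1, drain=true.
        SenderLink link = new SenderLink();
        FlowState reply = link.onFlow(0, 1, true);
        System.out.printf("deliveryCount=%d linkCredit=%d drain=%b%n",
                reply.deliveryCount, reply.linkCredit, reply.drain);
        // prints: deliveryCount=1 linkCredit=0 drain=true
    }
}
```

With the empty queue from the reproduction, a reply carrying deliveryCount=1 and linkCredit=0 is what would let receiveNoWait() return null instead of waiting; in the log above no such Flow is sent.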
> [Java Broker,amqp 1.0] Broker does not respond to Flow command with
> drain=true if queue is empty and prefetch is 0
> ------------------------------------------------------------------------------------------------------------------
>
> Key: QPID-7634
> URL: https://issues.apache.org/jira/browse/QPID-7634
> Project: Qpid
> Issue Type: Bug
> Components: Java Broker
> Affects Versions: qpid-java-6.1, qpid-java-6.1.1, qpid-java-7.0
> Reporter: Alex Rudyy
> Fix For: qpid-java-7.0