Hi,
Qpid JMS client version: 0.23.0
Qpid C++ broker version: 0.34
I am unable to send messages asynchronously using the JMS 2.0 API.
The connection string is the same for JMS 1.1 and JMS 2.0:
"amqp://host:20405?jms.username=admin&jms.password=admin&amqp.traceFrames=true&jms.forceAsyncSend=true&jms.forceAsyncAcks=true"
When I use the JMS 1.1 API, it works fine.
*Code for JMS 1.1 below.*
try
{
    Topic queue = (Topic) initialContext.lookup("node");
    Connection connection = ((ConnectionFactory)
        initialContext.lookup("connection")).createConnection();
    Session session = connection.createSession(false,
        Session.CLIENT_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(queue);
    connection.start();
    for (int i = 0; i < 500; i++)
    {
        producer.send(session.createTextMessage("test"));
    }
}
catch
...
However, when I use the JMS 2.0 API, sending is extremely slow (1 message per
second).
*Code for JMS 2.0 API below.*
try (JMSContext context =
        connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE))
{
    JMSProducer producer = context.createProducer();
    producer.setAsync(new SendCompletitionListener());
    for (int i = 0; i < senderOptionParser.getMessageCountValue(); i++)
    {
        producer.send(queue, context.createTextMessage("test"));
    }
}
catch
...
*Part of the trace output for JMS 1.1 while sending one message:*
{Output before producer.send(session.createTextMessage("test"));}
...
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
message: JmsOutboundMessageDispatch {dispatchId =
ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1, MessageID =
ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1 }
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
deliveryId=0, deliveryTag=0, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=false, aborted=false,
batchable=false}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
2283 bytes
...
{Output after producer.send(session.createTextMessage("test"));}
*Part of the trace output for JMS 2.0 while sending one message:*
{Output before producer.send(queue, context.createTextMessage("test"));}
...
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1',
durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, distributionMode=null, filter=null,
defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
amqp:released:list, amqp:modified:list], capabilities=null},
target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=[DELAYED_DELIVERY], properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
324 bytes
[epollEventLoopGroup-2-1] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 212
bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 212, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST,
source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
filter=null, defaultOutcome=null, outcomes=null, capabilities=null},
target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=2,
incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
handle=0, deliveryCount=0, linkCredit=500, available=null, drain=false,
echo=false, properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_INIT
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
LINK_LOCAL_OPEN
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
LINK_REMOTE_OPEN
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Open phase
of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
message: JmsOutboundMessageDispatch {dispatchId =
ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2, MessageID =
ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2 }
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
deliveryId=1, deliveryTag=0, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=false, aborted=false,
batchable=false}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
2284 bytes
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
[epollEventLoopGroup-2-1] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 45
bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 45, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=3,
incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
handle=0, deliveryCount=1, linkCredit=500, available=null, drain=false,
echo=false, properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
[epollEventLoopGroup-2-1] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 38
bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 38, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Disposition{role=RECEIVER,
first=1, last=1, settled=true, state=Accepted{}, batchable=false}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: DELIVERY
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Outcome of delivery
was accepted: DeliveryImpl [_tag=[48],
_link=org.apache.qpid.proton.engine.impl.SenderImpl@67de5132,
_deliveryState=null, _settled=false, _remoteSettled=true,
_remoteDeliveryState=Accepted{}, _flags=0, _defaultDeliveryState=null,
_transportDelivery=org.apache.qpid.proton.engine.impl.TransportDelivery@1303044f,
_dataSize=0, _complete=true, _updated=true, _done=true, _offset=0]
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Send phase
of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer {
ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } requesting close on remote.
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
LINK_LOCAL_CLOSE
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Detach{handle=0,
closed=true, error=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
16 bytes
[epollEventLoopGroup-2-1] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 24
bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 65536)
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Detach{handle=0,
closed=true, error=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
LINK_REMOTE_CLOSE
[AmqpProvider:(1):[amqp://host:20405]] DEBUG
org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer {
ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } is now closed:
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Close
phase of anonymous send complete:
ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
LINK_FINAL
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Started
send chain for anonymous producer:
ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
[AmqpProvider:(1):[amqp://host:20405]] DEBUG
org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder - Creating
AmqpFixedProducer for: broadcast
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2:broadcast',
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2',
durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, distributionMode=null, filter=null,
defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
amqp:released:list, amqp:modified:list], capabilities=null},
target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=[DELAYED_DELIVERY], properties=null}
[AmqpProvider:(1):[amqp://host:20405]] TRACE
org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
324 bytes
...
{Output after producer.send(queue, context.createTextMessage("test"));}
It seems to me that with the JMS 2.0 API the producer closes its link after
each message is sent and then opens a new one when it wants to send the next
message.
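To put rough numbers on that, here is a small sketch that counts only the frames marked SENT in the two trace excerpts above. It is not Qpid code: the class and method names are made up for illustration, and the per-send counts are just my reading of the excerpts (one Transfer per send in the JMS 1.1 trace; Attach, Transfer and Detach per send in the JMS 2.0 trace). The excerpts are truncated, so other frames may be exchanged that are not shown.

```java
public class SendFrameCount {
    // Frames SENT per message when the link stays open (JMS 1.1 excerpt): Transfer.
    // Assumption: the link is attached once, outside the part of the trace shown.
    static final int FIXED_FRAMES_PER_SEND = 1;

    // Frames SENT per message in the JMS 2.0 excerpt: Attach, Transfer, Detach.
    static final int FALLBACK_FRAMES_PER_SEND = 3;

    // Total frames sent during the send loop with a long-lived link.
    static long fixedProducerFrames(int messages) {
        return (long) messages * FIXED_FRAMES_PER_SEND;
    }

    // Total frames sent during the send loop when a link is opened and closed per message.
    static long anonymousFallbackFrames(int messages) {
        return (long) messages * FALLBACK_FRAMES_PER_SEND;
    }

    public static void main(String[] args) {
        int messages = 500; // same count as the JMS 1.1 loop above
        System.out.println("fixed producer frames:     " + fixedProducerFrames(messages));
        System.out.println("anonymous fallback frames: " + anonymousFallbackFrames(messages));
    }
}
```

For 500 messages that is 500 sent frames versus 1500. In the JMS 2.0 trace each of the three phases ("Open phase", "Send phase", "Close phase") is also logged as complete only after the broker's reply has been received, so each message appears to cost three round trips instead of none.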
Am I doing something wrong with the JMS 2.0 API?
Tomas