Hi Team,
We are using qpid-jms-client-0.57.0 to publish messages to and receive
messages from Azure Service Bus. Service Bus provides a "message sessions"
feature that lets a receiver consume messages from a session so that message
order is maintained. For more details, please refer to:
https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
I am able to publish messages with the JMSXGroupID property set, but I am not
able to receive messages from a session-enabled queue.
Getting error - javax.jms.JMSException: *It is not possible for an entity
that requires sessions to create a non-sessionful message receiver*.
TrackingId:***,
SystemTracker:mule-intr-sbus-test-standard:Queue:test-order,
Timestamp:2021-07-28T11:07:49 TrackingId:**, SystemTracker:gateway7,
Timestamp:2021-07-28T11:07:49 [condition = amqp:not-allowed]
Please find attached
1. TestSessionEnable.txt - Sample code
2. Failed_To_Receive_Msgs.txt - Error details with proton logs
3. Publish_Successfully.txt
Could you please suggest a way to receive messages from a session-enabled queue?
Please let me know if you have any questions.
Regards,
Abhishek Kumar
** Receiver start **
[527804008:0] -> SASL
[527804008:0] <- SASL
[527804008:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN,
ANONYMOUS, EXTERNAL]}
[527804008:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00****\x0****,
hostname='****.servicebus.windows.net'}
[527804008:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[527804008:0] -> AMQP
[527804008:0] -> Open{ containerId='ID:****',
hostname='****.servicebus.windows.net', maxFrameSize=1000, channelMax=32767,
idleTimeOut=30000, outgoingLocales=null, incomingLocales=null,
offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container,
DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS,
version=0.56.0, platform=JVM: 1.8.0_242, 25.242-b08, AdoptOpenJDK, OS: Windows
10, 10.0, amd64}}
[527804008:0] <- AMQP
[527804008:0] <- Open{ containerId='****', hostname='null', maxFrameSize=1000,
channelMax=4999, idleTimeOut=120000, outgoingLocales=null,
incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[527804008:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000,
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[527804008:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=5000,
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[527804008:1] ->
Attach{name='qpid-jms:receiver:ID:f5fced8f-f972-424d-8912-1d23a35327ca:1:1:1:test-order',
handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='test-order', durable=NONE, expiryPolicy=LINK_DETACH,
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
filter=null, defaultOutcome=Modified{deliveryFailed=true,
undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list,
amqp:rejected:list, amqp:released:list, amqp:modified:list],
capabilities=[queue]}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=null}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[527804008:1] <-
Attach{name='qpid-jms:receiver:ID:f5fced8f-f972-424d-8912-1d23a35327ca:1:1:1:test-order',
handle=0, role=SENDER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=null,
target=null, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[527804008:1] <- Detach{handle=0, closed=true,
error=Error{condition=amqp:not-allowed, description='It is not possible for an
entity that requires sessions to create a non-sessionful message receiver.
TrackingId:****, SystemTracker:****:Queue:test-order,
Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7,
Timestamp:2021-07-28T10:30:03', info=null}}
[527804008:1] -> Detach{handle=0, closed=true, error=null}
WARN 2021-07-28 11:30:03,890 [AmqpProvider
:(1):[amqps://****.servicebus.windows.net:-1]]
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder: Open of
resource:(JmsConsumerInfo: { ID:f5fced8f-f972-424d-8912-1d23a35327ca:1:1:1,
destination = test-order }) failed: It is not possible for an entity that
requires sessions to create a non-sessionful message receiver. TrackingId:****,
SystemTracker:****:Queue:test-order, Timestamp:2021-07-28T10:30:03
TrackingId:****, SystemTracker:gateway7, Timestamp:2021-07-28T10:30:03
[condition = amqp:not-allowed]
Exception in thread "main" javax.jms.JMSException: It is not possible for an
entity that requires sessions to create a non-sessionful message receiver.
TrackingId:****, SystemTracker:****:Queue:test-order,
Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7,
Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
at
org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
at
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at
org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
at
org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125)
at
org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:82)
at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:479)
at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:467)
at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:459)
at
com.qpid.test.TestSessionEnable.receiveMessage(TestSessionEnable.java:70)
at com.qpid.test.TestSessionEnable.main(TestSessionEnable.java:80)
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible
for an entity that requires sessions to create a non-sessionful message
receiver. TrackingId:****, SystemTracker:****:Queue:test-order,
Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7,
Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
at
org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181)
at
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299)
at
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185)
at
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129)
at
org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985)
at
org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871)
at
org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
at
org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
at
io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
at
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
at
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
/**
*
*/
package com.qpid.test;
import java.util.Enumeration;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.qpid.jms.JmsClientProperties;
import org.apache.qpid.jms.message.JmsMessage;
/**
 * Small qpid-jms sample that publishes to, and registers an asynchronous
 * listener on, the Azure Service Bus queue named by {@link #QUEUE_NAME}.
 *
 * <p>{@link #publishMessage()} sends one text message with the JMSXGroupID
 * property set; {@link #receiveMessage()} attaches a plain JMS consumer and
 * uses this object as its {@link MessageListener}.
 */
public class TestSessionEnable implements MessageListener {

    private static final String QUEUE_NAME = "test-order";
    public static final String SBUS_NAME = "****";
    public static final String USERNAME = "****";
    public static final String PASSWORD = "****";
    private static final String QPID_CONNECTION_FACTORY_CLASS =
            "org.apache.qpid.jms.jndi.JmsInitialContextFactory";

    public static void main(String[] args) throws Exception {
        TestSessionEnable testSessionEnable = new TestSessionEnable();
        //testSessionEnable.publishMessage();
        testSessionEnable.receiveMessage();
    }

    /**
     * Looks up the qpid-jms connection factory through JNDI and opens a
     * connection with {@link #USERNAME} / {@link #PASSWORD}.
     *
     * @return a new connection that has not been started yet
     * @throws NamingException if the JNDI context cannot be created or looked up
     * @throws JMSException if the connection cannot be created
     */
    private Connection createConnection() throws NamingException, JMSException {
        Hashtable<String, String> hashtable = new Hashtable<>();
        int maxFrameSize = 1000;
        hashtable.put("connectionfactory.SBCF", "amqps://" + SBUS_NAME +
                ".servicebus.windows.net?amqp.maxFrameSize=" + maxFrameSize
                +
                "&transport.tcpKeepAlive=true&amqp.traceFrames=true&jms.prefetchPolicy.all=2&jms.validatePropertyNames=false");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY,
                QPID_CONNECTION_FACTORY_CLASS);
        Context context = new InitialContext(hashtable);
        try {
            ConnectionFactory connectionFactory = (ConnectionFactory)
                    context.lookup("SBCF");
            return connectionFactory.createConnection(USERNAME, PASSWORD);
        } finally {
            // The context is only needed for the lookup; release it right away.
            context.close();
        }
    }

    /**
     * Registers this object as an asynchronous listener on the queue.
     *
     * <p>On success the connection is intentionally left open so that
     * {@link #onMessage(Message)} keeps being invoked. If any setup step
     * fails, the connection is closed before the exception is rethrown so
     * that the client's background threads do not keep the JVM alive.
     *
     * @throws Exception if the connection, session or consumer cannot be created
     */
    public void receiveMessage() throws Exception {
        System.out.println("** Receiver start **");
        Connection connection = createConnection();
        try {
            Session session = connection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer messageConsumer = session.createConsumer(destination);
            messageConsumer.setMessageListener(this);
            // Start delivery only after the listener is installed, so setup
            // is complete before the first message can arrive.
            connection.start();
        } catch (JMSException | RuntimeException e) {
            closeAfterFailure(connection, e);
            throw e;
        }
        System.out.println("** Receiver registered **");
    }

    /**
     * Sends one "Hello World" text message with JMSXGroupID set to "112233"
     * and then closes the connection.
     *
     * @throws Exception if connecting or sending fails
     */
    public void publishMessage() throws Exception {
        System.out.println("** Sent start **");
        Connection connection = createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer messageProducer = session.createProducer(destination);
            Message message = session.createTextMessage("Hello World");
            message.setStringProperty(JmsClientProperties.JMSXGROUPID, "112233");
            messageProducer.send(message);
            System.out.println("** Sent finish **");
        } finally {
            // Closing the connection also closes its sessions and producers.
            connection.close();
        }
    }

    /**
     * Closes {@code connection} after a failed setup, attaching any error
     * raised by the close itself to {@code failure} instead of masking it.
     */
    private static void closeAfterFailure(Connection connection, Exception failure) {
        try {
            connection.close();
        } catch (JMSException closeError) {
            failure.addSuppressed(closeError);
        }
    }

    /**
     * Prints every property of the received message followed by its
     * "x-opt-partition-key" annotation.
     *
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        // NOTE(review): the session is created with CLIENT_ACKNOWLEDGE but
        // message.acknowledge() is never called here - confirm this is intended.
        try {
            Enumeration<?> propertyNames = message.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                String propertyName = (String) propertyNames.nextElement();
                System.out.println("PropertyName - " + propertyName + " - " +
                        message.getObjectProperty(propertyName));
            }
            Object partitionKey = ((JmsMessage)
                    message).getFacade().getTracingAnnotation("x-opt-partition-key");
            System.out.println("partitionKey - " + partitionKey);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
** Sent start **
[527804008:0] -> SASL
[527804008:0] <- SASL
[527804008:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN,
ANONYMOUS, EXTERNAL]}
[527804008:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00****\x0****,
hostname='****.servicebus.windows.net'}
[527804008:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[527804008:0] -> AMQP
[527804008:0] -> Open{ containerId='ID:****',
hostname='****.servicebus.windows.net', maxFrameSize=1000, channelMax=32767,
idleTimeOut=30000, outgoingLocales=null, incomingLocales=null,
offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container,
DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS,
version=0.56.0, platform=JVM: 1.8.0_242, 25.242-b08, AdoptOpenJDK, OS: Windows
10, 10.0, amd64}}
[527804008:0] <- AMQP
[527804008:0] <- Open{ containerId='****', hostname='null', maxFrameSize=1000,
channelMax=4999, idleTimeOut=120000, outgoingLocales=null,
incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[527804008:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000,
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[527804008:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=5000,
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[527804008:1] -> Attach{name='qpid-jms:sender:ID:****:1:1:1:test-order',
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='ID:****:1:1:1', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list,
amqp:rejected:list, amqp:released:list, amqp:modified:list],
capabilities=null}, target=Target{address='test-order', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=[queue]}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=[DELAYED_DELIVERY], properties=null}
[527804008:1] <- Attach{name='qpid-jms:sender:ID:****:1:1:1:test-order',
handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='ID:****:1:1:1', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list,
amqp:rejected:list, amqp:released:list, amqp:modified:list],
capabilities=null}, target=Target{address='test-order', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=[queue]}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=null, maxMessageSize=262144,
offeredCapabilities=[DELAYED_DELIVERY], desiredCapabilities=null,
properties=null}
[527804008:1] <- Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1,
outgoingWindow=2147483, handle=0, deliveryCount=0, linkCredit=1000,
available=0, drain=false, echo=false, properties=null}
[527804008:1] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00,
messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null,
resume=false, aborted=false, batchable=false} (166)
"\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00Y\x00\x00\x00\x0b\xa1/ID:****:1:1:1-1@\xa1\x0atest-order@@@@@@\x83\x00\x00\x01z\xec\xab\xc4\xac\xa1\x06112233\x00Sw\xa1\x0bHello
World"
[527804008:1] <- Disposition{role=RECEIVER, first=0, last=null, settled=true,
state=Accepted{}, batchable=false}
** Sent finish **
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
[527804008:0] -> Empty Frame
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]