Hi Team,

We are using qpid-jms-client-0.57.0 to publish and receive messages from
Azure ServiceBus. ServiceBus provides a feature to receive messages from a
session in order to preserve message order. Please refer here for more details -
https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions

I am able to publish messages with JMSXGroupID set, but I am not able to
receive messages from a session-enabled queue.

Getting error - javax.jms.JMSException: *It is not possible for an entity
that requires sessions to create a non-sessionful message receiver*.
TrackingId:***,
SystemTracker:mule-intr-sbus-test-standard:Queue:test-order,
Timestamp:2021-07-28T11:07:49 TrackingId:**, SystemTracker:gateway7,
Timestamp:2021-07-28T11:07:49 [condition = amqp:not-allowed]

Please find attached
1. TestSessionEnable.txt - Sample code
2. Failed_To_Receive_Msgs.txt - Error details with proton logs
3. Publish_Successfully.txt

Could you please suggest a way to receive messages from a session-enabled queue?
Please let me know if you have any questions.

Regards,
Abhishek Kumar
** Receiver start **
[527804008:0] -> SASL
[527804008:0] <- SASL
[527804008:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, 
ANONYMOUS, EXTERNAL]}
[527804008:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00****\x0****, 
hostname='****.servicebus.windows.net'}
[527804008:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[527804008:0] -> AMQP
[527804008:0] -> Open{ containerId='ID:****', 
hostname='****.servicebus.windows.net', maxFrameSize=1000, channelMax=32767, 
idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, 
offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, 
DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, 
version=0.56.0, platform=JVM: 1.8.0_242, 25.242-b08, AdoptOpenJDK, OS: Windows 
10, 10.0, amd64}}
[527804008:0] <- AMQP
[527804008:0] <- Open{ containerId='****', hostname='null', maxFrameSize=1000, 
channelMax=4999, idleTimeOut=120000, outgoingLocales=null, 
incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, 
properties=null}
[527804008:0] -> Begin{remoteChannel=null, nextOutgoingId=1, 
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535, 
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, 
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null, 
desiredCapabilities=null, properties=null}
[527804008:1] -> Begin{remoteChannel=null, nextOutgoingId=1, 
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535, 
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=5000, 
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null, 
desiredCapabilities=null, properties=null}
[527804008:1] -> 
Attach{name='qpid-jms:receiver:ID:f5fced8f-f972-424d-8912-1d23a35327ca:1:1:1:test-order',
 handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, 
source=Source{address='test-order', durable=NONE, expiryPolicy=LINK_DETACH, 
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, 
filter=null, defaultOutcome=Modified{deliveryFailed=true, 
undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, 
amqp:rejected:list, amqp:released:list, amqp:modified:list], 
capabilities=[queue]}, target=Target{address='null', durable=NONE, 
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, 
capabilities=null}, unsettled=null, incompleteUnsettled=false, 
initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, 
desiredCapabilities=null, properties=null}
[527804008:1] <- 
Attach{name='qpid-jms:receiver:ID:f5fced8f-f972-424d-8912-1d23a35327ca:1:1:1:test-order',
 handle=0, role=SENDER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=null, 
target=null, unsettled=null, incompleteUnsettled=false, 
initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, 
desiredCapabilities=null, properties=null}
[527804008:1] <- Detach{handle=0, closed=true, 
error=Error{condition=amqp:not-allowed, description='It is not possible for an 
entity that requires sessions to create a non-sessionful message receiver. 
TrackingId:****, SystemTracker:****:Queue:test-order, 
Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7, 
Timestamp:2021-07-28T10:30:03', info=null}}
[527804008:1] -> Detach{handle=0, closed=true, error=null}
WARN  2021-07-28 11:30:03,890 [AmqpProvider 
:(1):[amqps://****.servicebus.windows.net:-1]] 
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder: Open of 
resource:(JmsConsumerInfo: { ID:f5fced8f-f972-424d-8912-1d23a35327ca:1:1:1, 
destination = test-order }) failed: It is not possible for an entity that 
requires sessions to create a non-sessionful message receiver. TrackingId:****, 
SystemTracker:****:Queue:test-order, Timestamp:2021-07-28T10:30:03 
TrackingId:****, SystemTracker:gateway7, Timestamp:2021-07-28T10:30:03 
[condition = amqp:not-allowed]
Exception in thread "main" javax.jms.JMSException: It is not possible for an 
entity that requires sessions to create a non-sessionful message receiver. 
TrackingId:****, SystemTracker:****:Queue:test-order, 
Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7, 
Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
        at 
org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
        at 
org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
        at 
org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125)
        at 
org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:82)
        at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:479)
        at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:467)
        at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:459)
        at 
com.qpid.test.TestSessionEnable.receiveMessage(TestSessionEnable.java:70)
        at com.qpid.test.TestSessionEnable.main(TestSessionEnable.java:80)
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible 
for an entity that requires sessions to create a non-sessionful message 
receiver. TrackingId:****, SystemTracker:****:Queue:test-order, 
Timestamp:2021-07-28T10:30:03 TrackingId:****, SystemTracker:gateway7, 
Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
        at 
org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181)
        at 
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299)
        at 
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185)
        at 
org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129)
        at 
org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985)
        at 
org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871)
        at 
org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
        at 
org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
        at 
io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
        at 
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
        at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
/**
 * 
 */
package com.qpid.test;

import java.util.Enumeration;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.qpid.jms.JmsClientProperties;
import org.apache.qpid.jms.message.JmsMessage;

/**
 * Small stand-alone client that publishes one text message to, and registers
 * an asynchronous listener on, the queue {@link #QUEUE_NAME} through the
 * Qpid JMS JNDI connection factory.
 *
 * <p>The published message carries a {@code JMSXGroupID} string property.
 * Received messages have their properties and the
 * {@code x-opt-partition-key} annotation printed to standard output.
 */
public class TestSessionEnable implements MessageListener {

    private static final String QUEUE_NAME = "test-order";
    public static final String SBUS_NAME = "****";
    public static final String USERNAME = "****";
    public static final String PASSWORD = "****";
    private static final String QPID_CONNECTION_FACTORY_CLASS =
            "org.apache.qpid.jms.jndi.JmsInitialContextFactory";

    /**
     * Entry point: runs the receiver. Uncomment the publish call to send a
     * message first.
     *
     * @param args ignored
     * @throws Exception if connecting or creating the consumer fails
     */
    public static void main(String[] args) throws Exception {
        TestSessionEnable testSessionEnable = new TestSessionEnable();
        //testSessionEnable.publishMessage();
        testSessionEnable.receiveMessage();
    }

    /**
     * Builds a JNDI environment describing the connection factory
     * {@code SBCF}, looks it up and opens a connection with
     * {@link #USERNAME}/{@link #PASSWORD}.
     *
     * @return a new, not yet started connection; the caller owns it and must close it
     * @throws NamingException if the JNDI context or lookup fails
     * @throws JMSException    if the connection cannot be created
     */
    private Connection createConnection() throws NamingException, JMSException {
        Hashtable<String, String> hashtable = new Hashtable<>();
        int maxFrameSize = 1000;
        hashtable.put("connectionfactory.SBCF", "amqps://" + SBUS_NAME
                + ".servicebus.windows.net?amqp.maxFrameSize=" + maxFrameSize
                + "&transport.tcpKeepAlive=true&amqp.traceFrames=true&jms.prefetchPolicy.all=2&jms.validatePropertyNames=false");

        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, QPID_CONNECTION_FACTORY_CLASS);

        Context context = new InitialContext(hashtable);
        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("SBCF");

        return connectionFactory.createConnection(USERNAME, PASSWORD);
    }

    /**
     * Opens a connection, creates a {@code CLIENT_ACKNOWLEDGE} session and
     * registers this object as the asynchronous listener for
     * {@link #QUEUE_NAME}.
     *
     * <p>On success the connection is intentionally left open so that the
     * listener keeps receiving. If any setup step fails, the connection is
     * closed before the exception is rethrown, so a failed attempt does not
     * leave an idle connection behind.
     *
     * @throws Exception if the connection, session or consumer cannot be created
     */
    public void receiveMessage() throws Exception {
        System.out.println("** Receiver start **");

        Connection connection = createConnection();
        try {
            connection.start();

            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            Destination destination = session.createQueue(QUEUE_NAME);
            // NOTE(review): this is the call on the stack trace of the reported
            // "non-sessionful message receiver" (amqp:not-allowed) failure.
            MessageConsumer messageConsumer = session.createConsumer(destination);

            messageConsumer.setMessageListener(this);
        } catch (Exception e) {
            // Setup failed: release the connection, keeping the original error primary.
            try {
                connection.close();
            } catch (JMSException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }

        System.out.println("** Receiver registered **");
    }

    /**
     * Sends a single "Hello World" text message with {@code JMSXGroupID}
     * set to {@code 112233} to {@link #QUEUE_NAME}, then closes the
     * connection.
     *
     * @throws Exception if connecting or sending fails
     */
    public void publishMessage() throws Exception {
        System.out.println("** Sent start **");

        Connection connection = createConnection();
        try {
            connection.start();

            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            Destination destination = session.createQueue(QUEUE_NAME);

            MessageProducer messageProducer = session.createProducer(destination);

            Message message = session.createTextMessage("Hello World");
            message.setStringProperty(JmsClientProperties.JMSXGROUPID, "112233");

            messageProducer.send(message);
        } finally {
            // Closing the connection also closes its sessions and producers.
            connection.close();
        }

        System.out.println("** Sent finish **");
    }

    /**
     * Prints every property of the received message and its
     * {@code x-opt-partition-key} annotation, then acknowledges it.
     *
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        try {
            Enumeration<?> propertyNames = message.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                String propertyName = (String) propertyNames.nextElement();
                System.out.println("PropertyName - " + propertyName + " - "
                        + message.getObjectProperty(propertyName));
            }

            // The annotation is only reachable through the Qpid-specific facade.
            Object partitionKey = null;
            if (message instanceof JmsMessage) {
                partitionKey = ((JmsMessage) message).getFacade()
                        .getTracingAnnotation("x-opt-partition-key");
            }
            System.out.println("partitionKey - " + partitionKey);

            // The session is CLIENT_ACKNOWLEDGE, so nothing is acknowledged
            // unless the application does it explicitly.
            message.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}
** Sent start **
[527804008:0] -> SASL
[527804008:0] <- SASL
[527804008:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, 
ANONYMOUS, EXTERNAL]}
[527804008:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00****\x0****, 
hostname='****.servicebus.windows.net'}
[527804008:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[527804008:0] -> AMQP
[527804008:0] -> Open{ containerId='ID:****', 
hostname='****.servicebus.windows.net', maxFrameSize=1000, channelMax=32767, 
idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, 
offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, 
DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, 
version=0.56.0, platform=JVM: 1.8.0_242, 25.242-b08, AdoptOpenJDK, OS: Windows 
10, 10.0, amd64}}
[527804008:0] <- AMQP
[527804008:0] <- Open{ containerId='****', hostname='null', maxFrameSize=1000, 
channelMax=4999, idleTimeOut=120000, outgoingLocales=null, 
incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, 
properties=null}
[527804008:0] -> Begin{remoteChannel=null, nextOutgoingId=1, 
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535, 
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, 
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null, 
desiredCapabilities=null, properties=null}
[527804008:1] -> Begin{remoteChannel=null, nextOutgoingId=1, 
incomingWindow=2147483, outgoingWindow=2147483647, handleMax=65535, 
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[527804008:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=5000, 
outgoingWindow=2147483, handleMax=255, offeredCapabilities=null, 
desiredCapabilities=null, properties=null}
[527804008:1] -> Attach{name='qpid-jms:sender:ID:****:1:1:1:test-order', 
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, 
source=Source{address='ID:****:1:1:1', durable=NONE, expiryPolicy=SESSION_END, 
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, 
filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, 
amqp:rejected:list, amqp:released:list, amqp:modified:list], 
capabilities=null}, target=Target{address='test-order', durable=NONE, 
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, 
capabilities=[queue]}, unsettled=null, incompleteUnsettled=false, 
initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, 
desiredCapabilities=[DELAYED_DELIVERY], properties=null}
[527804008:1] <- Attach{name='qpid-jms:sender:ID:****:1:1:1:test-order', 
handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, 
source=Source{address='ID:****:1:1:1', durable=NONE, expiryPolicy=SESSION_END, 
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, 
filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, 
amqp:rejected:list, amqp:released:list, amqp:modified:list], 
capabilities=null}, target=Target{address='test-order', durable=NONE, 
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, 
capabilities=[queue]}, unsettled=null, incompleteUnsettled=false, 
initialDeliveryCount=null, maxMessageSize=262144, 
offeredCapabilities=[DELAYED_DELIVERY], desiredCapabilities=null, 
properties=null}
[527804008:1] <- Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1, 
outgoingWindow=2147483, handle=0, deliveryCount=0, linkCredit=1000, 
available=0, drain=false, echo=false, properties=null}
[527804008:1] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00, 
messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, 
resume=false, aborted=false, batchable=false} (166) 
"\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00Y\x00\x00\x00\x0b\xa1/ID:****:1:1:1-1@\xa1\x0atest-order@@@@@@\x83\x00\x00\x01z\xec\xab\xc4\xac\xa1\x06112233\x00Sw\xa1\x0bHello
 World"
[527804008:1] <- Disposition{role=RECEIVER, first=0, last=null, settled=true, 
state=Accepted{}, batchable=false}
** Sent finish **
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
[527804008:0] <- Empty Frame
[527804008:0] -> Empty Frame
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to