Hi Robbie,

NOTE: I subscribed to the users mailing list only just now, so I did not receive your reply by email; I read it in the mail archive. Apologies in advance if this message starts a new thread.
Thanks for the quick reply to my query. Qpid-JMS-Client is widely used in our organization, so we are trying to solve this issue with Qpid-JMS-Client, which would be easy to roll out.

I tested with the Microsoft SDK and was able to receive messages. The Microsoft SDK also uses the "proton-j" library, and it needs a sessionId (JMSXGroupID) to receive messages.

Please find attached:
1. Sample Microsoft SDK code - ReceiveNamedSessionAsyncSample.txt
   a) Adapted from https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java
   b) Uses com.azure:azure-messaging-servicebus:7.3.0
2. Sample logs (including proton logs) - Received_Session_Enabled_Message_From_Msft_SDK.txt

Could you please cross-check the attached proton logs from the Microsoft SDK? In your opinion, can we use Qpid-JMS-Client for this purpose, perhaps with some workaround? If that is not possible, we will need to use the Microsoft SDK.

Regards,
Abhishek Kumar

On Wed, Jul 28, 2021 at 12:31 PM A K <mailbox.abhishek.ku...@gmail.com> wrote:
> Hi Team,
>
> We are using qpid-jms-client-0.57.0 to publish and receive messages from
> Azure ServiceBus. ServiceBus provides a feature to receive messages from
> session to maintain message order. Please refer here for more details -
> https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
>
> I am able to publish messages with JMXGroupId but not able to receive
> messages from session enabled queue.
>
> Getting error - javax.jms.JMSException: *It is not possible for an entity
> that requires sessions to create a non-sessionful message receiver*.
> TrackingId:***,
> SystemTracker:mule-intr-sbus-test-standard:Queue:test-order,
> Timestamp:2021-07-28T11:07:49 TrackingId:**, SystemTracker:gateway7,
> Timestamp:2021-07-28T11:07:49 [condition = amqp:not-allowed]
>
> Please find attached
> 1. TestSessionEnable.txt - Sample code
> 2. Failed_To_Receive_Msgs.txt - Error details with proton logs
> 3. Publish_Successfully.txt
>
> Could you please suggest any way to receive session enabled messages?
> Please let me know for any queries.
>
> Regards,
> Abhishek Kumar
>
INFO 2021-07-28 17:06:49,422 [main] com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor: namespace[] entityPath[****.servicebus.windows.net]: Setting next AMQP channel. INFO 2021-07-28 17:06:49,424 [main] com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor: namespace[] entityPath[****.servicebus.windows.net]: Next AMQP channel received, updating 0 current subscribers INFO 2021-07-28 17:06:49,429 [main] com.azure.messaging.servicebus.ServiceBusClientBuilder: # of open clients with shared connection: 1 *** Start receiving message ** INFO 2021-07-28 17:06:49,492 [main] com.azure.core.amqp.implementation.ReactorConnection: connectionId[MF_e00897_1627488409363]: Creating and starting connection to ****.servicebus.windows.net:5671 INFO 2021-07-28 17:06:49,528 [main] com.azure.core.amqp.implementation.ReactorExecutor: connectionId[MF_e00897_1627488409363], message[Starting reactor.] INFO 2021-07-28 17:06:49,537 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ConnectionHandler: onConnectionInit connectionId[MF_e00897_1627488409363] hostname[****.servicebus.windows.net] amqpHostname[****.servicebus.windows.net] *** Finish receiving messages ** INFO 2021-07-28 17:06:49,539 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ReactorHandler: connectionId[MF_e00897_1627488409363] reactor.onReactorInit INFO 2021-07-28 17:06:49,539 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ConnectionHandler: onConnectionLocalOpen connectionId[MF_e00897_1627488409363] hostname[****.servicebus.windows.net] errorCondition[null] errorDescription[null] INFO 2021-07-28 17:06:49,632 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ConnectionHandler: onConnectionBound connectionId[MF_e00897_1627488409363] hostname[****.servicebus.windows.net] peerDetails[****.servicebus.windows.net:5671] INFO 2021-07-28 17:06:50,208 [reactor-executor-1] 
com.azure.core.amqp.implementation.handler.StrictTlsContextSpi: SSLv2Hello was an enabled protocol. Filtering out. [1576319144:0] -> SASL [1576319144:0] -> SaslInit{mechanism=ANONYMOUS, initialResponse=null, hostname='null'} [1576319144:0] <- SASL [1576319144:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, ANONYMOUS, EXTERNAL]} [1576319144:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!} [1576319144:0] -> AMQP [1576319144:0] -> Open{ containerId='MF_e00897_1627488409363', hostname='****.servicebus.windows.net', maxFrameSize=65536, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=azure-messaging-servicebus, framework=jre:1.8.0_242;vendor:AdoptOpenJDK;jvm25.242-b08, version=7.3.0, platform=Windows 10 10.0, user-agent=azsdk-java-azure-messaging-servicebus/7.3.0 (1.8.0_242; Windows 10; 10.0)}} [1576319144:0] <- AMQP [1576319144:0] <- Open{ containerId='1748f2a9ec1b4f6787a45fe25b257dc1_G40', hostname='null', maxFrameSize=65536, channelMax=4999, idleTimeOut=120000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} INFO 2021-07-28 17:06:50,409 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ConnectionHandler: onConnectionRemoteOpen hostname[****.servicebus.windows.net], connectionId[MF_e00897_1627488409363], remoteContainer[1748f2a9ec1b4f6787a45fe25b257dc1_G40] INFO 2021-07-28 17:06:50,409 [reactor-executor-1] com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor: namespace[] entityPath[****.servicebus.windows.net]: Channel is now active. 
[1576319144:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1576319144:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=2147483647, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null} INFO 2021-07-28 17:06:50,453 [reactor-executor-1] com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteOpen connectionId[MF_e00897_1627488409363], entityName[test-order], sessionIncCapacity[0], sessionOutgoingWindow[2147483647] INFO 2021-07-28 17:06:50,480 [reactor-executor-1] com.azure.core.amqp.implementation.ReactorConnection: Setting CBS channel. [1576319144:1] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1576319144:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=2147483647, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null} INFO 2021-07-28 17:06:50,508 [reactor-executor-1] com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteOpen connectionId[MF_e00897_1627488409363], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647] INFO 2021-07-28 17:06:50,524 [reactor-executor-1] com.azure.core.amqp.implementation.ReactorConnection: connectionId[MF_e00897_1627488409363] entityPath[$cbs] linkName[cbs] Emitting new response channel. INFO 2021-07-28 17:06:50,524 [reactor-executor-1] class com.azure.core.amqp.implementation.RequestResponseChannel:$cbs: namespace[MF_e00897_1627488409363] entityPath[$cbs]: Setting next AMQP channel. 
INFO 2021-07-28 17:06:50,524 [reactor-executor-1] class com.azure.core.amqp.implementation.RequestResponseChannel:$cbs: namespace[MF_e00897_1627488409363] entityPath[$cbs]: Next AMQP channel received, updating 1 current subscribers [1576319144:1] -> Attach{name='cbs:sender', handle=0, role=SENDER, sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1576319144:1] -> Attach{name='cbs:receiver', handle=1, role=RECEIVER, sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='cbs-client-reply-to', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1576319144:1] <- Attach{name='cbs:sender', handle=0, role=RECEIVER, sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, 
incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=18446744073709551615, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1576319144:1] <- Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=100, available=0, drain=false, echo=false, properties=null} [1576319144:1] <- Attach{name='cbs:receiver', handle=1, role=SENDER, sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='cbs-client-reply-to', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=18446744073709551615, offeredCapabilities=null, desiredCapabilities=null, properties=null} INFO 2021-07-28 17:06:50,555 [reactor-executor-1] com.azure.core.amqp.implementation.handler.SendLinkHandler: onLinkRemoteOpen connectionId[MF_e00897_1627488409363], entityPath[$cbs], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}] INFO 2021-07-28 17:06:50,557 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkRemoteOpen connectionId[MF_e00897_1627488409363], entityPath[$cbs], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}] INFO 2021-07-28 17:06:50,557 [reactor-executor-1] class com.azure.core.amqp.implementation.RequestResponseChannel:$cbs: namespace[MF_e00897_1627488409363] entityPath[$cbs]: Channel is 
now active. [1576319144:1] -> Flow{nextIncomingId=1, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=1, deliveryCount=0, linkCredit=1, available=null, drain=false, echo=false, properties=null} [1576319144:1] -> Transfer{handle=0, deliveryId=0, deliveryTag=952fc94ba10148778a253bd1503082d8, messageFormat=0, settled=true, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (405) "\x00Ss\xd0\x00\x00\x00\x1e\x00\x00\x00\x05S\x01@@@\xa1\x13cbs-client-reply-to\x00St\xc1\xa0\x08\xa1\x04name\xa1Eamqp://****.servicebus.windows.net/test-order\xa1\x0aexpiration\x83\x00\x00\x01z\xed\xf0\xab\x1f\xa1\x04type\xa1\x1fservicebus.windows.net:sastoken\xa1\x09operation\xa1\x09put-token\x00Sw\xa1\xc5SharedAccessSignature sr=amqp%3A%2F%2F****.servicebus.windows.net%2Ftest-order&sig=MhLwCMWXckuAhscdCfxcvcSar1EBDtXWWVydu1EsfnE%3D&se=1627489610&skn=****" [1576319144:1] <- Transfer{handle=1, deliveryId=0, deliveryTag=\x01\x00\x00\x00, messageFormat=0, settled=true, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (74) "\x00Ss\xc0\x0f\x0d@@@@@S\x01@@@@@@@\x00St\xc11\x04\xa1\x0bstatus-codeq\x00\x00\x00\xca\xa1\x12status-description\xa1\x08Accepted" INFO 2021-07-28 17:06:50,613 [reactor-executor-1] com.azure.core.amqp.implementation.ActiveClientTokenManager: Scheduling refresh token task. scopes[amqp://****.servicebus.windows.net/test-order] INFO 2021-07-28 17:06:50,635 [reactor-executor-1] com.azure.core.amqp.implementation.ReactorSession: connectionId[MF_e00897_1627488409363] sessionId[test-order] linkName[11] Creating a new receiver link. 
[1576319144:0] -> Attach{name='11', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=SECOND, source=Source{address='test-order', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={com.microsoft:session-filter=11}, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties={com.microsoft:timeout=59000, com.microsoft:entity-type=0}} [1576319144:0] <- Attach{name='11', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=SECOND, source=Source{address='test-order', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={com.microsoft:session-filter=11}, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=18446744073709551615, offeredCapabilities=[SHARED-SUBS], desiredCapabilities=null, properties={com.microsoft:timeout=58980, com.microsoft:entity-type=0, client-id=MF_e00897_1627488409363, com.microsoft:locked-until-utc=637630852406531084}} INFO 2021-07-28 17:06:50,671 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkRemoteOpen connectionId[MF_e00897_1627488409363], entityPath[test-order], linkName[11], remoteSource[Source{address='test-order', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={com.microsoft:session-filter=11}, defaultOutcome=null, outcomes=null, 
capabilities=null}] 2021-07-28 17:06:50,719 Log4j2-TF-1-AsyncLoggerConfig-1 ERROR An exception occurred processing Appender Console java.lang.UnsupportedOperationException: Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators. at reactor.core.Fuseable$QueueSubscription.iterator(Fuseable.java:145) at org.apache.logging.log4j.message.ParameterFormatter.appendCollection(ParameterFormatter.java:599) at org.apache.logging.log4j.message.ParameterFormatter.appendPotentiallyRecursiveValue(ParameterFormatter.java:507) at org.apache.logging.log4j.message.ParameterFormatter.recursiveDeepToString(ParameterFormatter.java:432) at org.apache.logging.log4j.message.ParameterFormatter.formatMessage2(ParameterFormatter.java:189) at org.apache.logging.log4j.message.ParameterizedMessage.formatTo(ParameterizedMessage.java:225) at org.apache.logging.log4j.core.pattern.MessagePatternConverter.format(MessagePatternConverter.java:119) at org.apache.logging.log4j.core.pattern.PatternFormatter.format(PatternFormatter.java:38) at org.apache.logging.log4j.core.layout.PatternLayout$PatternSerializer.toSerializable(PatternLayout.java:333) at org.apache.logging.log4j.core.layout.PatternLayout.toText(PatternLayout.java:232) at org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:217) at org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:57) at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.directEncodeEvent(AbstractOutputStreamAppender.java:177) at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.tryAppend(AbstractOutputStreamAppender.java:170) at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:161) at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156) at 
org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129) at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120) at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84) at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448) at org.apache.logging.log4j.core.async.AsyncLoggerConfig.asyncCallAppenders(AsyncLoggerConfig.java:115) at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler.onEvent(AsyncLoggerConfigDisruptor.java:112) at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler.onEvent(AsyncLoggerConfigDisruptor.java:98) at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) INFO 2021-07-28 17:06:50,722 [reactor-executor-1] com.azure.core.amqp.implementation.ReactorSession: linkName[11] entityPath[test-order] Returning existing receive link. 
[1576319144:0] -> Flow{nextIncomingId=1, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1, available=null, drain=false, echo=false, properties=null} [1576319144:0] <- Transfer{handle=0, deliveryId=0, deliveryTag=\xb1\xe8\xb6\xd8\x9e\xf4\x9bM\x8eJ\xd3\x07\xad2\xa6$, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (356) "\x00Sp\xc0\x0a\x05A@pH\x19\x08\x00@C\x00Sq\xc1$\x02\xa3\x10x-opt-lock-token\x98\xd8\xb6\xe8\xb1\xf4\x9eM\x9b\x8eJ\xd3\x07\xad2\xa6$\x00Sr\xc1\x9d\x0c\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x03\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01z\xed\xd6z(\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x00\x00\x03\x87\xa3\x13x-opt-partition-key\xa1\x0211\xa3\x12x-opt-locked-until\x83\x00\x00\xe6w\xd2\x1f\xdc\x00\x00Ss\xc0u\x0d\xa1/ID:625d06cd-e2f9-4982-820e-ee471d9bd4a6:1:1:1-1@\xa1\x0atest-order@@@\xa3\x18application/octet-stream@\x83\x00\x00\x01{5\xef\x82!\x83\x00\x00\x01z\xed\xd6z!\xa1\x0211@@\x00Su\xa0\x0bHello World" ** SessinID **11 ** SequenceNumber **903 ** Body **Hello World [1576319144:0] -> Disposition{role=RECEIVER, first=0, last=0, settled=false, state=Accepted{}, batchable=false} [1576319144:0] <- Disposition{role=SENDER, first=0, last=null, settled=true, state=Accepted{}, batchable=false} [1576319144:0] -> Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=1, linkCredit=1, available=null, drain=false, echo=false, properties=null} [1576319144:0] <- Transfer{handle=0, deliveryId=1, deliveryTag=\xc6"\xca\x1e)\xc5\x89B\xb1\xd3w\x9a\x04>#\xb8, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (356) 
"\x00Sp\xc0\x0a\x05A@pH\x19\x08\x00@C\x00Sq\xc1$\x02\xa3\x10x-opt-lock-token\x98\x1e\xca"\xc6\xc5)B\x89\xb1\xd3w\x9a\x04>#\xb8\x00Sr\xc1\x9d\x0c\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x03\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01z\xed\xd6{Q\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x00\x00\x03\x88\xa3\x13x-opt-partition-key\xa1\x0211\xa3\x12x-opt-locked-until\x83\x00\x00\xe6w\xd2\x1f\xdc\x00\x00Ss\xc0u\x0d\xa1/ID:54b29b8b-77c3-4cc8-8e6c-af1ff531ad19:1:1:1-1@\xa1\x0atest-order@@@\xa3\x18application/octet-stream@\x83\x00\x00\x01{5\xef\x83S\x83\x00\x00\x01z\xed\xd6{S\xa1\x0211@@\x00Su\xa0\x0bHello World" ** SessinID **11 ** SequenceNumber **904 ** Body **Hello World [1576319144:0] -> Disposition{role=RECEIVER, first=1, last=1, settled=false, state=Accepted{}, batchable=false} [1576319144:0] <- Disposition{role=SENDER, first=1, last=null, settled=true, state=Accepted{}, batchable=false} [1576319144:0] -> Flow{nextIncomingId=3, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=2, linkCredit=1, available=null, drain=false, echo=false, properties=null} [1576319144:0] -> Detach{handle=0, closed=true, error=null} INFO 2021-07-28 17:06:59,547 [main] com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient: Removing receiver links. INFO 2021-07-28 17:06:59,548 [main] com.azure.messaging.servicebus.ServiceBusClientBuilder: Closing a dependent client. # of open clients: 0 INFO 2021-07-28 17:06:59,549 [main] com.azure.messaging.servicebus.ServiceBusClientBuilder: No more open clients, closing shared connection [ServiceBusConnectionProcessor]. INFO 2021-07-28 17:06:59,549 [main] com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor: Upstream connection publisher was completed. Terminating processor. 
INFO 2021-07-28 17:06:59,549 [main] com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor: namespace[] entityPath[****.servicebus.windows.net]: AMQP channel processor completed. Notifying 0 subscribers. INFO 2021-07-28 17:06:59,549 [main] com.azure.core.amqp.implementation.ReactorConnection: connectionId[MF_e00897_1627488409363] signal[Disposed by client., isTransient[false], initiatedByClient[true]]: Disposing of ReactorConnection. INFO 2021-07-28 17:06:59,549 [main] com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor: namespace[] entityPath[****.servicebus.windows.net]: Channel is disposed. [1576319144:0] -> End{error=null} [1576319144:1] -> End{error=null} [1576319144:0] <- Detach{handle=0, closed=true, error=null} INFO 2021-07-28 17:06:59,618 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkRemoteClose connectionId[MF_e00897_1627488409363] linkName[11], errorCondition[null] errorDescription[null] INFO 2021-07-28 17:06:59,621 [reactor-executor-1] com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkFinal connectionId[MF_e00897_1627488409363], linkName[11] [1576319144:0] <- End{error=null} INFO 2021-07-28 17:06:59,632 [reactor-executor-1] com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteClose connectionId[test-order], entityName[MF_e00897_1627488409363], condition[Error{condition=null, description='null', info=null}] [1576319144:1] <- End{error=null} INFO 2021-07-28 17:06:59,633 [reactor-executor-1] com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteClose connectionId[cbs-session], entityName[MF_e00897_1627488409363], condition[Error{condition=null, description='null', info=null}]
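When cross-checking a trace like the one above, the frame of interest is the outgoing receiver Attach: in the Microsoft SDK trace its source carries filter={com.microsoft:session-filter=11}, whereas the $cbs receiver link has filter=null. The following is only a minimal sketch for spotting that difference in captured trace text. It assumes one frame per line; the class and method names (AttachFilterScan, isOutgoingReceiverAttach, sessionFilter) are hypothetical, and the two sample frames are abbreviated from the trace above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: scans proton-j style trace lines for the Service Bus session filter.
final class AttachFilterScan {
    // Matches filter={com.microsoft:session-filter=<id>} as printed in the trace.
    private static final Pattern SESSION_FILTER =
            Pattern.compile("filter=\\{com\\.microsoft:session-filter=([^,}]*)\\}");

    // True for frames sent by the client ("->") that attach a receiving link.
    static boolean isOutgoingReceiverAttach(String traceLine) {
        return traceLine.contains("-> Attach{") && traceLine.contains("role=RECEIVER");
    }

    // Returns the session id named in the source filter, if the frame has one.
    static Optional<String> sessionFilter(String traceLine) {
        Matcher m = SESSION_FILTER.matcher(traceLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Abbreviated copies of two Attach frames from the trace (most fields omitted).
        List<String> frames = Arrays.asList(
                "[1576319144:1] -> Attach{name='cbs:receiver', handle=1, role=RECEIVER, "
                        + "source=Source{address='$cbs', filter=null}}",
                "[1576319144:0] -> Attach{name='11', handle=0, role=RECEIVER, "
                        + "source=Source{address='test-order', filter={com.microsoft:session-filter=11}}}");
        for (String frame : frames) {
            if (isOutgoingReceiverAttach(frame)) {
                System.out.println(sessionFilter(frame)
                        .map(id -> "session filter = " + id)
                        .orElse("no session filter"));
            }
        }
    }
}
```

Running the same scan over a trace from another client would show whether its receiver Attach carries the filter at all; the sketch says nothing about how a given client could be made to send it.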
/**
 *
 */
package com.sbus.test;

import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Demonstrates how to receive messages from a named session.
 */
public class ReceiveNamedSessionAsyncSample {
    String connectionString = "**";
    String queueName = "test-order";

    /**
     * Main method to invoke this demo on how to receive messages from a session with id "greetings-id" in an Azure
     * Service Bus Queue.
     *
     * @param args Unused arguments to the program.
     *
     * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete.
     */
    public static void main(String[] args) throws InterruptedException {
        ReceiveNamedSessionAsyncSample sample = new ReceiveNamedSessionAsyncSample();
        sample.run();
    }

    /**
     * Runs this demo on how to receive messages from a session with id "greetings-id" in an Azure Service Bus
     * Queue.
     *
     * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete.
     */
    @Test
    public void run() throws InterruptedException {
        AtomicBoolean sampleSuccessful = new AtomicBoolean(true);
        CountDownLatch countdownLatch = new CountDownLatch(1);

        // The connection string value can be obtained by:
        // 1. Going to your Service Bus namespace in Azure Portal.
        // 2. Go to "Shared access policies"
        // 3. Copy the connection string for the "RootManageSharedAccessKey" policy.
        // The 'connectionString' format is shown below.
        // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
        // 2. "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
        // 3. "queueName" will be the name of the Service Bus queue instance you created
        //    inside the Service Bus namespace.

        // Create a receiver.
        ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .queueName(queueName)
            //.disableAutoComplete()
            .buildAsyncClient();

        // Receiving messages that have the sessionId "greetings-id" set. This can be set via
        // ServiceBusMessage.setSessionId(String) when sending a message.
        // The Mono completes successfully when a lock on the session is acquired, otherwise, it completes with an
        // error.
        Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptSession("greetings-id");

        System.out.println("*** Start receiving message **");

        // If the session is successfully accepted, begin receiving messages from it.
        // Flux.usingWhen is used to dispose of the receiver after consuming messages completes.
        Disposable subscription = Flux.usingWhen(receiverMono,
            receiver -> receiver.receiveMessages(),
            receiver -> Mono.fromRunnable(() -> receiver.close()))
            .subscribe(message -> {
                System.out.println("** SessinID **" + message.getSessionId());
                System.out.println("** SequenceNumber **" + message.getSequenceNumber());
                System.out.println("** Body **" + message.getBody().toString());

                // Process message.
                //System.out.printf("***** Session: %s. Sequence #: %s. Contents: %s%n", message.getSessionId(),
                //    message.getSequenceNumber(), message.getBody());

                // When this message function completes, the message is automatically completed. If an exception is
                // thrown in here, the message is abandoned.
                // To disable this behaviour, toggle ServiceBusSessionReceiverClientBuilder.disableAutoComplete()
                // when building the session receiver.
            }, error -> {
                System.err.println("Error occurred: " + error);
                sampleSuccessful.set(false);
            });

        // Printed right after subscribe() returns, not after the last message arrives.
        System.err.println("*** Finish receiving messages **");

        // Subscribe is not a blocking call so we wait here so the program does not end.
        countdownLatch.await(10, TimeUnit.SECONDS);

        // Disposing of the subscription will cancel the receive() operation.
        subscription.dispose();

        // Close the receiver.
        sessionReceiver.close();

        // This assertion is to ensure that samples are working. Users should remove this.
        Assertions.assertTrue(sampleSuccessful.get());
    }
}
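In the sample above nothing ever calls countdownLatch.countDown(), so await(10, TimeUnit.SECONDS) behaves as a fixed 10-second pause. If the number of expected messages is known, the handler can count the latch down so the wait ends as soon as they have arrived. The following is only a sketch of that waiting pattern using the JDK alone: the class and method names (LatchWaitSketch, receive) are hypothetical, and a single-thread executor stands in for the SDK's asynchronous delivery, which is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait for a known number of asynchronous deliveries instead of a fixed pause.
final class LatchWaitSketch {

    // Delivers each body on a background thread and returns what the handler saw.
    // Throws IllegalStateException if not every body arrives within the timeout.
    static List<String> receive(List<String> bodies, long timeoutSeconds) {
        CountDownLatch remaining = new CountDownLatch(bodies.size());
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        // Stand-in for the asynchronous receiver; one thread keeps delivery in order.
        ExecutorService delivery = Executors.newSingleThreadExecutor();
        try {
            for (String body : bodies) {
                delivery.submit(() -> {
                    seen.add(body);        // "process" the message
                    remaining.countDown(); // signal that one expected message has arrived
                });
            }
            // Returns as soon as the count reaches zero, or false once the timeout elapses.
            if (!remaining.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for messages");
            }
            return new ArrayList<>(seen);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for messages", e);
        } finally {
            delivery.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(receive(Arrays.asList("Hello World", "Hello World"), 10));
    }
}
```

A fixed pause remains the simpler choice when the message count is not known in advance, as in the original sample.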